メインコンテンツまでスキップ

PySpark の基本

この記事では、PySpark の簡単な例で PySpark の使用方法を説明します。 これは、基本的なApache Spark概念を理解し、コンピュートに接続されたDatabricksノートブックでコマンドを実行していることを前提としています。サンプル データを使用して DataFrames を作成し、このデータに対して行操作や列操作などの基本的な変換を実行し、複数の DataFrames を組み合わせてこのデータを集計し、このデータを視覚化して、テーブルまたはファイルに保存します。

データのアップロード

この記事の一部の例では Databricksが提供するサンプル データを使用して、 DataFrames を使用してデータを読み込み、変換、および保存する方法を示します。 まだ Databricks にない独自のデータを使用する場合は、最初にデータをアップロードし、そこから DataFrame を作成できます。 「ファイルのアップロードを使用してテーブルを作成または変更する」と「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。

Databricks のサンプル データについて

Databricks では、 samples カタログと /databricks-datasets ディレクトリにサンプル データが用意されています。

  • samples カタログのサンプル・データにアクセスするには、samples.<schema-name>.<table-name>の形式を使用します。この記事では、架空のビジネスのデータを含む samples.tpch スキーマのテーブルを使用します。 customer テーブルには顧客に関する情報が含まれ、orders にはそれらの顧客による注文に関する情報が含まれます。
  • dbutils.fs.ls を使用して、/databricks-datasetsのデータを探索します。Spark SQL または DataFrames を使用して、ファイル パスを使用してこの場所のデータをクエリします。 Databricks が提供するサンプル データの詳細については、「 サンプル データセット」を参照してください。

データ・タイプのインポート

多くの PySpark 操作では、SQL 関数を使用するか、ネイティブの Spark タイプと対話する必要があります。 必要な関数と型のみを直接インポートすることも、モジュール全体をインポートすることもできます。

Python
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

インポートされた関数の中には Python の組み込み関数をオーバーライドするものもあるため、一部のユーザーはエイリアスを使用してこれらのモジュールをインポートすることを選択します。 次の例は、Apache Spark のコード例で使用される一般的なエイリアスを示しています。

Python
import pyspark.sql.types as T
import pyspark.sql.functions as F

データ型の包括的な一覧については、「 Spark データ型」を参照してください。

PySpark SQL 関数の包括的な一覧については、「 Spark 関数」を参照してください。

DataFrame の作成

DataFrame を作成するには、いくつかの方法があります。 通常、 DataFrame は、テーブルやファイルのコレクションなどのデータソースに対して定義します。 次に、「Apache Spark の基本概念」セクションで説明されているように、 displayなどのアクションを使用して、実行する変換をトリガーします。 displayメソッドは DataFramesを出力します。

指定した値で DataFrame を作成する

指定された値を持つ DataFrame を作成するには、 createDataFrame メソッドを使用します。ここで、行はタプルのリストとして表されます。

Python
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)

出力では、 df_children の列のデータ型が自動的に推論されることに注意してください。 または、スキーマを追加してタイプを指定することもできます。 スキーマは、名前、データ型、およびnull値が含まれているかどうかを示すブールフラグを指定するStructFieldsで構成されるStructTypeを使用して定義されます。データ型は pyspark.sql.typesからインポートする必要があります。

Python
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)

Unity Catalog のテーブルから DataFrame を作成する

Unity Catalog のテーブルから DataFrame を作成するには、 table メソッドを使用してテーブルを <catalog-name>.<schema-name>.<table-name>形式で識別します。 左側のナビゲーションバーにある 「カタログ 」をクリックして、 カタログエクスプローラー を使用してテーブルに移動します。 それをクリックし、[ テーブル パスのコピー ] を選択して、テーブル パスをノートブックに挿入します。

次の例では、テーブル samples.tpch.customerをロードしますが、独自のテーブルへのパスを指定することもできます。

Python
df_customer = spark.table('samples.tpch.customer')
display(df_customer)

アップロードしたファイルから DataFrame を作成する

Unity Catalog ボリュームにアップロードしたファイルから DataFrame を作成するには、 read プロパティを使用します。 このメソッドは、 DataFrameReaderを返し、これを使用して適切な形式を読み取ることができます。 左側の小さなサイドバーにあるカタログオプションをクリックし、カタログブラウザを使用してファイルを見つけます。 それを選択し、[ ボリューム ファイル パスのコピー ] をクリックします。

以下の例は *.csv ファイルから読み取りますが、 DataFrameReader は他の多くの形式でのファイルのアップロードをサポートしています。 「DataFrameReader メソッド」を参照してください。

Python
# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)

Unity Catalog ボリュームの詳細については、「Unity Catalog ボリュームとは」を参照してください。

JSON 応答から DataFrame を作成する

REST API によって返された JSON 応答ペイロードから DataFrame を作成するには、Python requests パッケージを使用して応答のクエリと解析を行います。 パッケージを使用するには、パッケージをインポートする必要があります。 この例では、米国食品医薬品局(FDA)の医薬品アプリケーションデータベースのデータを使用します。

Python
import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

で JSONおよびその他の半構造化データの操作に関する情報については、「Databricks 半構造化データのモデル化 」を参照してください。

JSON フィールドまたはオブジェクトを選択します

変換された JSON から特定のフィールドまたはオブジェクトを選択するには、 [] 表記を使用します。 たとえば、それ自体が製品の配列である products フィールドを選択するには、次のようにします。

Python
display(df_drugs.select(df_drugs["products"]))

また、メソッド呼び出しをチェーンして、複数のフィールドを走査することもできます。 たとえば、医薬品アプリケーションの最初の製品のブランド名を出力するには、次のようにします。

Python
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

ファイルからの DataFrame の作成

ファイルから DataFrame を作成することを示すために、この例では CSV データを /databricks-datasets ディレクトリに読み込みます。

サンプル データセットに移動するには、 Databricks Utilties ファイル システム コマンドを使用できます。 次の例では、 dbutils を使用して、 /databricks-datasetsで使用可能なデータセットを一覧表示します。

Python
display(dbutils.fs.ls('/databricks-datasets'))

または、次の例に示すように、 %fs を使用して Databricks CLI ファイル システム コマンドにアクセスすることもできます。

Python
%fs ls '/databricks-datasets'

ファイルまたはファイルのディレクトリから DataFrame を作成するには、 load メソッドでパスを指定します。

Python
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

DataFrames によるデータの変換

DataFrames を使用すると、データの並べ替え、フィルター処理、集計を行う組み込みのメソッドを使用して、データを簡単に変換できます。 多くの変換は DataFramesのメソッドとして指定されていませんが、代わりに spark.sql.functions パッケージで提供されます。 「Databricks Spark SQL 関数」を参照してください。

列の操作

Spark には、多くの基本的な列操作が用意されています。

ヒント

DataFrame のすべての列を出力するには、 columnsを使用します (例: df_customer.columns)。

列の選択

selectcolを使用して特定の列を選択できます。col 関数は pyspark.sql.functions サブモジュールにあります。

Python
from pyspark.sql.functions import col

df_customer.select(
col("c_custkey"),
col("c_acctbal")
)

また、文字列として定義された式を受け取る expr を使用して列を参照することもできます。

Python
from pyspark.sql.functions import expr

df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)

また、SQL 式を受け入れる selectExprを使用することもできます。

Python
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)

文字列リテラルを使用して列を選択するには、次の操作を行います。

Python
df_customer.select(
"c_custkey",
"c_acctbal"
)

特定の DataFrame から列を明示的に選択するには、 [] 演算子または . 演算子を使用できます。 ( . 演算子を使用して、整数で始まる列、またはスペースや特殊文字を含む列を選択することはできません。 これは、一部の列が同じ名前を持つ DataFrames を結合する場合に特に役立ちます。

Python
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
Python
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)

列の作成

新しい列を作成するには、 withColumn 方法を使用します。 次の例では、顧客アカウント残高 c_acctbal1000を超えているかどうかに基づいて、ブール値を含む新しい列を作成します。

Python
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

列の名前を変更する

列の名前を変更するには、既存の列名と新しい列名を受け付ける withColumnRenamed メソッドを使います。

Python
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

alias方法は、集計の一部として列の名前を変更する場合に特に役立ちます。

Python
from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

キャスト列のタイプ

場合によっては、DataFrame の 1 つ以上の列のデータ型を変更したい場合があります。 これを行うには、 cast メソッドを使用して列データ型間で変換します。 次の例は、 col メソッドを使用して列を参照し、列を整数型から文字列型に変換する方法を示しています。

Python
from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

列の削除

列を削除するには、選択または select(*) except 中に列を省略するか、 drop 方法を使用できます。

Python
df_customer_flag_renamed.drop("balance_flag_renamed")

一度に複数の列を削除することもできます。

Python
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

行操作

Spark には、多くの基本的な行操作が用意されています。

行をフィルタリングする

行をフィルター処理するには、DataFrame で filter メソッドまたは where メソッドを使用して、特定の行のみを返します。 フィルター処理する列を識別するには、 col メソッドまたは列に評価される式を使用します。

Python
from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

複数の条件でフィルタリングするには、論理演算子を使用します。 たとえば、 &| を使用すると、それぞれ条件を AND および OR できます。 次の例では、 c_nationkey20 に等しく、 c_acctbal1000より大きい行をフィルタリングします。

Python
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
Python
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

重複する行を削除する

行の重複を排除するには、 distinctを使用して、一意の行のみを返します。

Python
df_unique = df_customer.distinct()

null 値を処理する

null 値を処理するには、 na.drop メソッドを使用して null 値を含む行を削除します。 このメソッドでは、 any null 値を含む行を削除するか、null 値 all 行を削除するかを指定できます。

null 値を削除するには、次のいずれかの例を使用します。

Python
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

代わりに、すべての null 値を含む行のみをフィルターで除外する場合は、次を使用します。

Python
df_customer_no_nulls = df_customer.na.drop("all")

これを適用するには、次に示すように、これを指定します。

Python
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

欠損値を埋めるには、 fill 方法を使用します。 これをすべての列に適用するか、列のサブセットに適用するかを選択できます。 次の例では、アカウント残高 c_acctbal にnull値を持つアカウント残高には 0が入力されています。

Python
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

文字列を他の値に置き換えるには、 replace メソッドを使用します。 次の例では、空のアドレス文字列が UNKNOWNという単語に置き換えられています。

Python
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

行の追加

行を追加するには、 union メソッドを使用して新しいDataFrameを作成する必要があります。 次の例では、以前に作成した DataFrame df_that_one_customerdf_filtered_customer が組み合わされ、3 人の顧客を含む DataFrame が返されます。

Python
df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)
注記

また、 DataFrames をテーブルに書き込んでから新しい行を追加することで、を組み合わせることもできます。 本番運用ワークロードの場合、データソースをターゲットテーブルに増分処理すると、データのサイズが大きくなるにつれてレイテンシーとコンピュートのコストを大幅に削減できます。 「Databricks レイクハウスへのデータの取り込み」を参照してください。

行をソートする

important

並べ替えは大規模にコストがかかる場合があり、並べ替えられたデータを格納し、Spark でデータを再読み込みすると、順序は保証されません。 仕分けの使用に意図的であることを確認してください。

1 つ以上の列で行を並べ替えるには、 sort または orderBy の方法を使用します。 デフォルトでは、これらのメソッドは昇順でソートされます。

Python
df_customer.orderBy(col("c_acctbal"))

降順でフィルタリングするには、次の descを使用します。

Python
df_customer.sort(col("c_custkey").desc())

次の例は、2 つの列で並べ替える方法を示しています。

Python
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

DataFrame が並べ替えられた後に返される行数を制限するには、 limit メソッドを使用します。 次の例では、上位 10 の結果のみを表示します。

Python
display(df_sorted.limit(10))

ジョイン DataFrames

2 つ以上の DataFramesを結合するには、join メソッドを使用します。 DataFramesの結合方法は、how (結合タイプ) パラメーターと on (結合の基になる列) パラメーターで指定できます。一般的な結合タイプには、次のものがあります。

  • inner: これは結合タイプ デフォルトであり、 全体でDataFrame onパラメーターに一致する行のみを保持するDataFrames を返します。
  • left: 最初に指定した DataFrame のすべての行と、2 番目に指定した DataFrame の行のうち、最初の DataFrame と一致する行のみが保持されます。
  • outer: 外部ジョインは、一致に関係なく、両方の DataFrames のすべてのローを保持します。

結合の詳細については、「Databricksでの結合の操作」を参照してください。PySpark でサポートされている結合の一覧については、「 DataFrame 結合」を参照してください

次の例では、 orders DataFrame の各行が customers DataFrame の対応する行と結合されている 1 つの DataFrame を返します。 内部結合は、すべての注文が 1 人の顧客に対応することが想定されるため、使用されます。

Python
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)

display(df_joined)

複数の条件で結合するには、 &| などのブール演算子を使用して、それぞれ ANDORを指定します。 次の例では、 o_totalprice500,000より大きい行のみにフィルタする条件を追加します。

Python
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)

display(df_complex_joined)

データの集計

DataFrame でデータを集計するには、SQL の GROUP BY と同様に、 groupBy メソッドを使用してグループ化する列を指定し、 agg メソッドを使用して集計を指定します。 avgsummaxmin などの一般的な集計を pyspark.sql.functionsからインポートします。次の例は、市場セグメント別の平均顧客残高を示しています。

Python
from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
Python
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

一部の集計はアクションであり、計算をトリガーします。 この場合、結果を出力するために他のアクションを使用する必要はありません。

DataFrame の行をカウントするには、 count メソッドを使用します。

Python
df_customer.count()

呼び出しのチェーン

DataFrames を変換するメソッドは DataFramesを返し、アクションが呼び出されるまで Spark は変換に対して動作しません。この 遅延評価 は、利便性と読みやすさのために複数のメソッドをチェーンできることを意味します。 次の例は、フィルタリング、集計、および順序付けをチェーンする方法を示しています。

Python
from pyspark.sql.functions import count

df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)

display(df_chained)

DataFrame を視覚化する

ノートブックで DataFrame を視覚化するには、DataFrame の左上にあるテーブルの横にある [+ ] 記号をクリックし、[ 視覚化 ] を選択して、DataFrame に基づいて 1 つ以上のグラフを追加します。 視覚化の詳細については、「 Databricks ノートブックでの視覚化」を参照してください。

Python
display(df_order)

Databricks追加のビジュアライゼーションを実行するには、PandasAPI にSpark を使用することをお勧めします。.pandas_api()このPandasAPI を使用すると、 の対応するSparkDataFrame にキャストできます。詳細については、「PandasAPI Spark」を参照してください。

データを保存する

データを変換したら、 DataFrameWriter メソッドを使用してデータを保存できます。 これらのメソッドの完全な一覧は、 DataFrameWriter にあります。 次のセクションでは、DataFrame をテーブルおよびデータ ファイルのコレクションとして保存する方法を示します。

DataFrame をテーブルとして保存する

DataFrame を Unity Catalog のテーブルとして保存するには、 write.saveAsTable メソッドを使用し、パスを の形式で指定します <catalog-name>.<schema-name>.<table-name>.

Python
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

DataFrame を CSV 形式で記述します

DataFrame を *.csv 形式に書き込むには、 write.csv メソッドを使用して、形式とオプションを指定します。 デフォルトでは、指定したパスにデータが存在する場合、書き込み操作は失敗します。 次のいずれかのモードを指定して、別のアクションを実行できます。

  • overwrite ターゲットパス内のすべての既存のデータをDataFrameの内容で上書きします。
  • append DataFrame の内容をターゲット パス内のデータに追加します。
  • ignore ターゲット・パスにデータが存在する場合、書き込みはサイレントに失敗します。

次の例は、DataFrame の内容を CSV ファイルとしてデータに上書きする方法を示しています。

Python
# Assign this variable your file path
file_path = ""

(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)

次のステップ

Databricks でより多くの Spark 機能を活用するには、以下を参照してください。