PySpark の基本
この記事では、PySpark の簡単な例で PySpark の使用方法を説明します。 これは、基本的なApache Spark概念を理解し、コンピュートに接続されたDatabricksノートブックでコマンドを実行していることを前提としています。サンプル データを使用して DataFrames を作成し、このデータに対して行操作や列操作などの基本的な変換を実行し、複数の DataFrames を組み合わせてこのデータを集計し、このデータを視覚化して、テーブルまたはファイルに保存します。
データのアップロード
この記事の一部の例では Databricksが提供するサンプル データを使用して、 DataFrames を使用してデータを読み込み、変換、および保存する方法を示します。 まだ Databricks にない独自のデータを使用する場合は、最初にデータをアップロードし、そこから DataFrame を作成できます。 「ファイルのアップロードを使用してテーブルを作成または変更する」と「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。
Databricks のサンプル データについて
Databricks では、 samples
カタログと /databricks-datasets
ディレクトリにサンプル データが用意されています。
samples
カタログのサンプル・データにアクセスするには、samples.<schema-name>.<table-name>
の形式を使用します。この記事では、架空のビジネスのデータを含むsamples.tpch
スキーマのテーブルを使用します。customer
テーブルには顧客に関する情報が含まれ、orders
にはそれらの顧客による注文に関する情報が含まれます。dbutils.fs.ls
を使用して、/databricks-datasets
のデータを探索します。Spark SQL または DataFrames を使用して、ファイル パスを使用してこの場所のデータをクエリします。 Databricks が提供するサンプル データの詳細については、「 サンプル データセット」を参照してください。
データ・タイプのインポート
多くの PySpark 操作では、SQL 関数を使用するか、ネイティブの Spark タイプと対話する必要があります。 必要な関数と型のみを直接インポートすることも、モジュール全体をインポートすることもできます。
# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
インポートされた関数の中には Python の組み込み関数をオーバーライドするものもあるため、一部のユーザーはエイリアスを使用してこれらのモジュールをインポートすることを選択します。 次の例は、Apache Spark のコード例で使用される一般的なエイリアスを示しています。
import pyspark.sql.types as T
import pyspark.sql.functions as F
データ型の包括的な一覧については、「 Spark データ型」を参照してください。
PySpark SQL 関数の包括的な一覧については、「 Spark 関数」を参照してください。
DataFrame の作成
DataFrame を作成するには、いくつかの方法があります。 通常、 DataFrame は、テーブルやファイルのコレクションなどのデータソースに対して定義します。 次に、「Apache Spark の基本概念」セクションで説明されているように、 display
などのアクションを使用して、実行する変換をトリガーします。 display
メソッドは DataFramesを出力します。
指定した値で DataFrame を作成する
指定された値を持つ DataFrame を作成するには、 createDataFrame
メソッドを使用します。ここで、行はタプルのリストとして表されます。
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
出力では、 df_children
の列のデータ型が自動的に推論されることに注意してください。 または、スキーマを追加してタイプを指定することもできます。 スキーマは、名前、データ型、およびnull値が含まれているかどうかを示すブールフラグを指定するStructFields
で構成されるStructType
を使用して定義されます。データ型は pyspark.sql.types
からインポートする必要があります。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Unity Catalog のテーブルから DataFrame を作成する
Unity Catalog のテーブルから DataFrame を作成するには、 table
メソッドを使用してテーブルを <catalog-name>.<schema-name>.<table-name>
形式で識別します。 左側のナビゲーションバーにある 「カタログ 」をクリックして、 カタログエクスプローラー を使用してテーブルに移動します。 それをクリックし、[ テーブル パスのコピー ] を選択して、テーブル パスをノートブックに挿入します。
次の例では、テーブル samples.tpch.customer
をロードしますが、独自のテーブルへのパスを指定することもできます。
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
アップロードしたファイルから DataFrame を作成する
Unity Catalog ボリュームにアップロードしたファイルから DataFrame を作成するには、 read
プロパティを使用します。 このメソッドは、 DataFrameReader
を返し、これを使用して適切な形式を読み取ることができます。 左側の小さなサイドバーにあるカタログオプションをクリックし、カタログブラウザを使用してファイルを見つけます。 それを選択し、[ ボリューム ファイル パスのコピー ] をクリックします。
以下の例は *.csv
ファイルから読み取りますが、 DataFrameReader
は他の多くの形式でのファイルのアップロードをサポートしています。 「DataFrameReader メソッド」を参照してください。
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Unity Catalog ボリュームの詳細については、「Unity Catalog ボリュームとは」を参照してください。
JSON 応答から DataFrame を作成する
REST API によって返された JSON 応答ペイロードから DataFrame を作成するには、Python requests
パッケージを使用して応答のクエリと解析を行います。 パッケージを使用するには、パッケージをインポートする必要があります。 この例では、米国食品医薬品局(FDA)の医薬品アプリケーションデータベースのデータを使用します。
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
で JSONおよびその他の半構造化データの操作に関する情報については、「Databricks 半構造化データのモデル化 」を参照してください。
JSON フィールドまたはオブジェクトを選択します
変換された JSON から特定のフィールドまたはオブジェクトを選択するには、 []
表記を使用します。 たとえば、それ自体が製品の配列である products
フィールドを選択するには、次のようにします。
display(df_drugs.select(df_drugs["products"]))
また、メソッド呼び出しをチェーンして、複数のフィールドを走査することもできます。 たとえば、医薬品アプリケーションの最初の製品のブランド名を出力するには、次のようにします。
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
ファイルからの DataFrame の作成
ファイルから DataFrame を作成することを示すために、この例では CSV データを /databricks-datasets
ディレクトリに読み込みます。
サンプル データセットに移動するには、 Databricks Utilties ファイル システム コマンドを使用できます。 次の例では、 dbutils
を使用して、 /databricks-datasets
で使用可能なデータセットを一覧表示します。
display(dbutils.fs.ls('/databricks-datasets'))
または、次の例に示すように、 %fs
を使用して Databricks CLI ファイル システム コマンドにアクセスすることもできます。
%fs ls '/databricks-datasets'
ファイルまたはファイルのディレクトリから DataFrame を作成するには、 load
メソッドでパスを指定します。
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
DataFrames によるデータの変換
DataFrames を使用すると、データの並べ替え、フィルター処理、集計を行う組み込みのメソッドを使用して、データを簡単に変換できます。 多くの変換は DataFramesのメソッドとして指定されていませんが、代わりに spark.sql.functions
パッケージで提供されます。 「Databricks Spark SQL 関数」を参照してください。
列の操作
Spark には、多くの基本的な列操作が用意されています。
DataFrame のすべての列を出力するには、 columns
を使用します (例: df_customer.columns
)。
列の選択
select
と col
を使用して特定の列を選択できます。col
関数は pyspark.sql.functions
サブモジュールにあります。
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
また、文字列として定義された式を受け取る expr
を使用して列を参照することもできます。
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
また、SQL 式を受け入れる selectExpr
を使用することもできます。
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
文字列リテラルを使用して列を選択するには、次の操作を行います。
df_customer.select(
"c_custkey",
"c_acctbal"
)
特定の DataFrame から列を明示的に選択するには、 []
演算子または .
演算子を使用できます。 ( .
演算子を使用して、整数で始まる列、またはスペースや特殊文字を含む列を選択することはできません。 これは、一部の列が同じ名前を持つ DataFrames を結合する場合に特に役立ちます。
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
列の作成
新しい列を作成するには、 withColumn
方法を使用します。 次の例では、顧客アカウント残高 c_acctbal
が 1000
を超えているかどうかに基づいて、ブール値を含む新しい列を作成します。
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
列の名前を変更する
列の名前を変更するには、既存の列名と新しい列名を受け付ける withColumnRenamed
メソッドを使います。
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
alias
方法は、集計の一部として列の名前を変更する場合に特に役立ちます。
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
キャスト列のタイプ
場合によっては、DataFrame の 1 つ以上の列のデータ型を変更したい場合があります。 これを行うには、 cast
メソッドを使用して列データ型間で変換します。 次の例は、 col
メソッドを使用して列を参照し、列を整数型から文字列型に変換する方法を示しています。
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
列の削除
列を削除するには、選択または select(*) except
中に列を省略するか、 drop
方法を使用できます。
df_customer_flag_renamed.drop("balance_flag_renamed")
一度に複数の列を削除することもできます。
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
行操作
Spark には、多くの基本的な行操作が用意されています。
行をフィルタリングする
行をフィルター処理するには、DataFrame で filter
メソッドまたは where
メソッドを使用して、特定の行のみを返します。 フィルター処理する列を識別するには、 col
メソッドまたは列に評価される式を使用します。
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
複数の条件でフィルタリングするには、論理演算子を使用します。 たとえば、 &
と |
を使用すると、それぞれ条件を AND
および OR
できます。 次の例では、 c_nationkey
が 20
に等しく、 c_acctbal
が 1000
より大きい行をフィルタリングします。
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
重複する行を削除する
行の重複を排除するには、 distinct
を使用して、一意の行のみを返します。
df_unique = df_customer.distinct()
null 値を処理する
null 値を処理するには、 na.drop
メソッドを使用して null 値を含む行を削除します。 このメソッドでは、 any
null 値を含む行を削除するか、null 値 all
行を削除するかを指定できます。
null 値を削除するには、次のいずれかの例を使用します。
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
代わりに、すべての null 値を含む行のみをフィルターで除外する場合は、次を使用します。
df_customer_no_nulls = df_customer.na.drop("all")
これを適用するには、次に示すように、これを指定します。
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
欠損値を埋めるには、 fill
方法を使用します。 これをすべての列に適用するか、列のサブセットに適用するかを選択できます。 次の例では、アカウント残高 c_acctbal
にnull値を持つアカウント残高には 0
が入力されています。
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
文字列を他の値に置き換えるには、 replace
メソッドを使用します。 次の例では、空のアドレス文字列が UNKNOWN
という単語に置き換えられています。
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
行の追加
行を追加するには、 union
メソッドを使用して新しいDataFrameを作成する必要があります。 次の例では、以前に作成した DataFrame df_that_one_customer
と df_filtered_customer
が組み合わされ、3 人の顧客を含む DataFrame が返されます。
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
また、 DataFrames をテーブルに書き込んでから新しい行を追加することで、を組み合わせることもできます。 本番運用ワークロードの場合、データソースをターゲットテーブルに増分処理すると、データのサイズが大きくなるにつれてレイテンシーとコンピュートのコストを大幅に削減できます。 「Databricks レイクハウスへのデータの取り込み」を参照してください。
行をソートする
並べ替えは大規模にコストがかかる場合があり、並べ替えられたデータを格納し、Spark でデータを再読み込みすると、順序は保証されません。 仕分けの使用に意図的であることを確認してください。
1 つ以上の列で行を並べ替えるには、 sort
または orderBy
の方法を使用します。 デフォルトでは、これらのメソッドは昇順でソートされます。
df_customer.orderBy(col("c_acctbal"))
降順でフィルタリングするには、次の desc
を使用します。
df_customer.sort(col("c_custkey").desc())
次の例は、2 つの列で並べ替える方法を示しています。
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
DataFrame が並べ替えられた後に返される行数を制限するには、 limit
メソッドを使用します。 次の例では、上位 10
の結果のみを表示します。
display(df_sorted.limit(10))
ジョイン DataFrames
2 つ以上の DataFramesを結合するには、join
メソッドを使用します。 DataFramesの結合方法は、how
(結合タイプ) パラメーターと on
(結合の基になる列) パラメーターで指定できます。一般的な結合タイプには、次のものがあります。
inner
: これは結合タイプ デフォルトであり、 全体でDataFrameon
パラメーターに一致する行のみを保持するDataFrames を返します。left
: 最初に指定した DataFrame のすべての行と、2 番目に指定した DataFrame の行のうち、最初の DataFrame と一致する行のみが保持されます。outer
: 外部ジョインは、一致に関係なく、両方の DataFrames のすべてのローを保持します。
結合の詳細については、「Databricksでの結合の操作」を参照してください。PySpark でサポートされている結合の一覧については、「 DataFrame 結合」を参照してください。
次の例では、 orders
DataFrame の各行が customers
DataFrame の対応する行と結合されている 1 つの DataFrame を返します。 内部結合は、すべての注文が 1 人の顧客に対応することが想定されるため、使用されます。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
複数の条件で結合するには、 &
や |
などのブール演算子を使用して、それぞれ AND
と OR
を指定します。 次の例では、 o_totalprice
が 500,000
より大きい行のみにフィルタする条件を追加します。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
データの集計
DataFrame でデータを集計するには、SQL の GROUP BY
と同様に、 groupBy
メソッドを使用してグループ化する列を指定し、 agg
メソッドを使用して集計を指定します。 avg
、 sum
、 max
、 min
などの一般的な集計を pyspark.sql.functions
からインポートします。次の例は、市場セグメント別の平均顧客残高を示しています。
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
一部の集計はアクションであり、計算をトリガーします。 この場合、結果を出力するために他のアクションを使用する必要はありません。
DataFrame の行をカウントするには、 count
メソッドを使用します。
df_customer.count()
呼び出しのチェーン
DataFrames を変換するメソッドは DataFramesを返し、アクションが呼び出されるまで Spark は変換に対して動作しません。この 遅延評価 は、利便性と読みやすさのために複数のメソッドをチェーンできることを意味します。 次の例は、フィルタリング、集計、および順序付けをチェーンする方法を示しています。
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
DataFrame を視覚化する
ノートブックで DataFrame を視覚化するには、DataFrame の左上にあるテーブルの横にある [+ ] 記号をクリックし、[ 視覚化 ] を選択して、DataFrame に基づいて 1 つ以上のグラフを追加します。 視覚化の詳細については、「 Databricks ノートブックでの視覚化」を参照してください。
display(df_order)
Databricks追加のビジュアライゼーションを実行するには、PandasAPI にSpark を使用することをお勧めします。.pandas_api()
このPandasAPI を使用すると、 の対応するSparkDataFrame にキャストできます。詳細については、「PandasAPI Spark」を参照してください。
データを保存する
データを変換したら、 DataFrameWriter
メソッドを使用してデータを保存できます。 これらのメソッドの完全な一覧は、 DataFrameWriter にあります。 次のセクションでは、DataFrame をテーブルおよびデータ ファイルのコレクションとして保存する方法を示します。
DataFrame をテーブルとして保存する
DataFrame を Unity Catalog のテーブルとして保存するには、 write.saveAsTable
メソッドを使用し、パスを の形式で指定します <catalog-name>.<schema-name>.<table-name>
.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
DataFrame を CSV 形式で記述します
DataFrame を *.csv
形式に書き込むには、 write.csv
メソッドを使用して、形式とオプションを指定します。 デフォルトでは、指定したパスにデータが存在する場合、書き込み操作は失敗します。 次のいずれかのモードを指定して、別のアクションを実行できます。
overwrite
ターゲットパス内のすべての既存のデータをDataFrameの内容で上書きします。append
DataFrame の内容をターゲット パス内のデータに追加します。ignore
ターゲット・パスにデータが存在する場合、書き込みはサイレントに失敗します。
次の例は、DataFrame の内容を CSV ファイルとしてデータに上書きする方法を示しています。
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
次のステップ
Databricks でより多くの Spark 機能を活用するには、以下を参照してください。