PySpark の基本
この記事では、PySpark の簡単な例で PySpark の使用方法を説明します。 これは、基本的なApache Spark概念を理解し、コンピュートに接続されたDatabricksノートブックでコマンドを実行していることを前提としています。サンプル データを使用して データフレーム を作成し、このデータに対して行操作や列操作などの基本的な変換を実行し、複数の データフレーム を組み合わせてこのデータを集計し、このデータを視覚化して、テーブルまたはファイルに保存します。
データのアップロード
この記事の一部の例では Databricksが提供するサンプル データを使用して、 データフレーム を使用してデータを読み込み、変換、および保存する方法を示します。 まだ Databricks にない独自のデータを使用する場合は、最初にデータをアップロードし、そこから データフレーム を作成できます。 「ファイルのアップロードを使用してテーブルを作成または変更する」と「Unity Catalog ボリュームにファイルをアップロードする」を参照してください。
Databricks のサンプル データについて
Databricks では、 samples カタログと /databricks-datasets ディレクトリにサンプル データが用意されています。
samplesカタログのサンプル・データにアクセスするには、samples.<schema-name>.<table-name>の形式を使用します。この記事では、架空のビジネスのデータを含むsamples.tpchスキーマのテーブルを使用します。customerテーブルには顧客に関する情報が含まれ、ordersにはそれらの顧客による注文に関する情報が含まれます。dbutils.fs.lsを使用して、/databricks-datasetsのデータを探索します。Spark SQL または データフレーム を使用して、ファイル パスを使用してこの場所のデータをクエリします。 Databricks が提供するサンプル データの詳細については、「 サンプル データセット」を参照してください。
データ・タイプのインポート
多くの PySpark 操作では、SQL 関数を使用するか、ネイティブの Spark タイプと対話する必要があります。必要な関数と型のみを直接インポートするか、Python 組み込み関数のオーバーライドを避けるために、共通のエイリアスを使用してこれらのモジュールをインポートします。
# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round
# import modules using an alias
import pyspark.sql.types as T
import pyspark.sql.functions as F
データ型の包括的な一覧については、「 Spark データ型」を参照してください。
PySpark SQL 関数の包括的な一覧については、「 Spark 関数」を参照してください。
データフレーム の作成
データフレーム を作成するには、いくつかの方法があります。 通常、 データフレーム は、テーブルやファイルのコレクションなどのデータソースに対して定義します。 次に、「Apache Spark の基本概念」セクションで説明されているように、 displayなどのアクションを使用して、実行する変換をトリガーします。 displayメソッドは データフレームを出力します。
指定した値で データフレーム を作成する
指定された値を持つ データフレーム を作成するには、 createDataFrame メソッドを使用します。ここで、行はタプルのリストとして表されます。
df_children = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = ['name', 'age'])
display(df_children)
出力では、 df_children の列のデータ型が自動的に推論されることに注意してください。 または、スキーマを追加してタイプを指定することもできます。 スキーマは、名前、データ型、およびnull値が含まれているかどうかを示すブールフラグを指定するStructFieldsで構成されるStructTypeを使用して定義されます。データ型は pyspark.sql.typesからインポートする必要があります。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
df_children_with_schema = spark.createDataFrame(
data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
schema = StructType([
StructField('name', StringType(), True),
StructField('age', IntegerType(), True)
])
)
display(df_children_with_schema)
Unity Catalog のテーブルから データフレーム を作成する
Unity Catalog のテーブルから データフレーム を作成するには、 table メソッドを使用してテーブルを <catalog-name>.<schema-name>.<table-name>形式で識別します。 左側のナビゲーションバーにある 「カタログ 」をクリックして、 カタログエクスプローラー を使用してテーブルに移動します。 それをクリックし、[ テーブル パスのコピー ] を選択して、テーブル パスをノートブックに挿入します。
次の例では、テーブル samples.tpch.customerをロードしますが、独自のテーブルへのパスを指定することもできます。
df_customer = spark.table('samples.tpch.customer')
display(df_customer)
アップロードしたファイルから データフレーム を作成する
Unity Catalog ボリュームにアップロードしたファイルから データフレーム を作成するには、 read プロパティを使用します。 このメソッドは、 DataFrameReaderを返し、これを使用して適切な形式を読み取ることができます。 左側の小さなサイドバーにあるカタログオプションをクリックし、カタログブラウザを使用してファイルを見つけます。 それを選択し、[ ボリューム ファイル パスのコピー ] をクリックします。
以下の例は *.csv ファイルから読み取りますが、 DataFrameReader は他の多くの形式でのファイルのアップロードをサポートしています。 データフレームReader メソッドを参照してください。
# Assign this variable your full volume file path
volume_file_path = ""
df_csv = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load(volume_file_path)
)
display(df_csv)
Unity Catalog ボリュームの詳細については、「Unity Catalog ボリュームとは」を参照してください。
JSON 応答から データフレーム を作成する
REST API によって返された JSON 応答ペイロードから DataFrame を作成するには、Python requests パッケージを使用して応答のクエリと解析を行います。パッケージを使用するには、パッケージをインポートする必要があります。この例では、米国食品医薬品局(FDA)の医薬品アプリケーションデータベースのデータを使用します。
import requests
# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)
# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)
Databricksで JSONおよびその他の半構造化データの操作に関する情報については、「 半構造化データのモデル化」を参照してください。
JSON フィールドまたはオブジェクトを選択します
変換された JSON から特定のフィールドまたはオブジェクトを選択するには、 [] 表記を使用します。 たとえば、それ自体が製品の配列である products フィールドを選択するには、次のようにします。
display(df_drugs.select(df_drugs["products"]))
また、メソッド呼び出しをチェーンして、複数のフィールドを走査することもできます。 たとえば、医薬品アプリケーションの最初の製品のブランド名を出力するには、次のようにします。
display(df_drugs.select(df_drugs["products"][0]["brand_name"]))
ファイルからの データフレーム の作成
ファイルから データフレーム を作成することを示すために、この例では CSV データを /databricks-datasets ディレクトリに読み込みます。
サンプル データセットに移動するには、 Databricks Utilties ファイル システム コマンドを使用できます。 次の例では、 dbutils を使用して、 /databricks-datasetsで使用可能なデータセットを一覧表示します。
display(dbutils.fs.ls('/databricks-datasets'))
または、次の例に示すように、 %fs を使用して Databricks CLI ファイル システム コマンドにアクセスすることもできます。
%fs ls '/databricks-datasets'
ファイルまたはファイルのディレクトリから データフレーム を作成するには、 load メソッドでパスを指定します。
df_population = (spark.read
.format("csv")
.option("header", True)
.option("inferSchema", True)
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)
データフレーム によるデータの変換
データフレーム を使用すると、データの並べ替え、フィルター処理、集計を行う組み込みのメソッドを使用して、データを簡単に変換できます。 多くの変換は データフレームのメソッドとして指定されていませんが、代わりに spark.sql.functions パッケージで提供されます。 Databricks Spark SQL 関数を参照してください。
列の操作
Spark には、多くの基本的な列操作が用意されています。
データフレーム のすべての列を出力するには、 columnsを使用します (例: df_customer.columns)。
列の選択
select と colを使用して特定の列を選択できます。col 関数は pyspark.sql.functions サブモジュールにあります。
from pyspark.sql.functions import col
df_customer.select(
col("c_custkey"),
col("c_acctbal")
)
また、文字列として定義された式を受け取る expr を使用して列を参照することもできます。
from pyspark.sql.functions import expr
df_customer.select(
expr("c_custkey"),
expr("c_acctbal")
)
また、SQL 式を受け入れる selectExprを使用することもできます。
df_customer.selectExpr(
"c_custkey as key",
"round(c_acctbal) as account_rounded"
)
文字列リテラルを使用して列を選択するには、次の操作を行います。
df_customer.select(
"c_custkey",
"c_acctbal"
)
特定の データフレーム から列を明示的に選択するには、 [] 演算子または . 演算子を使用できます。 ( . 演算子を使用して、整数で始まる列、またはスペースや特殊文字を含む列を選択することはできません。 これは、一部の列が同じ名前を持つ データフレーム を結合する場合に特に役立ちます。
df_customer.select(
df_customer["c_custkey"],
df_customer["c_acctbal"]
)
df_customer.select(
df_customer.c_custkey,
df_customer.c_acctbal
)
列の作成
新しい列を作成するには、 withColumn 方法を使用します。 次の例では、顧客アカウント残高 c_acctbal が 1000を超えているかどうかに基づいて、ブール値を含む新しい列を作成します。
df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)
列の名前を変更する
列の名前を変更するには、既存の列名と新しい列名を受け付ける withColumnRenamed メソッドを使います。
df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")
alias方法は、集計の一部として列の名前を変更する場合に特に役立ちます。
from pyspark.sql.functions import avg
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)
display(df_segment_balance)
キャスト列のタイプ
場合によっては、データフレーム の 1 つ以上の列のデータ型を変更したい場合があります。 これを行うには、 cast メソッドを使用して列データ型間で変換します。 次の例は、 col メソッドを使用して列を参照し、列を整数型から文字列型に変換する方法を示しています。
from pyspark.sql.functions import col
df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))
列の削除
列を削除するには、選択または select(*) except 中に列を省略するか、 drop 方法を使用できます。
df_customer_flag_renamed.drop("balance_flag_renamed")
一度に複数の列を削除することもできます。
df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")
行操作
Spark には、多くの基本的な行操作が用意されています。
行をフィルタリングする
行をフィルター処理するには、データフレーム で filter メソッドまたは where メソッドを使用して、特定の行のみを返します。 フィルター処理する列を識別するには、 col メソッドまたは列に評価される式を使用します。
from pyspark.sql.functions import col
df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)
複数の条件でフィルタリングするには、論理演算子を使用します。 たとえば、 & と | を使用すると、それぞれ条件を AND および OR できます。 次の例では、 c_nationkey が 20 に等しく、 c_acctbal が 1000より大きい行をフィルタリングします。
df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))
重複する行を削除する
行の重複を排除するには、 distinctを使用して、一意の行のみを返します。
df_unique = df_customer.distinct()
null 値を処理する
null 値を処理するには、 na.drop メソッドを使用して null 値を含む行を削除します。 このメソッドでは、 any null 値を含む行を削除するか、null 値 all 行を削除するかを指定できます。
null 値を削除するには、次のいずれかの例を使用します。
df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")
代わりに、すべての null 値を含む行のみをフィルターで除外する場合は、次を使用します。
df_customer_no_nulls = df_customer.na.drop("all")
これを適用するには、次に示すように、これを指定します。
df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])
欠損値を埋めるには、 fill 方法を使用します。 これをすべての列に適用するか、列のサブセットに適用するかを選択できます。 次の例では、アカウント残高 c_acctbal にnull値を持つアカウント残高には 0が入力されています。
df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])
文字列を他の値に置き換えるには、 replace メソッドを使用します。 次の例では、空のアドレス文字列が UNKNOWNという単語に置き換えられています。
df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])
行の追加
行を追加するには、 union メソッドを使用して新しいデータフレームを作成する必要があります。 次の例では、以前に作成した データフレーム df_that_one_customer と df_filtered_customer が組み合わされ、3 人の顧客を含む データフレーム が返されます。
df_appended_rows = df_that_one_customer.union(df_filtered_customer)
display(df_appended_rows)
また、 DataFrames をテーブルに書き込んでから新しい行を追加することで、を組み合わせることもできます。 本番運用ワークロードの場合、データソースをターゲットテーブルに増分処理すると、データのサイズが大きくなるにつれてレイテンシーとコンピュートのコストを大幅に削減できます。 Lakeflowコネクトの標準コネクタを参照してください。
行をソートする
並べ替えは大規模にコストがかかる場合があり、並べ替えられたデータを格納し、Spark でデータを再読み込みすると、順序は保証されません。 仕分けの使用に意図的であることを確認してください。
1 つ以上の列で行を並べ替えるには、 sort または orderBy の方法を使用します。 デフォルトでは、これらのメソッドは昇順でソートされます。
df_customer.orderBy(col("c_acctbal"))
降順でフィルタリングするには、次の descを使用します。
df_customer.sort(col("c_custkey").desc())
次の例は、2 つの列で並べ替える方法を示しています。
df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())
データフレーム が並べ替えられた後に返される行数を制限するには、 limit メソッドを使用します。 次の例では、上位 10 の結果のみを表示します。
display(df_sorted.limit(10))
データフレームのJOIN
2 つ以上の データフレームを結合するには、join メソッドを使用します。 データフレームの結合方法は、how (結合タイプ) パラメーターと on (結合の基になる列) パラメーターで指定できます。一般的な結合タイプには、次のものがあります。
inner: これは結合タイプ デフォルトであり、 全体でデータフレームonパラメーターに一致する行のみを保持するデータフレーム を返します。left: 最初に指定した データフレーム のすべての行と、2 番目に指定した データフレーム の行のうち、最初の データフレーム と一致する行のみが保持されます。outer: 外部ジョインは、一致に関係なく、両方の データフレーム のすべてのローを保持します。
結合の詳細については、「Databricksでの結合の操作」を参照してください。PySpark でサポートされている結合の一覧については、データフレームのJOINを参照してください。
次の例では、 orders データフレーム の各行が customers データフレーム の対応する行と結合されている 1 つの データフレーム を返します。 内部結合は、すべての注文が 1 人の顧客に対応することが想定されるため、使用されます。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_joined = df_order.join(
df_customer,
on = df_order["o_custkey"] == df_customer["c_custkey"],
how = "inner"
)
display(df_joined)
複数の条件で結合するには、 & や | などのブール演算子を使用して、それぞれ AND と ORを指定します。 次の例では、 o_totalprice が 500,000より大きい行のみにフィルタする条件を追加します。
df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')
df_complex_joined = df_order.join(
df_customer,
on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
how = "inner"
)
display(df_complex_joined)
データの集計
データフレーム でデータを集計するには、SQL の GROUP BY と同様に、 groupBy メソッドを使用してグループ化する列を指定し、 agg メソッドを使用して集計を指定します。 avg、 sum、 max、 min などの一般的な集計を pyspark.sql.functionsからインポートします。次の例は、市場セグメント別の平均顧客残高を示しています。
from pyspark.sql.functions import avg
# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_balance)
from pyspark.sql.functions import avg
# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
avg(df_customer["c_acctbal"])
)
display(df_segment_nation_balance)
一部の集計はアクションであり、計算をトリガーします。 この場合、結果を出力するために他のアクションを使用する必要はありません。
データフレーム の行をカウントするには、 count メソッドを使用します。
df_customer.count()
呼び出しのチェーン
データフレーム を変換するメソッドは データフレームを返し、アクションが呼び出されるまで Spark は変換に対して動作しません。この 遅延評価 は、利便性と読みやすさのために複数のメソッドをチェーンできることを意味します。 次の例は、フィルタリング、集計、および順序付けをチェーンする方法を示しています。
from pyspark.sql.functions import count
df_chained = (
df_order.filter(col("o_orderstatus") == "F")
.groupBy(col("o_orderpriority"))
.agg(count(col("o_orderkey")).alias("n_orders"))
.sort(col("n_orders").desc())
)
display(df_chained)
データフレーム を視覚化する
ノートブックで DataFrame を視覚化するには、DataFrame の左上にあるテーブルの横にある [+ ] 記号をクリックし、[ 視覚化 ] を選択して、DataFrame に基づいて 1 つ以上のグラフを追加します。視覚化の詳細については、「 Databricks ノートブックと SQL エディターでの視覚化」を参照してください。
display(df_order)
Databricks追加のビジュアライゼーションを実行するには、Pandas API on Spark を使用することをお勧めします。.pandas_api()を用いることで、Sparkデータフレームを対応するPandas APIにキャストすることができます。詳細については、「Pandas API on Spark」を参照してください。
データを保存する
データを変換したら、 DataFrameWriter メソッドを使用してデータを保存できます。 これらのメソッドの完全な一覧は、 データフレームWriter にあります。 次のセクションでは、データフレーム をテーブルおよびデータ ファイルのコレクションとして保存する方法を示します。
データフレーム をテーブルとして保存する
データフレーム を Unity Catalog のテーブルとして保存するには、 write.saveAsTable メソッドを使用し、パスを の形式で指定します <catalog-name>.<schema-name>.<table-name>.
df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")
データフレーム を CSV 形式で記述します
データフレーム を *.csv 形式に書き込むには、 write.csv メソッドを使用して、形式とオプションを指定します。 デフォルトでは、指定したパスにデータが存在する場合、書き込み操作は失敗します。 次のいずれかのモードを指定して、別のアクションを実行できます。
overwriteターゲットパス内のすべての既存のデータをデータフレームの内容で上書きします。appendデータフレーム の内容をターゲット パス内のデータに追加します。ignoreターゲット・パスにデータが存在する場合、書き込みはサイレントに失敗します。
次の例は、データフレーム の内容を CSV ファイルとしてデータに上書きする方法を示しています。
# Assign this variable your file path
file_path = ""
(df_joined.write
.format("csv")
.mode("overwrite")
.write(file_path)
)
次のステップ
Databricks でより多くの Spark 機能を活用するには、以下を参照してください。