PySparkの基礎

この記事では、PySpark の使用方法を説明するために簡単な例を紹介します。 基本的なApache Spark 概念 を理解しており、Databricks コンピュート に接続された でコマンドを実行していることを前提としています。サンプル データを使用してDataFramesを作成し、このデータに対して行と列の操作を含む基本的な変換を実行し、複数のDataFramesを結合してこのデータを集計し、このデータを視覚化して、テーブルまたはファイルに保存します。

データのアップロード

この記事の一部の例では、Databricks が提供するサンプル データを使用して、DataFrames を使用してデータを読み込み、変換、保存する方法を示します。 Databricks にまだ存在しない独自のデータを使用する場合は、まずそのデータをアップロードし、そこから DataFrame を作成できます。 「ファイルのアップロードを使用してテーブルを作成または変更する」および 「 Unity Catalogボリュームにファイルをアップロードする」を参照してください。

Databricksサンプルデータについて

Databricks は、 samplesカタログと/databricks-datasetsディレクトリにサンプル データを提供します。

  • samplesカタログのサンプル・データにアクセスするには、samples.<schema-name>.<table-name>という形式を使用します。この記事では、架空のビジネスのデータを含む samples.tpch スキーマのテーブルを使用します。 customerテーブルには顧客に関する情報が含まれており、 ordersそれらの顧客による注文に関する情報が含まれています。

  • dbutils.fs.ls を使用して、/databricks-datasets内のデータを探索します。ファイル パスを使用してこの場所のデータをクエリするには、Spark SQL または DataFrames を使用します。 Databricks が提供するサンプル データの詳細については、 「サンプル データセット」を参照してください。

データ型のインポート

多くの PySpark 操作では、SQL 関数を使用するか、ネイティブの Spark タイプと対話する必要があります。 必要な関数と型のみを直接インポートすることも、モジュール全体をインポートすることもできます。

# import all
from pyspark.sql.types import *
from pyspark.sql.functions import *

# import select functions and types
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import floor, round

インポートされた関数の中には Python の組み込み関数をオーバーライドするものがある可能性があるため、一部のユーザーはエイリアスを使用してこれらのモジュールをインポートすることを選択します。 次の例は、Apache Spark コード例で使用される一般的なエイリアスを示しています。

import pyspark.sql.types as T
import pyspark.sql.functions as F

データ型の包括的なリストについては、 「Spark データ型」を参照してください。

PySpark SQL 関数の包括的なリストについては、 「Spark 関数」を参照してください。

DataFrameを作成する

DataFrame を作成する方法はいくつかあります。 通常、テーブルやファイルのコレクションなどのデータソースに対してDataFrameを定義します。 次に、Apache Spark の基本概念のセクションで説明されているように、 displayなどのアクションを使用して、変換の実行をトリガーします。 display メソッドはDataFramesを出力します。

指定された値を持つDataFrameを作成する

指定された値を持つ DataFrame を作成するには、行がタプルのリストとして表現されるcreateDataFrameメソッドを使用します。

df_children = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = ['name', 'age'])
display(df_children)

出力では、 df_children の列のデータ型が自動的に推論されることに注意してください。 または、スキーマを追加してタイプを指定することもできます。 スキーマは、名前、データ型、および null 値が含まれているかどうかを示すBooleanフラグを指定する StructFields で構成される StructType を使用して定義されます。 データ型は pyspark.sql.typesからインポートする必要があります。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

df_children_with_schema = spark.createDataFrame(
  data = [("Mikhail", 15), ("Zaky", 13), ("Zoya", 8)],
  schema = StructType([
    StructField('name', StringType(), True),
    StructField('age', IntegerType(), True)
  ])
)
display(df_children_with_schema)

Unity Catalog のテーブルから DataFrame を作成する

Unity Catalog 内のテーブルから DataFrame を作成するには、形式<catalog-name>.<schema-name>.<table-name>を使用してテーブルを識別するtableメソッドを使用します。 左側のナビゲーションバーの「 カタログ」(Catalog ) をクリックし、「 カタログエクスプローラ」(Catalog Explorer ) を使用してテーブルに移動します。 それをクリックし、 「テーブル パスのコピー」を選択して、テーブル パスをノートブックに挿入します。

次の例では、テーブル samples.tpch.customerをロードしますが、独自のテーブルへのパスを指定することもできます。

df_customer = spark.table('samples.tpch.customer')
display(df_customer)

アップロードされたファイルからDataFrameを作成する

Unity Catalog ボリュームにアップロードしたファイルから DataFrame を作成するには、 readプロパティを使用します。 このメソッドは、 DataFrameReaderを返し、これを使用して適切な形式を読み取ることができます。 左側の小さなサイドバーにあるカタログオプションをクリックし、カタログブラウザを使用してファイルを見つけます。 それを選択し、[ ボリュームファイルパスのコピー]をクリックします。

以下の例は *.csv ファイルから読み取りますが、 DataFrameReader は他の多くの形式でのファイルのアップロードをサポートしています。 「DataFrameReader メソッド」を参照してください。

# Assign this variable your full volume file path
volume_file_path = ""

df_csv = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load(volume_file_path)
)
display(df_csv)

Unity Catalogボリュームの詳細については、 Unity Catalogボリュームとは何ですか?」を参照してください。

JSONレスポンスからDataFrameを作成する

REST API によって返される JSON 応答ペイロードから DataFrame を作成するには、Python requestsパッケージを使用して応答をクエリおよび解析します。 使用するには、パッケージをインポートする必要があります。 この例では、米国食品医薬品局(FDA)の医薬品申請データベースのデータを使用します。

import requests

# Download data from URL
url = "https://api.fda.gov/drug/drugsfda.json?limit=100"
response = requests.get(url)

# Create the DataFrame
df_drugs = spark.createDataFrame(response.json()["results"])
display(df_drugs)

で JSONやその他の半構造化データを操作する方法については、Databricks 「半構造化データのモデル化」 を参照してください。

JSONフィールドまたはオブジェクトを選択

変換された JSON から特定のフィールドまたはオブジェクトを選択するには、 []表記を使用します。 たとえば、製品の配列であるproductsフィールドを選択するには、次のようにします。

display(df_drugs.select(df_drugs["products"]))

また、メソッド呼び出しを連結して、複数のフィールドをトラバースすることもできます。 たとえば、医薬品申請の最初の製品のブランド名を出力するには、次のようにします。

display(df_drugs.select(df_drugs["products"][0]["brand_name"]))

ファイルからDataFrameを作成する

ファイルから DataFrame を作成する方法を示すために、この例では、 /databricks-datasetsディレクトリに CSV データを読み込みます。

サンプル データセットに移動するには、 Databricks Utiltiesファイル システム コマンドを使用できます。 次の例では、 dbutilsを使用して/databricks-datasetsで使用可能なデータセットを一覧表示します。

display(dbutils.fs.ls('/databricks-datasets'))

または、次の例に示すように、 %fsを使用してDatabricks CLI ファイル システム コマンドにアクセスすることもできます。

%fs ls '/databricks-datasets'

ファイルまたはファイルのディレクトリから DataFrame を作成するには、 loadメソッドでパスを指定します。

df_population = (spark.read
  .format("csv")
  .option("header", True)
  .option("inferSchema", True)
  .load("/databricks-datasets/samples/population-vs-price/data_geo.csv")
)
display(df_population)

DataFramesでデータを変換する

DataFrames を使用すると、組み込みメソッドを使用してデータの並べ替え、フィルタリング、集計を行うことで、データを簡単に変換できます。 多くの変換は DataFrames のメソッドとして指定されていませんが、代わりにspark.sql.functionsパッケージで提供されています。 Databricks Spark SQL 関数を参照してください。

列操作

Spark は多くの基本的な列操作を提供します。

ヒント

DataFrame 内のすべての列を出力するには、 columnsを使用します (例: df_customer.columns

列の選択

selectcolを使用して特定の列を選択できます。col 関数は pyspark.sql.functions サブモジュールにあります。

from pyspark.sql.functions import col

df_customer.select(
  col("c_custkey"),
  col("c_acctbal")
)

文字列として定義された式を受け取るexprを使用して列を参照することもできます。

from pyspark.sql.functions import expr

df_customer.select(
  expr("c_custkey"),
  expr("c_acctbal")
)

SQL 式を受け入れるselectExprを使用することもできます。

df_customer.selectExpr(
  "c_custkey as key",
  "round(c_acctbal) as account_rounded"
)

文字列リテラルを使用して列を選択するには、次の手順を実行します。

df_customer.select(
  "c_custkey",
  "c_acctbal"
)

特定の DataFrame から列を明示的に選択するには、 []演算子または.演算子を使用できます。 ( . 演算子を使用して、整数で始まる列、またはスペースまたは特殊文字を含む列を選択することはできません。 これは、一部の列の名前が同じであるDataFramesを結合する場合に特に役立ちます。

df_customer.select(
  df_customer["c_custkey"],
  df_customer["c_acctbal"]
)
df_customer.select(
  df_customer.c_custkey,
  df_customer.c_acctbal
)

列の作成

新しい列を作成するには、 withColumn メソッドを使用します。 次の例では、顧客アカウント残高 c_acctbal1000 を超えているかどうかに基づいてBoolean値を含む新しい列を作成します。

df_customer_flag = df_customer.withColumn("balance_flag", col("c_acctbal") > 1000)

列の名前を変更する

列の名前を変更するには、既存の列名と新しい列名を受け入れる withColumnRenamed メソッドを使用します。

df_customer_flag_renamed = df_customer_flag.withColumnRenamed("balance_flag", "balance_flag_renamed")

alias メソッドは、集計の一部として列の名前を変更する場合に特に役立ちます。

from pyspark.sql.functions import avg

df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"]).alias("avg_account_balance")
)

display(df_segment_balance)

キャスト列の型

場合によっては、DataFrame 内の 1 つ以上の列のデータ型を変更する必要があることがあります。 これを行うには、 cast メソッドを使用して列のデータ型を変換します。 次の例は、 colメソッドを使用して列を参照し、列を整数型から文字列型に変換する方法を示しています。

from pyspark.sql.functions import col

df_casted = df_customer.withColumn("c_custkey", col("c_custkey").cast(StringType()))
print(type(df_casted))

列の削除

列を削除するには、選択または select(*) except 中に列を省略するか、 drop メソッドを使用します。

df_customer_flag_renamed.drop("balance_flag_renamed")

複数の列を一度に削除することもできます。

df_customer_flag_renamed.drop("c_phone", "balance_flag_renamed")

行操作

Spark は多くの基本的な行操作を提供します。

行のフィルター

行をフィルタリングするには、DataFrame でfilterまたはwhereメソッドを使用して、特定の行のみを返します。 フィルター処理する列を指定するには、 col メソッドまたは列に評価される式を使用します。

from pyspark.sql.functions import col

df_that_one_customer = df_customer.filter(col("c_custkey") == 412449)

複数の条件でフィルター処理するには、論理演算子を使用します。 たとえば、 &| では、それぞれ条件 ANDOR できます。 次の例では、 c_nationkey20 で、 c_acctbal1000より大きい行をフィルター処理します。

df_customer.filter((col("c_nationkey") == 20) & (col("c_acctbal") > 1000))
df_filtered_customer = df_customer.filter((col("c_custkey") == 412446) | (col("c_custkey") == 412447))

重複する行を削除する

行の重複を排除するには、一意の行のみを返す distinctを使用します。

df_unique = df_customer.distinct()

null 値を処理する

NULL 値を処理するには、 na.drop メソッドを使用して NULL 値を含む行を削除します。 この方法では、 any NULL 値を含む行をドロップするか、 all NULL 値を含む行をドロップするかを指定できます。

null 値を削除するには、次のいずれかの例を使用します。

df_customer_no_nulls = df_customer.na.drop()
df_customer_no_nulls = df_customer.na.drop("any")

代わりに、すべて null 値を含む行のみを除外する場合は、以下を使用します。

df_customer_no_nulls = df_customer.na.drop("all")

次に示すように、これを指定することで、列のサブセットにこれを適用できます。

df_customer_no_nulls = df_customer.na.drop("all", subset=["c_acctbal", "c_custkey"])

欠損値を埋めるには、 fill メソッドを使用します。 これをすべての列に適用するか、列のサブセットに適用するかを選択できます。 以下の例では、アカウント残高c_acctbalが null 値であるアカウント残高は0で埋められます。

df_customer_filled = df_customer.na.fill("0", subset=["c_acctbal"])

文字列を他の値に置き換えるには、 replaceメソッドを使用します。 以下の例では、空のアドレス文字列は単語UNKNOWNに置き換えられます。

df_customer_phone_filled = df_customer.na.replace([""], ["UNKNOWN"], subset=["c_phone"])

行の追加

行を追加するには、 unionメソッドを使用して新しい DataFrame を作成する必要があります。 次の例では、以前に作成した DataFrame df_that_one_customerdf_filtered_customerが結合され、3 人の顧客を含む DataFrame が返されます。

df_appended_rows = df_that_one_customer.union(df_filtered_customer)

display(df_appended_rows)

注:

DataFrame をテーブルに書き込んでから新しい行を追加することで、 DataFramesを結合することもできます。 本番運用ワークロードの場合、データソースをターゲット テーブルに増分処理することで、データのサイズが増大してもレイテンシとコンピュート コストを大幅に削減できます。 「Databricks レイクハウスへのデータの取り込み」を参照してください。

行を並べ替える

重要

大規模なソートはコストがかかる可能性があり、ソートされたデータを保存して Spark でデータを再ロードすると、順序が保証されません。 並べ替えの使用は意図的に行ってください。

1 つ以上の列で行を並べ替えるには、 sort または orderBy メソッドを使用します。 デフォルトでは、これらのメソッドは昇順で並べ替えられます。

df_customer.orderBy(col("c_acctbal"))

降順でフィルタリングするには、 descを使用します。

df_customer.sort(col("c_custkey").desc())

次の例は、2 つの列で並べ替える方法を示しています。

df_sorted = df_customer.orderBy(col("c_acctbal").desc(), col("c_custkey").asc())
df_sorted = df_customer.sort(col("c_acctbal").desc(), col("c_custkey").asc())

DataFrame がソートされた後に返される行数を制限するには、 limitメソッドを使用します。 次の例では、上位 10 の結果のみを表示します。

display(df_sorted.limit(10))

DataFramesを結合する

2 つ以上のDataFramesを結合するには、join メソッドを使用します。 how (結合タイプ) と on (結合の基準となる列) 引数で、 DataFramesを結合する方法を指定できます。 一般的な結合タイプには、次のものがあります。

  • inner: これは結合タイプ デフォルト であり、DataFrame 全体で 引数に一致する行のみを保持するon DataFramesを返します。

  • left: 最初に指定した DataFrame のすべての行と、最初に指定した DataFrame と一致する 2 番目に指定した DataFrame の行のみが保持されます。

  • outer: 外部結合では、一致に関係なく、両方のDataFramesのすべての行が保持されます。

結合の詳細については、 Databricksでの結合の操作」を参照してください。 PySpark でサポートされている結合の一覧については、 「DataFrame 結合」を参照してください。

次の例では、 orders DataFrame の各行がcustomers DataFrame の対応する行と結合された単一の DataFrame を返します。 すべての注文が 1 人の顧客に対応することが予想されるため、内部結合が使用されます。

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_joined = df_order.join(
  df_customer,
  on = df_order["o_custkey"] == df_customer["c_custkey"],
  how = "inner"
)

display(df_joined)

複数の条件で結合するには、&| などのBoolean演算子を使用して、それぞれ ANDOR を指定します。 次の例では、条件を追加し、 o_totalprice500,000より大きい行のみにフィルター処理します。

df_customer = spark.table('samples.tpch.customer')
df_order = spark.table('samples.tpch.orders')

df_complex_joined = df_order.join(
  df_customer,
  on = ((df_order["o_custkey"] == df_customer["c_custkey"]) & (df_order["o_totalprice"] > 500000)),
  how = "inner"
)

display(df_complex_joined)

データの集計

DataFrame 内のデータを集計するには、SQL のGROUP BYと同様に、 groupByメソッドを使用してグループ化する列を指定し、 aggメソッドを使用して集計を指定します。 avgsummaxmin などの一般的な集計を pyspark.sql.functionsからインポートします。次の例は、市場セグメント別の平均顧客残高を示しています。

from pyspark.sql.functions import avg

# group by one column
df_segment_balance = df_customer.groupBy("c_mktsegment").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_balance)
from pyspark.sql.functions import avg

# group by two columns
df_segment_nation_balance = df_customer.groupBy("c_mktsegment", "c_nationkey").agg(
    avg(df_customer["c_acctbal"])
)

display(df_segment_nation_balance)

一部の集計はアクションであり、計算をトリガーします。 この場合、結果を出力するために他のアクションを使用する必要はありません。

DataFrame 内の行をカウントするには、 countメソッドを使用します。

df_customer.count()

呼び出しの連鎖

DataFrames を変換するメソッドは DataFrames を返しますが、Spark はアクションが呼び出されるまで変換を実行しません。 この 遅延評価 は、利便性と読みやすさのために複数のメソッドを連鎖させることができることを意味します。 次の例は、フィルター処理、集計、および順序付けを連鎖させる方法を示しています。

from pyspark.sql.functions import count

df_chained = (
    df_order.filter(col("o_orderstatus") == "F")
    .groupBy(col("o_orderpriority"))
    .agg(count(col("o_orderkey")).alias("n_orders"))
    .sort(col("n_orders").desc())
)

display(df_chained)

DataFrameを視覚化する

ノートブックで DataFrame を視覚化するには、DataFrame の左上にあるテーブルの横にある+記号をクリックし、 [視覚化]を選択して、DataFrame に基づいて 1 つ以上のグラフを追加します。 視覚化の詳細については、 「Databricks ノートブックの視覚化」を参照してください。

display(df_order)

追加の視覚化を実行するには、Databricks PandasAPI用のSpark の使用を推奨しています。.pandas_api()PandasAPIを使用すると、 の対応するSparkDataFrame にキャストできます。詳細については、PandasAPI 上のSpark を参照してください。

データを保存する

データを変換したら、 DataFrameWriter 方法を使用してデータを保存できます。 これらのメソッドの完全な一覧は、 DataFrameWriter にあります。 次のセクションでは、DataFrame をテーブルおよびデータ ファイルのコレクションとして保存する方法を示します。

DataFrameをテーブルとして保存する

DataFrame を Unity Catalog のテーブルとして保存するには、 write.saveAsTableメソッドを使用し、 <catalog-name>.<schema-name>.<table-name>形式でパスを指定します。

df_joined.write.saveAsTable(f"{catalog_name}.{schema_name}.{table_name}")

DataFrameをCSVとして書き込む

DataFrame を*.csv形式で書き込むには、形式とオプションを指定してwrite.csvメソッドを使用します。 デフォルトでは、指定されたパスにデータが存在する場合、書き込み操作は失敗します。 次のいずれかのモードを指定して、別のアクションを実行できます。

  • overwrite ターゲット パス内の既存のデータをすべて DataFrame の内容で上書きします。

  • append DataFrame の内容をターゲット パスのデータに追加します。

  • ignore ターゲット・パスにデータが存在する場合、書き込みは暗黙のうちに失敗します。

次の例は、DataFrame の内容を CSV ファイルとして上書きする方法を示しています。

# Assign this variable your file path
file_path = ""

(df_joined.write
  .format("csv")
  .mode("overwrite")
  .write(file_path)
)

次のステップ

Databricks でさらに多くの Spark 機能を活用するには、以下を参照してください。