メインコンテンツまでスキップ

チュートリアル: Apache Spark データフレーム を使用してデータを読み込んで変換する

このチュートリアルでは、Databricks で Apache Spark Python (PySpark) データフレーム API、Apache Spark Scala データフレーム API、および SparkR Sparkデータフレーム API を使用してデータを読み込んで変換する方法を示します。

注記

Databricks無料版をご利用の場合は、このチュートリアルのすべてのコード例について 「Python」 タブを選択してください。無料版はR言語およびScala言語をサポートしていません。さらに、無料版ではインターネットへの外部アクセスが制限されているため、コードを使用してダウンロードするのではなく、ワークスペースのUIを使用してCSVファイルをアップロードする必要があります。詳細な手順についてはステップ 1 を参照してください。

このチュートリアルの最後には、データフレームとは何かを理解し、次のタスクに慣れることができます:

データフレームとは何ですか?

データフレームは、異なるタイプの列を持ちえる2次元のラベル付きデータ構造です。データフレームは、スプレッドシート、SQLテーブル、または複数のオブジェクトから成る辞書のようなものと考えることができます。Apache Sparkデータフレームは、一般的なデータ分析の問題を効率的に解決できる関数(列の選択、絞り込み、結合、集計)を多数提供します。

Apache Sparkデータフレームは、Resilient Distributed Datasets(RDD)の上に構築された抽象化です。SparkデータフレームとSpark SQLでは、統合された計画および最適化エンジンが使用されているため、Databricksでサポートされているすべての言語(Python、SQL、Scala、およびR)でほぼ同じパフォーマンスを得ることができます。

必要条件

以下のチュートリアルを完了するには、以下の条件を満たす必要があります:

  • このチュートリアルの例を使用するには、ワークスペースでUnity Catalog有効になっている必要があります。 Databricks Free Edition および無料トライアル ワークスペースでは、当然にUnity Catalog有効になっています。

  • このチュートリアルの例ではUnity Catalogボリュームを使用してサンプル データを保存します。 これらの例を使用するには、ボリュームを作成し、そのボリュームのカタログ名、スキーマ名、およびボリューム名を使用して、例で使用されるボリュームパスを設定します。無料版ユーザーは、デフォルトでワークスペースカタログとdefaultスキーマにアクセスできます。

  • Unity Catalog には、次のアクセス許可が必要です。

    • READ VOLUME このチュートリアルで使用するボリュームはWRITE VOLUMEです
    • USE SCHEMA このチュートリアルで使用するスキーマ
    • USE CATALOG このチュートリアルで使用したカタログ

    これらの権限を設定するには、 Databricks管理者またはUnity Catalog権限とセキュリティ保護可能なオブジェクトを参照してください。 無料版ユーザーは、ワークスペースカタログとdefaultスキーマに対して、デフォルトでこれらの権限を持っています。

ヒント

この記事の完成したノートブックについては、データフレーム チュートリアル ノートブックを参照してください。

ステップ1:変数を定義してCSVファイルをロードする

このステップでは、このチュートリアルで使用する変数を定義し、赤ちゃんの名前データを含むCSVファイルをhealth.data.ny.govからUnity Catalogボリュームにロードします。 Unity Catalogのカタログ名、スキーマ名、ボリューム名が必要です。

ヒント

カタログ名とスキーマ名がわからない場合は、こちらをクリックしてください。データアイコン。 カタログは サイドバーにあります。ワークスペースカタログはワークスペースと同じ名前で、カタログパネルに表示されます。展開すると、利用可能なスキーマが表示されます。無料版および無料トライアルのユーザーは、ワークスペース カタログとdefaultスキーマを使用できます。

ボリュームがない場合は、ノートブックのセルで次のコマンドを実行してボリュームを作成してください( <catalog_name><schema_name>実際の値に置き換えてください)。

SQL
CREATE VOLUME IF NOT EXISTS <catalog_name>.<schema_name>.my_volume
  1. 新しいノートブックを開くには、 新しいアイコン アイコンをクリックします。 ノートブック Databricks ナビゲートする方法については、「 ノートブックの外観をカスタマイズする」を参照してください。

  2. 以下のコードをコピーして、新しい空のノートブックセルに貼り付けてください。<catalog-name><schema-name><volume-name> Unity Catalogボリュームのカタログ名、スキーマ名、ボリューム名に置き換えてください。 <table_name>任意のテーブル名に置き換えてください。このチュートリアルでは後ほど、赤ちゃんの名前データをこのテーブルに読み込みます。

Python
catalog = "<catalog_name>"
schema = "<schema_name>"
volume = "<volume_name>"
download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
file_name = "rows.csv"
table_name = "<table_name>"
path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
path_table = catalog + "." + schema
print(path_table) # Show the complete path
print(path_volume) # Show the complete path
  1. Shift+Enterを押すとセルが実行され、新しい空白のセルが作成されます。

  2. CSVファイルをボリュームにロードしてください。以下のいずれかの方法を選択してください。

    • アップロードするには UIDatabricks Free Editionを使用している場合、またはオプション B のコードダウンロードがネットワーク エラーで失敗した場合は、この方法を使用してください。 Free Edition およびその他のサーバレス コンピュート環境では、送信インターネット アクセスが制限されているため、ローカル マシンからファイルをアップロードする必要があります。
    • コードを使用してダウンロード — コンピュート環境に送信インターネット アクセスがある場合は、この方法を使用します。

    オプション A: ワークスペース UI を使用してアップロードする

    1. ローカルマシンで、ブラウザにhealth.data.ny.gov/api/views/jxy9-yhdk/rows.csvを開いてください。ファイルはrows.csvとしてコンピュータにダウンロードされます。これは、以前に定義された変数file_nameと一致します。
    2. Databricksのワークスペースに戻ってください。サイドバーで、新しいアイコン 新規 > データを追加またはアップロードします
    3. 「ボリュームにファイルをアップロード」 をクリックします。
    4. 「参照」 をクリックしてrows.csvファイルを選択するか、アップロードエリアにドラッグ&ドロップしてください。
    5. 「宛先ボリューム」 で、上記で指定したボリュームを選択してください。
    6. アップロードが完了したら、ノートブックに戻り、ステップ 2に進みます。

    ファイルのアップロードに関する詳細については、 Unity Catalogボリュームへのファイルのアップロード」を参照してください。

    オプションB:コードを使用してダウンロードする

    以下のコードをコピーして、新しい空のノートブックセルに貼り付けてください。Databricksこのコードは、 dbutils コマンドを使用して、 rows.csvhealth.data.ny.gov から ファイルを ボリュームにコピーします。Unity CatalogShift+Enterを押すとセルが実行され、次のセルに移動します。

Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}/{file_name}")

ステップ 2: データフレーム を作成する

このステップでは、テストデータを含むdf1 という名前のデータフレームを作成し、その内容を表示します。

  1. 次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、テストデータを使用してデータフレームを作成し、データフレームの内容とスキーマを表示します。
Python
data = [[2021, "test", "Albany", "M", 42]]
columns = ["Year", "First_Name", "County", "Sex", "Count"]

df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
# df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

ステップ 3: CSV ファイルから データフレーム にデータを読み込む

このステップでは、以前にUnity Catalog ボリュームにロードしたCSVファイルからdf_csvという名前のデータフレームを作成します。spark.read.csvを参照してください。

  1. 次のコードをコピーして、新しい空のノートブックセルに貼り付けます。このコードは、CSVファイルから赤ちゃんの名前データをデータフレーム df_csvに読み込み、データフレームの内容を表示します。
Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}",
header=True,
inferSchema=True,
sep=",")
display(df_csv)
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

サポートされている多くのファイル形式からデータをロードできます。

ステップ 4: データフレーム を表示して操作する

次の方法を使用して、赤ちゃんの名前のデータフレームを表示および操作します。

データフレーム スキーマを印刷する

Apache Sparkデータフレームのスキーマを表示する方法を学習します。Apache Sparkでは、 スキーマ という用語を使用して、データフレーム内の列の名前とデータ型を示します。

注記

Databricksは、カタログに登録されたテーブルの集まりを表すのにも「スキーマ」という用語を使用します。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、.printSchema() メソッドを使用してデータフレームのスキーマを示しています。2つのデータフレームのスキーマを表示して、2つのデータフレームを結合する準備をします。
Python
df_csv.printSchema()
df1.printSchema()
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

データフレーム の列の名前を変更する

データフレームの列の名前を変更する方法を学びましょう。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、df1_csv データフレーム内の列の名前を、df1 データフレーム内の対応する列と一致するように変更します。このコードはApache Spark withColumnRenamed()メソッドを使用します。
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema()
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

データフレームを組み合わせる

あるデータフレームの行を別のデータフレームに追加して新しいデータフレームを作成する方法を学習します。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Sparkのunion()メソッドを使用して、最初のデータフレーム dfの内容を、CSVファイルから読み込んだ赤ちゃんの名前データを含むデータフレーム df_csvと結合します。
Python
df = df1.union(df_csv)
display(df)
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

データフレーム の行をフィルター処理する

Apache Spark .filter() または.where() メソッドを使用して行をフィルタリングして、データセット内で最も人気のある赤ちゃんの名前を見つけてください。フィルタリングを使用して、データフレーム内で返したり変更したりする行のサブセットを選択します。以下の例のように、パフォーマンスや構文に違いはありません。

.filter() の使用 方式

  1. 次のコードをコピーして、空のノートブックのセルに貼り付けます。このコードは、Apache Spark の.filter()メソッドを使用して、DataFrame 内の行数が 50 を超える行を表示します。
Python
display(df.filter(df["Count"] > 50))
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

.where() の使用 方式

  1. 次のコードをコピーして、空のノートブックのセルに貼り付けます。このコードは、Apache Spark の.where()メソッドを使用して、DataFrame 内の行数が 50 を超える行を表示します。
Python
display(df.where(df["Count"] > 50))
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

データフレーム から列を選択し、頻度で並べ替える

select()メソッドを使用して、データフレームから返す列を指定することで、赤ちゃんの名前の頻度について学習します。結果を並べ替えるには、Apache Sparkのorderbyおよびdesc関数を使用します。

Apache SparkのPySpark .sql モジュールは、 SQL関数のサポートを提供します。 このチュートリアルで使用する関数には、Apache Spark のorderBy()desc()expr()関数があります。必要に応じてこれらの関数をセッションにインポートすることで、これらの関数の使用を有効にします。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、desc()関数をインポートし、Apache Spark select()メソッドとApache Spark orderBy()およびdesc()関数を使用して、最も一般的な名前とその数を降順で表示します。
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

サブセットの データフレーム を作成する

既存のデータフレームからサブセットデータフレームを作成する方法を学びましょう。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Sparkのfilter メソッドを使用して、年、数、性別でデータを制限した新しいデータフレームを作成します。列を制限するためにApache Spark select()メソッドを使用します。また、Apache SparkのorderBy()およびdesc()関数を使用して、新しい データフレームをカウントで並べ替えます。
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

ステップ 5: データフレーム を保存する

データフレームを保存する方法について説明します。 データフレームをテーブルに保存するか、データフレームを 1 つまたは複数のファイルに書き込むことができます。

データフレーム をテーブルに保存します

Databricksでは、デフォルトですべてのテーブル向けにDelta Lakeを使用しています。データフレームを保存するには、カタログとスキーマに対する CREATE テーブル権限が必要です。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、このチュートリアルの最初に定義した変数を使用して、データフレームの内容を表に保存します。
Python
df.write.mode("overwrite").saveAsTable(f"{path_table}.{table_name}")
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

Apache Sparkアプリケーションのほとんどは、大規模なデータセットを分散処理します。Apache Sparkは単一のファイルではなく、ファイルのディレクトリを書き出します。Delta LakeはParquetフォルダとファイルを分割します。多くのデータシステムは、これらのファイルのディレクトリを読むことができます。Databricksでは、ほとんどのアプリケーションでファイルパスよりもテーブルを優先して使用することを推奨しています。

データフレーム を JSON ファイルに保存します

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、データフレームを JSON ファイルのディレクトリに保存します。
Python
df.write.format("json").mode("overwrite").save("/tmp/json_data")
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

JSON ファイルから データフレーム を読み取る

Apache Sparkspark.read.format()read.jsonメソッドを使用して、ディレクトリからデータフレーム に .json データを読み込む方法を学習します。

  1. 次のコードをコピーして、空のノートブック セルに貼り付けます。 このコードは、前の例で保存した JSON ファイルを表示します。
Python
display(spark.read.format("json").json("/tmp/json_data"))
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

その他のタスク: PySpark、Scala、R での SQL クエリの実行

Apache Sparkデータフレームには、SQLをPySpark、Scala、Rと組み合わせるための次のオプションが用意されています。次のコードは、このチュートリアル用に作成したものと同じノートブックで実行できます。

列を SQL クエリとして指定する

Apache Spark selectExpr()メソッドの使用方法を学習します。これは、SQL 式を受け入れ、更新されたデータフレームを返すselect() メソッドのバリアントです。この方法では、upperなどのSQL式を使用できます。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードは、Apache Spark selectExpr()メソッドとSQL upper式を使用して、文字列の列を大文字に変換します(列の名前を変更します)。
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

expr() を使用して、列の SQL 構文を使用します

Apache Spark expr()関数をインポートして使用し、列が指定される任意の場所でSQL構文を使用する方法を学習します。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードはexpr() 関数をインポートし、Apache Spark expr() 関数とSQL lower 式を使用して文字列を小文字に変換します(そして列の名前を変更します)。
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

spark.sql() を使用して任意の SQL クエリを実行する 機能

Apache Spark spark.sql()関数を使用して任意のSQLクエリを実行する方法を学習します。

  1. 次のコードをコピーして、ノートブックの空のセルに貼り付けます。このコードでは、Apache Spark spark.sql() 関数を使用してSQL構文を使用してSQLテーブルにクエリを実行します。
Python
display(spark.sql(f"SELECT * FROM {path_table}.{table_name}"))
  1. Shift+Enterを押してセルを実行し、次のセルに移動します。

データフレーム チュートリアル ノートブック

次のノートブックには、このチュートリアルからのクエリーの例が含まれています。

DataFramesを使用した チュートリアルPython

ノートブックを新しいタブで開く

追加のリソース