パイプラインでデータをロードする
Databricksでは、パイプラインを使用することで、Apache Sparkがサポートするあらゆるデータソースからデータをロードできます。LakeFlow Spark宣言型パイプラインでは、ストリーミングDataFramesやSpark DataFramesのPandasなど、 Spark DataFrame返すクエリに対してデータセット (テーブルとビュー) を定義できます。 データ取り込みタスクに関しては、Databricksはほとんどのユースケースでストリーミングテーブルの使用を推奨しています。ストリーミングテーブルは、Auto Loaderを使用してクラウドオブジェクトストレージからデータを取り込む場合や、Kafkaなどのメッセージバスからデータを取り込む場合に便利です。
すべてのデータソースが、データ取り込みのためのSQLをサポートしているわけではありません。ただし、同じパイプライン内でSQLとPythonのソースを混在させ、必要に応じてPythonを使用することも可能です。LakeFlow Spark宣言型パイプラインにパッケージ化されていないライブラリを無理に操作する方法の詳細については、「パイプラインのPython依存関係の管理」を参照してください。 Databricksにおけるデータ取り込みに関する一般的な情報については、 LakeFlow Connectの標準コネクタを参照してください。
以下の例は、一般的なデータ読み込みパターンを示しています。
既存のテーブルから読み込む
Databricks内の既存のテーブルからデータを読み込む。クエリを使用してデータを変換することも、パイプラインでさらに処理するためにテーブルをロードすることもできます。
- Python
- SQL
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
クラウドオブジェクトストレージからファイルを読み込む
Databricks 、クラウド オブジェクト ストレージまたはUnity Catalogボリューム内のファイルからのデータ取り込みタスクのほとんどに、パイプラインのAuto Loader使用することをお勧めします。 Auto Loaderとパイプラインは、増え続けるデータがクラウド ストレージに到着すると、増分的かつべき等にロードするように設計されています。 Auto Loaderとは何か?」および「オブジェクトストレージからデータをロードする」を参照してください。
以下の例では、Auto Loader を使用してクラウドストレージからデータを読み取ります。
- Python
- SQL
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://mybucket/analysis/*/*/*.json")
)
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
's3://mybucket/analysis/*/*/*.json',
format => "json"
);
次の例では、 Auto Loader使用して、 Unity Catalogボリューム内のCSVファイルからデータセットを作成します。
- Python
- SQL
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
- ファイル通知でAuto Loaderを使用し、パイプラインまたはストリーミング テーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブック内のCloudFilesResourceManagerを使用してクリーンアップを実行できます。
- Unity Catalog有効なパイプラインでAuto Loaderを使用してファイルをロードするには、外部ロケーションを使用する必要があります。 パイプラインで Unity Catalog を使用する方法の詳細については、 「パイプラインで Unity Catalog を使用する」を参照してください。
クラウドストレージへの認証
Auto Loader Unity Catalog外部ロケーションを使用してクラウド ストレージに対して認証します。 読み取り先のストレージ パスの外部ロケーションを構成し、実行ユーザーにREAD FILES権限を付与する必要があります。
Amazon S3からデータを取り込むには、S3バケットを参照するストレージ認証情報によって裏付けられた外部ロケーションを設定します。詳細については、 Unity Catalogを使用してクラウド オブジェクト ストレージに接続する」を参照してください。
メッセージバスからデータをロードする
パイプラインを構成して、メッセージバスからデータを取り込むことができます。Databricks 、継続的な実行と強化されたオートスケールを備えたストリーミング テーブルを使用して、メッセージ バスからの低遅延読み込みの最も効率的な取り込みを提供することをお勧めします。 詳細については、 「オートスケールを使用したLakeFlow Spark宣言型パイプラインのクラスター使用率の最適化」を参照してください。
例えば、以下のコードは、 read_kafka関数を使用してKafkaからデータを取り込むようにストリーミングテーブルを設定します。
- Python
- SQL
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
他のメッセージ バス ソースから取り込むには、以下を参照してください。
- Kinesis : read_kinesis
- Pub/Subトピック: read_pubsub
- Pulsar: read_pulsar
Azure Event Hubs からデータを読み込む
Azure Event Hubs は、Apache Kafka 互換インターフェースを提供するデータ ストリーミング サービスです。Lakeflow Spark宣言型パイプライン ランタイムに含まれる構造化ストリーミングKafkaコネクタを使用して、 Azure Event Hubs からメッセージを読み込むことができます。 Azure Event Hubs からのメッセージの読み込みと処理の詳細については、 「 Azure Event Hubs をパイプライン データ ソースとして使用する」を参照してください。
外部システムからデータを読み込む
LakeFlow Spark宣言型パイプラインは、 Databricksでサポートされている任意のデータ ソースからのデータの読み込みをサポートします。 データソースおよび外部サービスへの接続を参照してください。サポートされているデータソースのレイクハウスフェデレーションを使用して外部データを読み込むこともできます。 レイクハウスフェデレーションにはDatabricks Runtime 13.3 LTS以降が必要なため、レイクハウスフェデレーションを使用するには、プレビュー チャンネルを使用するようにパイプラインを構成します。
データソースによっては、同等のSQLサポートが提供されていない場合があります。これらのデータ ソースのいずれかでレイクハウスフェデレーションを使用できない場合は、 Python使用してソースからデータを取り込むことができます。 PythonとSQLのソースファイルを同じパイプラインに追加できます。次の例では、リモートのPostgreSQLテーブル内のデータの現在の状態にアクセスするためのマテリアライズドビューを宣言します。
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
クラウドオブジェクトストレージから小規模または静的なデータセットをロードする
Apache Spark ロード構文を使用して、小さなデータセットや静的データセットをロードできます。Lakeflow Spark宣言型パイプラインは、 Databricks上のApache Sparkでサポートされるすべてのファイル形式をサポートします。 完全なリストについては、 「データ形式オプション」を参照してください。
以下の例は、JSONを読み込んでテーブルを作成する方法を示しています。
- Python
- SQL
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
read_files SQL 関数は、Databricks 上のすべての SQL 環境に共通です。これは、パイプラインで SQL を使用して直接ファイルにアクセスする場合に推奨されるパターンです。詳細については、 「オプション」を参照してください。
Pythonカスタムデータソースからデータをロードする
Pythonのカスタムデータソースを使用すると、カスタム形式でデータを読み込むことができます。特定の外部データソースからデータを読み取ったり、データを書き込んだりするコードを記述することも、既存のPythonコードを使用して自社の内部システムからデータを読み取ることもできます。Pythonデータソースの開発の詳細については、 PySparkカスタム データソース」を参照してください。
次の例では、形式名my_custom_datasourceのカスタム データ ソースを登録し、バッチ モードとストリーミング モードの両方でそれを読み取ります。
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
ソースストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する
デフォルトでは、ストリーミングテーブルは追記専用のソースを必要とします。ソースストリーミングテーブルに更新や削除が必要な場合(たとえば、GDPRの「忘れられる権利」の処理など)、 skipChangeCommitsフラグを使用してこれらの変更を無視します。このフラグは、 option()関数を使用するspark.readStreamでのみ機能し、ソース ストリーミング テーブルがcreate_auto_cdc_flow()関数のターゲットである場合には使用できません。 詳細については、 「ソースDeltaテーブルへの変更の処理」を参照してください。
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする
Databricksシークレットを使用して、アクセス キーやパスワードなどの資格情報を保存できます。パイプラインでシークレットを構成するには、パイプライン設定のクラスター構成で Spark プロパティを使用します。「パイプライン用のクラシック コンピュートの構成」を参照してください。
次の例では、シークレットを使用して、 Auto Loaderを使用してAzureデータレイク ストレージ アカウントから入力データを読み取るために必要なアクセス キーを保存します。 この同じ方法を使用して、たとえばS3にアクセスするためのAWSキーやApache Hive metastoreのパスワードなど、必要なシークレットを設定できます。
Azure Data Lake Storage の操作の詳細については、「Azure Data Lake Storage と Blob Storage に接続する」を参照してください。
シークレット値を設定するspark_conf構成キーにspark.hadoop.プレフィックスを追加する必要があります。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path>",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
このコードサンプルでは、以下の値を置き換えてください。
プレースホルダー | に置き換える |
|---|---|
| Azureストレージアカウントコンテナーの名前。 |
| ADLSストレージアカウント名。 |
| パイプラインの出力データとメタデータのパス。 |
| Databricksのシークレットスコープ名。 |
| Azureストレージアカウントのアクセスキーを含むキーの名前。 |
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
このコードサンプルでは、以下の値を置き換えてください。
プレースホルダー | に置き換える |
|---|---|
| 入力データを保存するAzureストレージアカウントコンテナーの名前。 |
| ADLSストレージアカウント名。 |
| 入力データセットへのパス。 |