パイプラインでデータをロードする
パイプラインを使用して、 Apache Sparkでサポートされている任意のデータ ソースからDatabricksにデータをロードできます。 LakeFlow Spark宣言型パイプラインでは、ストリーミングDataFramesやSpark DataFramesのPandasなど、 Spark DataFrameを返すクエリに対してデータセット (テーブルとビュー) を定義できます。 データ取り込みタスクの場合、 Databricksほとんどのユースケースでストリーミング テーブルを使用することを推奨します。 ストリーミング テーブルは、 Auto Loader使用してクラウド オブジェクト ストレージから、またはKafkaなどのメッセージ バスからデータを取り込むのに適しています。
- すべてのデータソースが取り込み用のSQLサポートしているわけではありません。 パイプライン内でSQLとPythonソースを混合して、必要な場合にはPython使用し、同じパイプライン内の他の操作にはSQL使用できます。
- LakeFlow Spark宣言型パイプラインにパッケージ化されていないライブラリを無理に操作する方法の詳細については、 「パイプラインのPython依存関係の管理」を参照してください。
- Databricksでのインジェストに関する一般的な情報については、「Lakeflowコネクトの標準コネクタ」を参照してください。
次の例は、いくつかの一般的なパターンを示しています。
既存のテーブルから読み込む
Databricks 内の既存のテーブルからデータを読み込みます。クエリを使用してデータを変換したり、パイプラインでさらに処理するためにテーブルをロードしたりできます。
次の例では、既存のテーブルからデータを読み取ります。
- Python
- SQL
@dp.table(
comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
return (
spark.read.table("baby_names_prepared")
.filter(expr("Year_Of_Birth == 2021"))
.groupBy("First_Name")
.agg(sum("Count").alias("Total_Count"))
.sort(desc("Total_Count"))
)
CREATE OR REFRESH MATERIALIZED VIEW top_baby_names_2021
COMMENT "A table summarizing counts of the top baby names for New York for 2021."
AS SELECT
First_Name,
SUM(Count) AS Total_Count
FROM baby_names_prepared
WHERE Year_Of_Birth = 2021
GROUP BY First_Name
ORDER BY Total_Count DESC
クラウドオブジェクトストレージからファイルを読み込む
Databricksでは、クラウド オブジェクト ストレージまたはUnity Catalogボリューム内のファイルからのデータ取り込みタスクのほとんどに、パイプラインのAuto Loaderを使用することをお勧めします。 Auto Loader とパイプラインは、クラウド ストレージに到着するデータが増え続けるにつれて、それを増分的かつべき等的にロードするように設計されています。
Auto Loaderとはおよびオブジェクトストレージからのデータのロードを参照してください。
次の例では、Auto Loader を使用してクラウド ストレージからデータを読み取ります。
- Python
- SQL
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://mybucket/analysis/*/*/*.json")
)
CREATE OR REFRESH STREAMING TABLE sales
AS SELECT *
FROM STREAM read_files(
's3://mybucket/analysis/*/*/*.json',
format => "json"
);
次の例では、Auto Loader を使用して、Unity Catalog ボリューム内の CSVファイルからデータセットを作成します。
- Python
- SQL
@dp.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/Volumes/my_catalog/retail_org/customers/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/Volumes/my_catalog/retail_org/customers/",
format => "csv"
)
- ファイル通知でAuto Loaderを使用し、パイプラインまたはストリーミング テーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブック内のCloudFilesResourceManagerを使用してクリーンアップを実行できます。
- Unity Catalog有効なパイプラインでAuto Loaderを使用してファイルをロードするには、外部ロケーションを使用する必要があります。 パイプラインで Unity Catalog を使用する方法の詳細については、 「パイプラインで Unity Catalog を使用する」を参照してください。
メッセージバスからデータを読み込む
メッセージ バスからデータを取り込むようにパイプラインを構成できます。Databricks継続的な実行と強化されたオートスケールを備えたストリーミング テーブルを使用して、メッセージ バスからの低遅延読み込みの最も効率的な取り込みを提供することをお勧めします。 「オートスケールを使用したLakeFlow Spark宣言型パイプラインのクラスター使用率の最適化」を参照してください。
たとえば、次のコードは、 read_kafka関数を使用してKafkaからデータを取り込むようにストリーミング テーブルを構成します。
- Python
- SQL
from pyspark import pipelines as dp
@dp.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "kafka_server:9092")
.option("subscribe", "topic1")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_raw AS
SELECT *
FROM STREAM read_kafka(
bootstrapServers => 'kafka_server:9092',
subscribe => 'topic1'
);
他のメッセージ バス ソースから取り込むには、以下を参照してください。
- Kinesis : read_kinesis
- Pub/Subトピック: read_pubsub
- Pulsar: read_pulsar
Azure Event Hubs からデータを読み込む
Azure Event Hubs は、Apache Kafka 互換インターフェースを提供するデータ ストリーミング サービスです。LakeFlow Spark宣言型パイプライン ランタイムに含まれる構造化ストリーミングKafkaコネクタを使用して、 Azure Event Hubs からメッセージを読み込むことができます。 Azure Event Hubs からのメッセージの読み込みと処理の詳細については、 「 Azure Event Hubs をパイプライン データ ソースとして使用する」を参照してください。
外部システムからデータを読み込む
LakeFlow Spark宣言型パイプラインは、 Databricksでサポートされている任意のデータ ソースからのデータの読み込みをサポートします。 「データソースと外部サービスへの接続」を参照してください。 サポートされているデータソースのレイクハウスフェデレーションを使用して外部データを読み込むこともできます。 レイクハウスフェデレーションにはDatabricks Runtime 13.3 LTS以降が必要なため、レイクハウスフェデレーションを使用するには、プレビュー チャンネルを使用するようにパイプラインを構成する必要があります。
一部のデータソースには、 SQLでの同等のサポートがありません。 これらのデータ ソースのいずれかでレイクハウスフェデレーションを使用できない場合は、 Pythonを使用してソースからデータを取り込むことができます。 同じパイプラインに Python ソース ファイルと SQL ソース ファイルを追加できます。次の例では、リモートPostgreSQLテーブル内のデータの現在の状態にアクセスするためのマテリアライズドビューを宣言します。
import dp
@dp.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
クラウド オブジェクト ストレージから小規模または静的なデータセットを読み込む
Apache Spark ロード構文を使用して、小さなデータセットや静的データセットをロードできます。LakeFlow Spark宣言型パイプラインは、 Databricks上のApache Sparkでサポートされるすべてのファイル形式をサポートします。 完全なリストについては、 「データ形式オプション」を参照してください。
次の例は、JSON を読み込んでテーブルを作成する方法を示しています。
- Python
- SQL
@dp.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
read_files SQL 関数は、Databricks 上のすべての SQL 環境に共通です。これは、パイプラインで SQL を使用して直接ファイルにアクセスする場合に推奨されるパターンです。詳細については、 「オプション」を参照してください。
Pythonカスタムデータソースからデータをロードする
Pythonカスタム データソースを使用すると、カスタム形式でデータをロードできます。 特定の外部データソースに対して読み取りや書き込みを行うコードを記述したり、既存のシステム内の既存のPythonコードを活用して独自の内部システムからデータを読み取ったりすることができます。 Pythonデータソースの開発の詳細については、 PySparkカスタム データソース」を参照してください。
Pythonカスタム データソースを使用してデータをパイプラインにロードするには、それをmy_custom_datasourceなどの形式名で登録し、そこから読み取ります。
from pyspark import pipelines as dp
# Assume `my_custom_datasource` is a custom Python custom data
# source that supports both batch and streaming reads, and has
# been registered using `spark.dataSource.register`.
# This creates a materialized view
@dp.table(name = "read_from_batch")
def read_from_batch():
return spark.read.format("my_custom_datasource").load()
# This creates a streaming table
@dp.table(name = "read_from_streaming")
def read_from_streaming():
return spark.readStream.format("my_custom_datasource").load()
ソースストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する
skipChangeCommitsフラグは、option()関数を使用するspark.readStreamでのみ機能します。このフラグは、dp.read_stream()関数では使用できません。- ソース ストリーミング テーブルがcreate_auto_cdc_flow()関数のターゲットとして定義されている場合は、
skipChangeCommitsフラグを使用できません。
とりあえず、ストリーミングテーブルには追加専用のソースが必要です。 ストリーミング テーブルが別のストリーミング テーブルをソースとして使用し、そのソース ストリーミング テーブルがGDPR 「忘れられる権利」処理などの更新または削除を必要とする場合、ソース ストリーミング テーブルの読み取り時にskipChangeCommitsフラグを設定して、それらの変更を無視することができます。 このフラグの詳細については、 「更新と削除を無視する」を参照してください。
@dp.table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする
Databricksシークレットを使用して、アクセス キーやパスワードなどの資格情報を保存できます。パイプラインでシークレットを構成するには、パイプライン設定のクラスター構成で Spark プロパティを使用します。「パイプライン用のクラシック コンピュートの構成」を参照してください。
次の例では、シークレットを使用して、Azure Data Lake Storage (ADLS)のストレージアカウント から入力データを読み取るために必要なアクセス キー Auto Loaderを使用します。これと同じ方法を使用して、パイプラインで必要なシークレット (S3にアクセスするためのAWSキーやApache Hive metastoreのパスワードなど)を構成できます。
Azure Data Lake Storage の操作の詳細については、「Azure Data Lake Storage と Blob Storage に接続する」を参照してください。
シークレット値を設定するspark_conf構成キーにspark.hadoop.プレフィックスを追加する必要があります。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/:re[LDP] Notebooks/:re[LDP] quickstart"
}
}
],
"name": ":re[LDP] quickstart using ADLS2"
}
以下の通り置き換えます。
<storage-account-name>ADLS ストレージ アカウント名を使用します。<scope-name>をDatabricksシークレットスコープ名に置き換えます。<secret-name>をAzureストレージアカウントのアクセスキーを含むキーの名前に置き換えます。
from pyspark import pipelines as dp
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dp.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
以下の通り置き換えます。
<container-name>入力データを保存する Azure ストレージ アカウント コンテナーの名前。<storage-account-name>ADLS ストレージ アカウント名を使用します。<path-to-input-dataset>入力データセットへのパス。