DLT によるデータの読み込み
DLT を使用して、DatabricksにおけるApache Sparkでサポートされている任意のデータソースからデータをロードできます。 DLT では、 Spark データフレームを返す任意のクエリに対してデータセット (テーブルとビュー) を定義できます (ストリーミング 、 データフレーム 、 Pandas for Spark データフレームなど)。 データ取り込み タスクの場合、 Databricks はほとんどのユースケースでストリーミングテーブルを使用することをお勧めします。 ストリーミングテーブルは、 Auto Loader を使用してクラウドオブジェクトストレージからデータを取り込む場合や、 Kafkaなどのメッセージバスからデータを取り込むのに適しています。
- すべてのデータソースが SQL をサポートしているわけではありません。 DLT パイプラインで SQL ノートブックと Python ノートブックを混在させて、インジェスト以降のすべての操作に SQL を使用できます。
- デフォルトで DLT にパッケージ化されていないライブラリの操作の詳細については、「 DLT パイプラインの Python 依存関係の管理」を参照してください。
- Databricks でのインジェストに関する一般的な情報については、「 Databricks レイクハウスにデータを取り込む」を参照してください。
次の例は、いくつかの一般的なパターンを示しています。
クラウドオブジェクトストレージからのファイルのロード
Databricks では、クラウド オブジェクト ストレージからのほとんどのデータ取り込みタスクに Auto Loader with DLT を使用することをお勧めします。 Auto Loader と DLT は、増え続けるデータがクラウド ストレージに到着するときに、増分的かつべき等に読み込むように設計されています。
Auto Loaderとはおよびオブジェクトストレージからのデータのロードを参照してください。
Auto LoaderUnity Catalog対応パイプラインで を含むファイルをロードするには、外部ロケーション を使用する必要があります。DLT での Unity Catalog の使用の詳細については、「 DLT パイプラインでの Unity Catalog の使用」を参照してください。
次の例では、 Auto Loader を使用して CSV ファイルと JSON ファイルからデータセットを作成します。
- Python
- SQL
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
@dlt.table
def sales_orders_raw():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("/databricks-datasets/retail-org/sales_orders/")
)
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv"
)
CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/sales_orders/",
format => "json")
ファイル通知で Auto Loader を使用し、パイプラインまたはストリーミングテーブルの完全な更新を実行する場合は、リソースを手動でクリーンアップする必要があります。 ノートブックで CloudFilesResourceManager を使用して、クリーンアップを実行できます。
メッセージバスからのデータの読み込み
DLT パイプラインを設定して、メッセージバスからデータを取り込むことができます。Databricks では、連続実行と拡張オートスケールを備えたストリーミングテーブルを使用して、メッセージバスからの低レイテンシロードに最も効率的なインジェストを提供することをお勧めします。 オートスケールを使用したDLT パイプラインのクラスタリング利用の最適化を参照してください。
たとえば、次のコードでは、 Kafkaからデータを取り込むようにストリーミングテーブルを構成します。
import dlt
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
次の例のように、純粋な SQL でダウンストリーム操作を記述して、このデータに対してストリーミング変換を実行できます。
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(kafka_raw)
WHERE ...
Azure Event Hubs からデータを読み込む
Azure Event Hubs は、Apache Kafka と互換性のあるインターフェイスを提供するデータ ストリーミング サービスです。 DLT ランタイムに含まれる構造化ストリーミング Kafka コネクタを使用して、Azure Event Hubs からメッセージを読み込むことができます。Azure Event Hubs からのメッセージの読み込みと処理の詳細については、「Azure Event Hubs を DLT データソースとして使用する」を参照してください。
外部システムからのデータのロード
DLT は、 Databricksでサポートされている任意のデータソースからのデータの読み込みをサポートしています。 「データソースへの接続」を参照してください。サポートされているデータソースのレイクハウスフェデレーションを使用して外部データをロードすることもできます。レイクハウスフェデレーションには Databricks Runtime 13.3 LTS 以上が必要なため、レイクハウスフェデレーションを使用するには、 プレビューチャンネルを使用するようにパイプラインを構成する必要があります。
一部のデータソースは、 SQLで同等のサポートを受けていません。 これらのデータソースのいずれかでレイクハウスフェデレーションを使用できない場合は、 Python ノートブックを使用してソースからデータを取り込むことができます。 Python と SQL のソースコードを同じ DLT パイプラインに追加できます。次の例では、リモート PostgreSQL テーブル内のデータの現在の状態にアクセスするためのマテリアライズドビューを宣言します。
import dlt
@dlt.table
def postgres_raw():
return (
spark.read
.format("postgresql")
.option("dbtable", table_name)
.option("host", database_host_url)
.option("port", 5432)
.option("database", database_name)
.option("user", username)
.option("password", password)
.load()
)
小さなデータセットまたは静的なデータセットをクラウドオブジェクトストレージからロードする
Apache Spark のロード構文を使用して、小さなデータセットまたは静的なデータセットをロードできます。 DLT は、Databricks 上の Apache Spark でサポートされているすべてのファイル形式をサポートしています。完全なリストについては、「 データ形式のオプション」を参照してください。
次の例は、JSON をロードして DLT テーブルを作成する方法を示しています。
- Python
- SQL
@dlt.table
def clickstream_raw():
return (spark.read.format("json").load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"))
CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM read_files(
"/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json"
)
read_files
SQL 関数は、Databricks 上のすべての SQL 環境に共通です。これは、DLT で SQL を使用した直接ファイル・アクセスに推奨されるパターンです。詳細については、「 オプション」を参照してください。
ソース ストリーミングテーブルの変更を無視するようにストリーミングテーブルを構成する
skipChangeCommits
フラグは、option()
関数を使用するspark.readStream
でのみ機能します。このフラグは、dlt.read_stream()
関数では使用できません。- ソース ストリーミングテーブルが apply_changes() 関数のターゲットとして定義されている場合は、
skipChangeCommits
フラグを使用できません。
デフォルトでは、ストリーミングテーブルには追加専用ソースが必要です。ストリーミングテーブルが別のストリーミングテーブルをソースとして使用し、ソース ストリーミングテーブルが更新または削除 ( GDPR 忘れられる権利" 処理など) を必要とする場合、ソース ストリーミングテーブルを読み取るときに skipChangeCommits
フラグを設定して、それらの変更を無視できます。 このフラグの詳細については、「 更新と削除を無視する」を参照してください。
@table
def b():
return spark.readStream.option("skipChangeCommits", "true").table("A")
パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする
Databricks シークレット を使用して、アクセス キーやパスワードなどの資格情報を格納できます。パイプラインでシークレットを構成するには、パイプライン設定のクラスタリング構成で Spark プロパティを使用します。 DLT パイプラインのコンピュートの設定を参照してください。
次の例では、シークレットを使用して、Azure データレイク Storage (ADLS) storage アカウント から入力データを読み取るために必要なアクセス キー Auto Loaderを使用します。これと同じ方法を使用して、パイプラインで必要なシークレット (AWS S3にアクセスするための キーや のパスワードなど)ApacheHive metastore を構成できます。
Azure データレイク Storage の操作の詳細については、「Azure データレイク Storage と Blob Storage に接続する」を参照してください。
シークレット値を設定する spark_conf
設定キーに spark.hadoop.
プレフィックスを追加する必要があります。
{
"id": "43246596-a63f-11ec-b909-0242ac120002",
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"autoscale": {
"min_workers": 1,
"max_workers": 5,
"mode": "ENHANCED"
}
}
],
"development": true,
"continuous": false,
"libraries": [
{
"notebook": {
"path": "/Users/user@databricks.com/DLT Notebooks/DLT quickstart"
}
}
],
"name": "DLT quickstart using ADLS2"
}
置き換え
<storage-account-name>
を ADLS ストレージ アカウント名に置き換えます。<scope-name>
をDatabricksシークレットスコープ名に置き換えます。<secret-name>
をAzureストレージアカウントのアクセスキーを含むキーの名前に置き換えます。
import dlt
json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"
@dlt.create_table(
comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(json_path)
)
置き換え
<container-name>
は、入力データを格納する Azure ストレージ アカウント コンテナーの名前に置き換えます。<storage-account-name>
を ADLS ストレージ アカウント名に置き換えます。<path-to-input-dataset>
を入力データセットへのパスに置き換えます。