Databricks レイクハウスにデータを取り込む
Databricks は、Delta Lake がサポートするレイクハウスにデータを取り込むためのさまざまな方法を提供します。 この記事では、サポートされているインジェスト ツールと、データソースや待機時間などの条件に基づいて使用する方法に関するガイダンスの一覧を示します。
インジェスト方法
次の方法を使用して、Databricks にデータを取り込むことができます。
- 頻度の低い処理のための一連のデータ行の バッチ取り込み
- 個々のデータ行またはデータ行のセットが到着したときの ストリーミング 取り込みで、リアルタイム処理を行います
取り込まれたデータは Delta テーブル に読み込まれ、ダウンストリーム データと AI のユース ケース全体で使用できます。 Databricksレイクハウスのアーキテクチャにより、ユースケース間でデータを複製する必要がなく、Unity Catalogを活用して、すべてのデータにわたって一元化されたアクセス制御、監査、リネージ、およびデータディスカバリーを行うことができます。
バッチ取り込み
バッチインジェストでは、データを行のセット(またはバッチ)として Databricks にロードします。多くの場合、スケジュール(毎日など)に基づいて、または手動でトリガーされます。 これは、従来の抽出、変換、ロード (ETL) のユースケースの「抽出」部分を表しています。 バッチ インジェストを使用して、次の場所からデータを読み込むことができます。
-
CSV などのローカル ファイル
-
Amazon S3、Azureデータレイクストレージ、Google Cloud Storage などのクラウドオブジェクトストレージ
-
Salesforce などの SaaS アプリケーションや SQL Server などのデータベース
バッチ取り込みでは、CSV、TSV、JSON、XML、Avro、ORC、Parquet、テキスト ファイルなど、さまざまなファイル ソース形式がサポートされています。
Databricks は、従来のバッチ インジェスト オプションと増分バッチ インジェスト オプションの両方をサポートしています。 従来のバッチ取り込みでは、実行されるたびにすべてのレコードが処理されますが、増分バッチ取り込みでは、データソース内の新しいレコードが自動的に検出され、すでに取り込まれたレコードは無視されます。 つまり、処理するデータが少なくて済むため、インジェスト ジョブの実行が速くなり、コンピュート リソースをより効率的に使用できます。
従来の (1 回限りの) バッチ取り込み
データ追加UIを使用して、ローカルデータファイルをアップロードしたり、公開URLからファイルをダウンロードしたりできます。 「ファイルのアップロード」を参照してください。
増分バッチ取り込み
このセクションでは、サポートされている増分バッチ取り込みツールについて説明します。
ストリーミング テーブル
CREATE STREAMING TABLE
SQL コマンドを使用すると、クラウド・オブジェクト・ストレージからストリーミング・テーブルにデータを増分的にロードできます。CREATE STREAMING TABLEを参照してください。
例: ストリーミング テーブルを使用した増分バッチ取り込み
CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files(
"/databricks-datasets/retail-org/customers/",
format => "csv"
)
クラウドオブジェクトストレージコネクタ
Auto Loaderは、組み込み Cloud Object Storage Connector であり、新しいデータ ファイルが Amazon S3 (S3)、 Azure データレイク Storage Gen 2 (ALDS2)、または Google Cloud Storage (GCS) に到着したときに、段階的かつ効率的に処理できます。 Auto Loaderを参照してください。
例: Auto Loader を使用した増分バッチ取り込み
df = spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.option("rescuedDataColumn", "_rescued_data")
.schema("/databricks-datasets/retail-org/customers/schema")
.load("/databricks-datasets/retail-org/customers/")
フルマネージド コネクタ
LakeFlow Connect には、 SaaS アプリケーションやデータベースから取り込むためのフルマネージド コネクタが用意されています。 マネージド コネクタは、次のものを使用して使用できます。
- Databricks UI
- Databricks CLI
- Databricks APIs
- Databricks SDK
- Databricksアセットバンドル
LakeFlow Connectを参照してください。
ストリーミング インジェスト
ストリーミング インジェストでは、データ行またはデータ行のバッチが生成されたときに継続的に読み込まれるため、ほぼリアルタイムで到着したときにクエリを実行できます。 ストリーミングインジェストを使用して、Apache Kafka、Amazon Kinesis、Google Pub/Sub、Apache Pulsar などのソースからストリーミングデータをロードできます。
Databricks では、組み込みコネクタを使用したストリーミング インジェストもサポートされています。 これらのコネクタを使用すると、ストリーミング ソースから到着する新しいデータを段階的かつ効率的に処理できます。 ストリーミングデータソースの設定を参照してください。
例: Kafka からのストリーミング取り込み
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
DLT によるバッチおよびストリーミング取り込み
Databricks では、 DLT を使用して信頼性と拡張性に優れたデータ処理パイプラインを構築することをお勧めします。DLT はバッチとストリーミングの両方のインジェストをサポートしており、 Auto Loaderでサポートされている任意のデータソースからデータを取り込むことができます。
例: DLT を使用した増分バッチ取り込み
@dlt.table
def customers():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/databricks-datasets/retail-org/customers/")
)
例: DLT を使用した Kafka からのストリーミング取り込み
@dlt.table
def kafka_raw():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "<server:ip>")
.option("subscribe", "topic1")
.option("startingOffsets", "latest")
.load()
)
インジェスト スケジュール
データは、1 回限りの操作として、定期的なスケジュールで、または継続的に取り込むことができます。
- ほぼリアルタイムのストリーミングの使用例では、連続モードを使用します。
- バッチ取得のユースケースでは、1 回だけ取り込むか、定期的なスケジュールを設定します。
「ジョブによるインジェスト」および「トリガー パイプライン モードと継続的パイプライン モード」を参照してください。
インジェストパートナー
多くのサードパーティ ツールは、Databricks へのバッチまたはストリーミング インジェストをサポートしています。 Databricks はさまざまなサードパーティ統合を検証していますが、ソースシステムへのアクセスを設定し、データを取り込む手順はツールによって異なります。 検証済みのツールの一覧については、「 インジェスト パートナー 」を参照してください。 一部の技術パートナーは、サードパーティ ツールをレイクハウス データに簡単に接続できる UI を提供する Databricks Partner Connectでも紹介されています。
DIYインジェスト
Databricks 一般的なコンピュートプラットフォームを提供します。 その結果、Python や Java など、Databricks でサポートされている任意のプログラミング言語を使用して、独自のインジェスト コネクタを作成できます。 また、データロードツール、Airbyte、Debeziumなどの一般的なオープンソースコネクタライブラリをインポートして活用することもできます。
インジェストの代替手段
Databricks では、大量のデータ、低レイテンシのクエリ、サードパーティ API の制限に対応するようにスケーリングできるため、ほとんどのユース ケースでインジェストを推奨しています。 インジェストでは、ソース システムから Databricks にデータがコピーされるため、重複するデータが発生し、時間の経過と共に古くなる可能性があります。 データをコピーしない場合は、次のツールを使用できます。
- レイクハウスフェデレーション を使用すると、データを移動せずに外部データソースをクエリできます。
- Delta Sharing を使用すると、プラットフォーム、クラウド、リージョン間でデータを安全に共有できます。
ただし、データをコピーしたくない場合は、レイクハウスフェデレーションまたは Delta Sharingを使用します。
Delta Sharing を使用する場合
Delta Sharing は、次のシナリオで選択します。
- データ重複の制限
- 可能な限り最新のデータのクエリ
レイクハウスフェデレーションを使用する場合
次のシナリオでは、レイクハウスフェデレーションを選択します。
- ETL パイプラインのアドホック レポート作成または概念実証作業
インジェスト方法を選択する際の考慮事項
配慮 | 指導 |
---|---|
データソース | データソースに LakeFlow Connect ネイティブコネクタが存在する場合、これがデータを取り込む最も簡単な方法になります。 LakeFlow Connectでサポートされていないデータソースの場合は、ソース からデータを抽出し、Auto Loader を使用してデータを Databricksに取り込みます。ローカル ファイルの場合は、Databricks UI を使用してデータをアップロードします。 |
レイテンシー | ほぼリアルタイムでデータを分析したい場合は、ストリーミングを使用してインクリメンタル処理を活用します。 ストリーミングでは、各レコードが到着するとすぐにデータをクエリできます。 それ以外の場合は、バッチ取り込みを使用します。 |
データの移動 | ソース システムから Databricksにデータをコピーできない場合は、レイクハウスフェデレーションまたは Delta Sharingを使用します。 |
Delta Lake へのデータの移行
既存のデータを Delta Lake に移行する方法については、「 Delta Lake へのデータの移行」を参照してください。
COPY INTO (レガシー)
CREATE STREAMING TABLE
SQL コマンドは、クラウド オブジェクト ストレージからの増分インジェストに推奨される従来の COPY INTO
SQL コマンドの代替手段です。 「COPY INTO」を参照してください。よりスケーラブルで堅牢なファイルインジェストエクスペリエンスを実現するために、 Databricks SQL ユーザーは COPY INTO
ではなくストリーミングテーブルを活用することをお勧めします。