データの変換

この記事では、Databricks を使用したデータ変換の概要について説明します。 データの変換、つまりデータの準備は、すべてのデータエンジニアリング、アナリティクス、およびMLワークロードにおける重要なステップです。

この記事のサンプル パターンと推奨事項は、Delta Lake によってサポートされるレイクハウス テーブルの操作に重点を置いています。 Delta Lake は Databricks レイクハウスの ACID 保証を提供するため、他の形式またはデータ システムのデータを操作するときに異なる動作が見られる場合があります。

Databricks 、生データまたはほぼ生データの状態でレイクハウスにデータを取り込み、その後、変換とエンリッチメントを別の処理ステップとして適用することを推奨しています。 このパターンはメダリオンアーキテクチャとして知られています。 「メダリオン レイクハウス建築とは何か?」を参照してください。

変換する必要があるデータがまだレイクハウスにロードされていないことが分かっている場合は、 「Databricks レイクハウスへのデータの取り込み」を参照してください。 変換を記述するためのレイクハウス データを探している場合は、 「データの検出」を参照してください。

すべての変換は、データソースに対してバッチ クエリまたはストリーミング クエリを記述することから始まります。 データのクエリに慣れていない場合は、「 データのクエリ」を参照してください。

変換されたデータをDeltaテーブルに保存すると、そのテーブルをMLの特徴量テーブルとして使用できます。 特徴量エンジニアリングとサービングを参照してください。

注:

ここでの記事では、Databricks での変換について説明します。 Databricks は、多くの一般的なデータ準備プラットフォームへの接続もサポートしています。 「Partner Connect を使用してデータ準備パートナーに接続する」を参照してください。

Spark変換と Lakehouse 変換

この記事では、ETL または ELT の T に関連する 変換 の定義に焦点を当てます。Apache Spark 処理モデルでも、関連する方法で「変換」という言葉が使用されます。 簡単に言うと、Apache Spark では、すべての操作は変換またはアクションのいずれかとして定義されます。

  • 変換: プランに処理ロジックを追加します。 例としては、データの読み取り、結合、集計、型キャストなどがあります。

  • アクション: 処理ロジックをトリガーして、結果を評価して出力します。 たとえば、書き込み、結果の表示またはプレビュー、手動キャッシュ、行数の取得などがあります。

Apache Spark は遅延実行モデルを使用します。つまり、アクションがトリガーされるまで、操作のコレクションによって定義されたロジックは評価されません。 このモデルは、データ処理パイプラインを定義するときに重要な影響を及ぼします。つまり、結果をターゲット テーブルに保存するためのアクションのみを使用します。

アクションはロジックの最適化における処理のボトルネックとなるため、Databricks は Apache Spark にすでに存在するものに加えて、ロジックの最適な実行を保証するために多数の最適化を追加しました。 これらの最適化では、特定のアクションによってトリガーされたすべての変換が一度に考慮され、データの物理レイアウトに基づいて最適なプランが見つかります。 手動でデータをキャッシュしたり、本番運用 パイプラインでプレビュー結果を返したりすると、これらの最適化が中断され、コストとレイテンシが大幅に増加する可能性があります。

したがって、レイクハウス変換は、 1 つ以上のレイクハウス テーブルに適用され、新しいレイクハウス テーブルを生成する一連の操作として定義できます。 結合や集計などの変換については個別に説明しますが、これらのパターンの多くを 1 つの処理ステップで組み合わせて、Databricks のオプティマイザーが最も効率的なプランを見つけることを信頼できます。

ストリーミングとバッチ処理の違いは何ですか?

ストリーミングとバッチ処理は Databricks 上でほぼ同じ構文を使用しますが、それぞれに独自のセマンティクスがあります。

バッチ処理を使用すると、一定量の静的で変化しないデータを単一の操作として処理するための明示的な指示を定義できます。

ストリーム処理を使用すると、無制限で継続的に増加するデータセットに対してクエリを定義し、小さな増分バッチでデータを処理できます。

Databricks のバッチ操作では Spark SQL または DataFrames が使用され、ストリーム処理では構造化ストリーミングが活用されます。

次の表に示すように、読み取り操作と書き込み操作を確認することで、バッチApache Sparkコマンドと構造化ストリーミングを区別できます。

Apachehe Spark

構造化ストリーミング

読み取り

spark.read.load()

spark.readStream.load()

書き込み

spark.write.save()

spark.writeStream.start()

マテリアライズド ビューは通常、バッチ処理の保証に準拠していますが、可能な場合は Delta Live Tables を使用して結果を増分的に計算します。 マテリアライズド ビューによって返される結果は常にロジックのバッチ評価と同等ですが、Databricks は可能な場合はこれらの結果を段階的に処理しようとします。

ストリーミング テーブルは常に結果を増分的に計算します。 多くのストリーミングデータソースはレコードを数時間または数日間のみ保持するため、ストリーミング テーブルで使用される処理モデルでは、データソースからのレコードの各バッチが 1 回だけ処理されると想定されています。

Databricks は、次のユースケースで SQL を使用してストリーミング クエリを記述することをサポートしています。

  • Databricks SQL を使用して Unity Catalog でストリーミング テーブルを定義します。

  • Delta Live Tables パイプラインのソース コードを定義します。

注:

Python 構造化ストリーミング コードを使用して、Delta Live Tables でストリーミング テーブルを宣言することもできます。

バッチ変換

バッチ変換は、特定の時点で明確に定義されたデータ資産のセットに対して実行されます。 バッチ変換は 1 回限りの操作である場合もありますが、多くの場合、本番運用システムを最新の状態に保つために定期的に実行される、スケジュールされたジョブまたはパイプラインの一部です。

インクリメンタル変換

増分パターンでは通常、データソースが追加専用であり、安定したスキーマを持っていると想定されます。 次の記事では、更新、削除、またはスキーマの変更が発生するテーブルでの増分変換のニュアンスについて詳しく説明します。

リアルタイム変換

Delta Lake は、レイクハウスをクエリするすべてのユーザーとアプリケーションに大量のデータへのほぼリアルタイムのアクセスを提供することに優れていますが、クラウドオブジェクトストレージにファイルとメタデータを書き込むオーバーヘッドのため、Delta Lake シンクに書き込む多くのワークロードでは真のリアルタイム レイテンシを実現できません。

レイテンシが極めて低いストリーミング アプリケーションの場合、Databricks では、Kafka などのリアルタイム ワークロード用に設計されたソース システムとシンク システムを選択することをお勧めします。 Databricksを使用すると、集計、ストリーム間の結合、レイクハウスに保存されている緩やかに変化するディメンション データとストリーミング データの結合など、データをエンリッチできます。