チュートリアル: エンドツーエンドのレイクハウス アナリティクス パイプラインの実行
このチュートリアルでは、Databricksレイクハウスのエンドツーエンド分析パイプラインを設定する方法を説明します。
このチュートリアルでは、Unity Catalogが有効化されたクラスター上で、一般的なETLタスクを完了するために対話型ノートブックを使用します。Unity Catalog を使用していない場合は、「 Databricks で初めての ETL ワークロードの実行」を参照してください。
このチュートリアルのタスク
この記事の終わりまでに、以下をスムーズに行えるようになります。
- Unity Catalog 対応コンピュート クラスターの起動。
- Databricks ノートブックを作成する。
- Unity Catalog外部ロケーションからのデータの書き込みと読み取り。
- Auto Loaderを用いてUnity Catalogへのインクリメンタルなデータ取り込みの設定。
- ノートブックのセルを実行して、データを処理、クエリ、プレビューします。
- ノートブックを Databricks ジョブとしてスケジュールする。
- Databricks SQL からの Unity Catalog テーブルのクエリ
Databricks には、データ プロフェッショナルが抽出、変換、読み込み (ETL) パイプラインを迅速に開発してデプロイできるようにする、運用環境対応のツール スイートが用意されています。Unity Catalogを使用すると、データスチュワードは、組織全体のユーザーのストレージ資格情報、外部ロケーション、およびデータベース オブジェクトを構成してセキュリティで保護できます。Databricks SQL を使用すると、アナリストは運用環境の ETL ワークロードで使用されているのと同じテーブルに対して SQL クエリーを実行できるため、大規模なリアルタイムのビジネスインテリジェンスが可能になります。
DLT を使用して ETL パイプラインを構築することもできます。Databricks は、本番運用 ETL パイプラインの構築、デプロイ、保守の複雑さを軽減するために DLT を作成しました。 「 チュートリアル: 初めての DLT パイプラインを実行する」を参照してください。
必要条件
クラスター制御権限がない場合でも、 クラスターにアクセスできる限り、以下のほとんどの手順を完了できます。
ステップ 1: クラスターを作成する
探索的データ分析とデータエンジニアリングを行うには、コマンドの実行に必要なコンピュートリソースを提供するクラスターを作成します。
- サイドバーの
[ コンピュート ]をクリックします。
サイドバーで「 新規 」をクリックし、「 クラスター 」を選択します。これにより、新しいクラスター/コンピュートページが開きます。
- クラスターの一意の名前を指定します。
- [パフォーマンス ] セクションで、[ 単一ノード ] ラジオ ボタンを選択します。
- [詳細設定] で、アクセス モード設定を [手動 ] に切り替え、[ 専用 ] を選択します。
- [ シングル ユーザーまたはグループ ] で、ユーザー名を選択します。
- Unityカタログを使用するには、 Databricks ランタイムバージョン の11.1 以降を選択します。
- [コンピュートの作成] をクリックして、クラスターを作成します。
Databricks クラスターの詳細については、「コンピュート」を参照してください。
手順 2: Databricks ノートブックを作成する
ワークスペースにノートブックを作成するには、サイドバーの「 新規 」をクリックし、「 ノートブック 」をクリックします。空白のノートブックがワークスペースで開きます。
ノートブックの作成と管理の詳細については、「 ノートブックの管理」を参照してください。
ステップ3:Unity Catalogが管理する外部ロケーションからのデータの書き込みと読み取り
Databricks では、増分データ取り込みに Auto Loader を使用することをお勧めします。Auto Loader は、新しいファイルがクラウド オブジェクト ストレージに到着すると、自動的に検出して処理します。
Unity Catalogを使用して、外部ロケーションへの安全なアクセスを管理します。外部ロケーションに対するREAD FILES
権限を持つユーザーまたはサービスプリンシパルは、 Auto Loader を使用してデータを取り込むことができます。
通常、データは他のシステムからの書き込みのために外部ロケーションに到着します。このデモでは、JSON ファイルを外部ロケーションに書き出すことで、データ到着をシミュレートできます。
次のコードをノートブックのセルにコピーします。catalog
の文字列値を、 CREATE CATALOG
および USE CATALOG
アクセス許可を持つカタログの名前に置き換えます。external_location
の文字列値を、 READ FILES
、WRITE FILES
、および CREATE EXTERNAL TABLE
のアクセス許可を持つ外部ロケーションのパスに置き換えます。
外部ロケーションは、ストレージ コンテナー全体として定義できますが、多くの場合、コンテナーに入れ子になったディレクトリを指します。
外部ロケーションパスの正しい形式は "gs://bucket-name/path/to/external_location"
です。
external_location = "<your-external-location>"
catalog = "<your-catalog>"
dbutils.fs.put(f"{external_location}/filename.txt", "Hello world!", True)
display(dbutils.fs.head(f"{external_location}/filename.txt"))
dbutils.fs.rm(f"{external_location}/filename.txt")
display(spark.sql(f"SHOW SCHEMAS IN {catalog}"))
このセルを実行すると、12 バイトを読み取る行が出力され、文字列 "Hello world!" が出力され、指定されたカタログに存在するすべてのデータベースが表示されます。 このセルを実行できない場合は、Unity Catalog が有効なワークスペースにいることを確認し、ワークスペース管理者に適切なアクセス許可を要求して、このチュートリアルを完了してください。
以下の Python コードは、Eメールアドレスを使用して、提供されたカタログに一意のデータベースを作成し、提供された外部ロケーションに一意の保存場所を作成します。 このセルを実行すると、このチュートリアルに関連付けられているすべてのデータが削除され、この例をべき等に実行できるようになります。 クラスが定義され、インスタンス化され、接続されたシステムからソースの外部ロケーションに到着するデータのバッチをシミュレートするために使用します。
このコードをノートブックの新しいセルにコピーし、実行して環境を構成します。
このコードで定義された変数を使用すると、既存のワークスペースアセットや他のユーザーと競合するリスクを負うことなく、コードを安全に実行できます。ネットワークまたはストレージのアクセス許可が制限されていると、このコードを実行するとエラーが発生します。これらの制限のトラブルシューティングについては、ワークスペース管理者に問い合わせてください。
from pyspark.sql.functions import col
# Set parameters for isolation in workspace and reset demo
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
database = f"{catalog}.e2e_lakehouse_{username}_db"
source = f"{external_location}/e2e-lakehouse-source"
table = f"{database}.target_table"
checkpoint_path = f"{external_location}/_checkpoint/e2e-lakehouse-demo"
spark.sql(f"SET c.username='{username}'")
spark.sql(f"SET c.database={database}")
spark.sql(f"SET c.source='{source}'")
spark.sql("DROP DATABASE IF EXISTS ${c.database} CASCADE")
spark.sql("CREATE DATABASE ${c.database}")
spark.sql("USE ${c.database}")
# Clear out data from previous demo execution
dbutils.fs.rm(source, True)
dbutils.fs.rm(checkpoint_path, True)
# Define a class to load batches of data to source
class LoadData:
def __init__(self, source):
self.source = source
def get_date(self):
try:
df = spark.read.format("json").load(source)
except:
return "2016-01-01"
batch_date = df.selectExpr("max(distinct(date(tpep_pickup_datetime))) + 1 day").first()[0]
if batch_date.month == 3:
raise Exception("Source data exhausted")
return batch_date
def get_batch(self, batch_date):
return (
spark.table("samples.nyctaxi.trips")
.filter(col("tpep_pickup_datetime").cast("date") == batch_date)
)
def write_batch(self, batch):
batch.write.format("json").mode("append").save(self.source)
def land_batch(self):
batch_date = self.get_date()
batch = self.get_batch(batch_date)
self.write_batch(batch)
RawData = LoadData(source)
これで、次のコードをセルにコピーして実行することで、データのバッチをランディングできます。 このセルを最大 60 回まで手動で実行して、新しいデータ到着をトリガーできます。
RawData.land_batch()
ステップ 4: Unity Catalogにデータを取り込むように Auto Loaderを構成する
Databricks では、 Delta Lake を使用してデータを格納することをお勧めします。 Delta Lake は、 ACIDトランザクションを提供し、データレイクハウスを有効にするオープンソース ストレージ レイヤーです。 Delta Lake は、 Databricksで作成されたテーブルのデフォルト形式です。
Unity Catalogテーブルにデータを取り込むようにAuto Loaderを設定するには、次のコードをコピーしてノートブックの空のセルに貼り付けます:
# Import functions
from pyspark.sql.functions import col, current_timestamp
# Configure Auto Loader to ingest JSON data to a Delta table
(spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "json")
.option("cloudFiles.schemaLocation", checkpoint_path)
.load(source)
.select("*", col("_metadata.source").alias("source_file"), current_timestamp().alias("processing_time"))
.writeStream
.option("checkpointLocation", checkpoint_path)
.trigger(availableNow=True)
.option("mergeSchema", "true")
.toTable(table))
Auto Loaderの詳細については、「Auto Loaderとは」を参照してください。
Unity Catalog での構造化ストリーミングの詳細については、「 Unity Catalog と構造化ストリーミングの使用」を参照してください。
ステップ 5: データの処理と操作
ノートブックは、セルごとにロジックを実行します。以下のステップを使用して、セル内のロジックを実行します。
-
前の手順で完了したセルを実行するには、セルを選択して SHIFT+ENTER キーを押します。
-
作成したテーブルを検索するには、以下のコードをコピーして空のセルに貼り付け、 Shift+Enter キーを押してセルを実行します。
Pythondf = spark.read.table(table)
-
DataFrame内のデータをプレビューするには、以下のコードを空のセルにコピーして貼り付け、 Shift + Enter キーを押してセルを実行します。
Pythondisplay(df)
データを視覚化するための対話型オプションの詳細については、「 Databricks ノートブックでの視覚化」を参照してください。
ステップ 6: ジョブをスケジュールする
DatabricksノートブックをDatabricksジョブのタスクとして追加することで、Databricksノートブックを本番運用スクリプトとして実行できます。このステップでは、手動でトリガーできる新しいジョブを作成します。
ノートブックをタスクとしてスケジュールするには、以下を実行します。
- ヘッダーバーの右側にある「 スケジュール 」をクリックします。
- 「 ジョブ名 」に一意の名前を入力します。
- 「 手動 」をクリックします。
- 「 クラスター 」ドロップダウンで、ステップ1で作成したクラスターを選択します。
- 作成 をクリックします。
- 表示されるウィンドウで、「 今すぐ実行 」をクリックします。
- ジョブの実行結果を表示するには、
最終実行 タイムスタンプの横にある アイコンをクリックします。
ジョブの詳細については、「 ジョブとは」を参照してください。
手順 7: Databricks SQL からテーブルをクエリする
現在のカタログに対するUSE CATALOG
権限、現在のスキーマに対するUSE SCHEMA
権限、およびテーブルに対するSELECT
権限を持つユーザーは、優先するDatabricks APIからテーブルの内容をクエリーできます。
Databricks SQLでクエリーを実行するには、実行中のSQLウェアハウスにアクセスする必要があります。
このチュートリアルで前に作成したテーブルの名前は target_table
です。 最初のセルに指定したカタログと、 patern e2e_lakehouse_<your-username>
を含むデータベースを使用してクエリを実行できます。 カタログエクスプローラを使用して、作成したデータオブジェクトを検索できます。
その他のインテグレーション
Databricksを使用したデータエンジニアリングのための統合とツールの詳細をご覧ください。