Azure イベント ハブを Delta Live Tables データソースとして使用する
この記事では、 Delta Live Tables を使用して Azure Event Hubs からのメッセージを処理する方法について説明します。 構造化 ストリーミング Event Hub s コネクタ は、このライブラリ が Databricks Runtime の一部として使用できず 、 サードパーティの JVM ライブラリを使用できない ため Delt a Live Tables は使用できません。
Delta Live Tables からAzure イベント ハブに接続するにはどうすればよいですか?
Azure Event Hubs には、Azure Event Hubs からのメッセージを処理するために、 Databricks Runtime で利用可能な 構造化ストリーミング Kafka コネクタで使用できる Apache Kafka と互換性のあるエンドポイントが用意されています。Azure Event Hubs と Apache Kafka の互換性の詳細については、「 Apache Kafka アプリケーションから Azure Event Hubs を使用する」を参照してください。
次のステップでは、 Delta Live Tables パイプラインを既存の Event Hubs インスタンスに接続し、トピックからイベントを使用する方法について説明します。 これらのステップを完了するには、次の Event Hubs 接続値が必要です。
イベント ハブ名前空間の名前。
イベント ハブ名前空間内のイベント ハブ インスタンスの名前。
Event Hubs の共有アクセス ポリシー名とポリシー キー。 デフォルトにより、Event Hubs 名前空間ごとに
RootManageSharedAccessKey
ポリシーが作成されます。 このポリシーには、manage
、send
、およびlisten
のアクセス許可があります。 パイプラインが Event Hubs からの読み取りのみを行う場合、Databricks では、リッスン アクセス許可のみを使用して新しいポリシーを作成することをお勧めします。
Event Hubs 接続文字列の詳細については、「 Event Hubs 接続文字列の取得」を参照してください。
注
Azure Event Hubs には、セキュリティで保護されたリソースへのアクセスを承認するための OAuth 2.0 と共有アクセス署名 (SAS) オプションの両方が用意されています。 これらの手順では、SAS ベースの認証を使用します。
Azure ポータルから Event Hubs 接続文字列を取得した場合、
EntityPath
値が含まれていない可能性があります。EntityPath
値は、構造化ストリーミング イベント ハブ コネクタを使用する場合にのみ必要です。 構造化ストリーミング Kafka コネクタを使用するには、トピック名のみを指定する必要があります。
ポリシー キーを Databricks シークレットに格納する
ポリシー キーは機密情報であるため、Databricks ではパイプライン コードの値をハードコーディングしないことをお勧めします。 代わりに、Databricks シークレットを使用して、キーへのアクセスを格納および管理します。
次の例では、Databricks CLI を使用してシークレットスコープを作成し、そのシークレットスコープにキーを格納します。 パイプライン コードで、 scope-name
と shared-policy-name
で dbutils.secrets.get()
関数を使用してキー値を取得します。
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Databricks シークレットの詳細については、 「シークレット管理」を参照してください。
ノートブックを作成し、イベントを使用するパイプライン コードを追加する
次の例ではトピックから IoT イベントを読み取りますが、アプリケーションの要件に合わせて例を調整できます。 ベスト プラクティスとして、Databricks では Delta Live Tables パイプライン設定を使用してアプリケーション変数を構成することをお勧めします。 パイプライン コードは、 spark.conf.get()
関数を使用して値を取得します。 パイプライン設定を使用してパイプラインをパラメーター化する方法の詳細については、「PythonまたはSQLでデータセット宣言をパラメーター化する」を参照してください。
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
パイプラインを作成する
次の設定で新しいパイプラインを作成し、プレースホルダー値を環境に適した値に置き換えます。
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
取り替える
<container-name>
を Azure ストレージ アカウント コンテナーの名前に置き換えます。<storage-account-name>
を ADLS Gen2 ストレージ アカウントの名前に置き換えます。<eh-namespace>
をイベント ハブ名前空間の名前に置き換えます。<eh-policy-name>
を Event Hubs ポリシー キーのシークレットスコープ キーに置き換えます。<eventhub>
をイベント ハブ インスタンスの名前に置き換えます。<secret-scope-name>
を、イベント ハブ ポリシー キーを含む Databricks シークレット スコープの名前に置き換えます。
ベスト プラクティスとして、このパイプラインでは既定の DBFS ストレージ パスを使用せず、代わりに Azure データレイク ストレージ Gen2 (ADLS Gen2) ストレージ アカウントを使用します。 ADLS Gen2 ストレージ アカウントの認証の構成の詳細については、「 パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする」を参照してください。