Azure Event Hubs を LakeFlow 宣言型パイプライン データソースとして使用する
この記事では LakeFlow 宣言型パイプラインを使用して Azure Event Hubs からのメッセージを処理する方法について説明します。 構造化ストリーミング Event Hubs コネクタは Databricks Runtimeの一部として使用できず、LakeFlow 宣言型パイプラインではサードパーティの JVM ライブラリを使用できないため、このコネクタを使用することはできません。
宣言型パイプラインLakeFlow Event Hubs に接続するAzure
Azure Event Hubs には、Apache Kafka と互換性のあるエンドポイントが用意されており、Databricks Runtime で使用できる 構造化ストリーミング Kafka コネクタと共に使用して、Azure Event Hubs からのメッセージを処理できます。 AzureEvent Hubs とApacheKafka の互換性の詳細については、「Azureアプリケーションから Event Hubs を使用するApacheKafka」を参照してください。
次の手順では LakeFlow 宣言型パイプラインを既存の Event Hubs インスタンスに接続し、トピックからイベントを使用する方法について説明します。 これらの手順を完了するには、次の Event Hubs 接続値が必要です。
- Event Hubs 名前空間の名前。
- Event Hubs 名前空間の Event Hub インスタンスの名前。
- Event Hubs の共有アクセス ポリシー名とポリシー キー。 By Default は、Event Hubs 名前空間ごとに
RootManageSharedAccessKey
ポリシーが作成されます。 このポリシーには、manage
、send
、およびlisten
のアクセス許可があります。 パイプラインが Event Hubs からのみ読み取る場合、Databricks ではリッスンアクセス許可のみを持つ新しいポリシーを作成することをお勧めします。
Event Hubs 接続文字列の詳細については、「 Event Hubs 接続文字列を取得する」を参照してください。
- Azure Event Hubs には、セキュリティで保護されたリソースへのアクセスを承認するための OAuth 2.0 と Shared Access Signature (SAS) の両方のオプションが用意されています。 これらの手順では、SAS ベースの認証を使用します。
- Azure portal から Event Hubs 接続文字列を取得する場合、
EntityPath
値が含まれていない可能性があります。EntityPath
値は、構造化ストリーミング Event Hubs コネクタを使用する場合にのみ必要です。構造化ストリーミング Kafka Connector を使用するには、トピック名のみを指定する必要があります。
ポリシーキーを Databricks シークレットに格納する
ポリシー キーは機密情報であるため、Databricks ではパイプライン コードで値をハードコーディングしないことをお勧めします。 代わりに、Databricks シークレットを使用して、キーへのアクセスを格納および管理します。
次の例では、 Databricks CLI を使用してシークレットスコープを作成し、そのシークレットスコープにキーを格納します。 パイプライン コードで、scope-name
と shared-policy-name
と共に dbutils.secrets.get()
関数を使用して、キー値を取得します。
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
シークレット Databricks の詳細については、「 シークレットの管理」を参照してください。
ノートブックを作成し、イベントを使用するパイプライン コードを追加する
次の例では、トピックから IoT イベントを読み取りますが、アプリケーションの要件に合わせて例を調整できます。ベスト プラクティスとして、 Databricks LakeFlow 宣言型パイプライン設定を使用してアプリケーション変数を構成することをお勧めします。 その後、パイプライン コードでは spark.conf.get()
関数を使用して値を取得します。パイプライン設定を使用してパイプラインをパラメーター化する方法の詳細については、「LakeFlow 宣言型パイプラインでパラメーターを使用する」を参照してください。
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
パイプラインを作成する
次の設定で新しいパイプラインを作成し、プレースホルダーの値を環境に適した値に置き換えます。
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}
置き換え
<container-name>
を Azure ストレージ アカウント コンテナーの名前に置き換えます。<storage-account-name>
を ADLS ストレージ アカウントの名前に置き換えます。<eh-namespace>
を Event Hubs 名前空間の名前に置き換えます。<eh-policy-name>
を Event Hubs ポリシー キーのシークレットスコープ キーに置き換えます。<eventhub>
を Event Hubs インスタンスの名前に置き換えます。<secret-scope-name>
を、Event Hubs ポリシー キーを含む Databricks シークレットスコープの名前に置き換えます。
ベスト プラクティスとして、このパイプラインでは デフォルト DBFS ストレージ パスを使用せず、代わりに Azure データレイク Storage (ADLS) ストレージ アカウントを使用します。 ADLS ストレージ アカウントの認証の構成の詳細については、「 パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする」を参照してください。