メインコンテンツまでスキップ

Azure Event Hubs を Lakeflow 宣言型パイプライン データソースとして使用する

この記事では Lakeflow 宣言型パイプラインを使用して Azure Event Hubs からのメッセージを処理する方法について説明します。 構造化ストリーミング Event Hubs コネクタは Databricks Runtimeの一部として使用できず、Lakeflow 宣言型パイプラインではサードパーティの JVM ライブラリを使用できないため、このコネクタを使用することはできません。

宣言型パイプラインLakeflow Event Hubs に接続するAzure

Azure Event Hubs は、 Databricks Runtimeで利用可能な構造化ストリーミングKafkaコネクタで使用できるApache Kafkaと互換性のあるエンドポイントを提供し、 Azure Event Hubs からのメッセージを処理します。 Azure Event Hubs とApache Kafka互換性の詳細については、 「 Apache KafkaアプリケーションからAzure Event Hubs を使用する」を参照してください。

次の手順では Lakeflow 宣言型パイプラインを既存の Event Hubs インスタンスに接続し、トピックからイベントを使用する方法について説明します。 これらの手順を完了するには、次の Event Hubs 接続値が必要です。

  • Event Hubs 名前空間の名前。
  • Event Hubs 名前空間内の Event Hub インスタンスの名前。
  • Event Hubs の共有アクセス ポリシー名とポリシー キー。デフォルトでは、Event Hubs 名前空間ごとにRootManageSharedAccessKeyポリシーが作成されます。このポリシーにはmanagesend 、およびlisten権限があります。パイプラインが Event Hubs からのみ読み取る場合、Databricks では、リッスン権限のみを持つ新しいポリシーを作成することをお勧めします。

Event Hubs 接続文字列の詳細については、 「Event Hubs 接続文字列を取得する」を参照してください。

注記
  • Azure Event Hubs では、セキュリティで保護されたリソースへのアクセスを承認するために、OAuth 2.0 と共有アクセス署名 (SAS) の両方のオプションが提供されます。これらの手順では、SAS ベースの認証を使用します。
  • Azure ポータルから Event Hubs 接続文字列を取得した場合、 EntityPath値が含まれていない可能性があります。EntityPath値は、構造化ストリーミング Event Hubs コネクタを使用する場合にのみ必要です。 構造化ストリーミング Kafka コネクタを使用するには、トピック名のみを指定する必要があります。

ポリシーキーをDatabricksシークレットに保存する

ポリシー キーは機密情報であるため、Databricks ではパイプライン コードに値をハードコーディングしないことをお勧めします。代わりに、Databricks シークレットを使用してキーを保存およびアクセスを管理します。

次の例では、 Databricks CLIを使用してシークレットスコープを作成し、そのシークレットスコープにキーを保存します。 パイプライン コードでは、 scope-nameおよびshared-policy-nameと共にdbutils.secrets.get()関数を使用してキー値を取得します。

Bash
databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Databricksシークレットの詳細については、 「シークレット管理」を参照してください。

パイプラインを作成し、イベントを消費するコードを追加する

次の例では、トピックから IoT イベントを読み取りますが、アプリケーションの要件に合わせて例を調整できます。ベスト プラクティスとして、 Databricks Lakeflow 宣言型パイプライン設定を使用してアプリケーション変数を構成することをお勧めします。 その後、パイプライン コードでは spark.conf.get() 関数を使用して値を取得します。

Python
from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)

@dp.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dp.expect("valid_topic", "topic IS NOT NULL")
@dp.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)

パイプラインを作成する

Python ソース ファイルを使用して新しいパイプラインを作成し、上記のコードを入力します。

コード参照が構成されています。 次の JSON 構成を使用し、プレースホルダー値を環境に適した値に置き換えます (JSON に続くリストを参照)。設定 UI を使用するか、設定JSON直接編集して、 を設定できます。 パイプライン設定を使用してパイプラインをパラメーター化する方法の詳細については、 LakeFlow宣言型パイプラインでの使用」を参照してください。

この設定ファイルは、 Azureデータレイク ストレージ ( ADLS ) ストレージ アカウントの保存場所も定義します。 ベスト プラクティスとして、このパイプラインは当然DBFSストレージ パスを使用せず、代わりにADLSストレージ アカウントを使用します。 ADLS ストレージ アカウントの認証の構成の詳細については、 「パイプラインでシークレットを使用してストレージ資格情報に安全にアクセスする」を参照してください。

JSON
{
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
}
}

次のプレースホルダーを置き換えます。

  • <container-name> Azure ストレージ アカウント コンテナーの名前。
  • <storage-account-name> ADLS ストレージ アカウントの名前を使用します。
  • <eh-namespace> Event Hubs 名前空間の名前に置き換えます。
  • <eh-policy-name> Event Hubs ポリシー キーのシークレットスコープ キーを使用します。
  • <eventhub> Event Hubs インスタンスの名前に置き換えます。
  • <secret-scope-name> Event Hubs ポリシー キーを含むDatabricksシークレットスコープの名前を付けます。