メインコンテンツまでスキップ

Azure Event Hubs を DLT データソースとして使用する

この記事では、DLT を使用して Azure Event Hubs からのメッセージを処理する方法について説明します。構造化ストリーミング Event Hubs コネクタは Databricks Runtimeの一部として使用できず、DLT ではサードパーティの JVM ライブラリを使用できないため、使用できません。

DLT は Azure Event Hubs にどのように接続できますか?

Azure Event Hubs には、Apache Kafka と互換性のあるエンドポイントが用意されており、Databricks Runtime で使用できる 構造化ストリーミング Kafka コネクタと共に使用して、Azure Event Hubs からのメッセージを処理できます。 AzureEvent Hubs とApacheKafka の互換性の詳細については、「Azureアプリケーションから Event Hubs を使用するApacheKafka」を参照してください。

次の手順では、DLT パイプラインを既存の Event Hubs インスタンスに接続し、トピックからイベントを使用する方法について説明します。これらの手順を完了するには、次の Event Hubs 接続値が必要です。

  • Event Hubs 名前空間の名前。
  • Event Hubs 名前空間の Event Hub インスタンスの名前。
  • Event Hubs の共有アクセス ポリシー名とポリシー キー。 By Default は、Event Hubs 名前空間ごとに RootManageSharedAccessKey ポリシーが作成されます。 このポリシーには、 managesend 、および listen のアクセス許可があります。 パイプラインが Event Hubs からのみ読み取る場合、Databricks ではリッスンアクセス許可のみを持つ新しいポリシーを作成することをお勧めします。

Event Hubs 接続文字列の詳細については、「 Event Hubs 接続文字列を取得する」を参照してください。

注記
  • Azure Event Hubs には、セキュリティで保護されたリソースへのアクセスを承認するための OAuth 2.0 と Shared Access Signature (SAS) の両方のオプションが用意されています。 これらの手順では、SAS ベースの認証を使用します。
  • Azure portal から Event Hubs 接続文字列を取得する場合、 EntityPath 値が含まれていない可能性があります。 EntityPath値は、構造化ストリーミング Event Hubs コネクタを使用する場合にのみ必要です。構造化ストリーミング Kafka Connector を使用するには、トピック名のみを指定する必要があります。

ポリシーキーを Databricks シークレットに格納する

ポリシー キーは機密情報であるため、Databricks ではパイプライン コードで値をハードコーディングしないことをお勧めします。 代わりに、Databricks シークレットを使用して、キーへのアクセスを格納および管理します。

次の例では、 Databricks CLI を使用してシークレットスコープを作成し、そのシークレットスコープにキーを格納します。 パイプライン コードで、scope-nameshared-policy-name と共に dbutils.secrets.get() 関数を使用して、キー値を取得します。

Bash
databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

シークレット Databricks の詳細については、「 シークレットの管理」を参照してください。

ノートブックを作成し、イベントを使用するパイプライン コードを追加する

次の例では、トピックから IoT イベントを読み取りますが、アプリケーションの要件に合わせて例を調整できます。 ベスト プラクティスとして、Databricks では DLT パイプライン設定を使用してアプリケーション変数を構成することをお勧めします。その後、パイプライン コードでは spark.conf.get() 関数を使用して値を取得します。 パイプライン設定を使用してパイプラインをパラメーター化する方法の詳細については、「 DLT パイプラインでパラメーターを使用する」を参照してください。

Python
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)

@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)

パイプラインを作成する

次の設定で新しいパイプラインを作成し、プレースホルダーの値を環境に適した値に置き換えます。

JSON
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}

置き換え

  • <container-name> を Azure ストレージ アカウント コンテナーの名前に置き換えます。
  • <storage-account-name> ADLS Gen2 ストレージ アカウントの名前に置き換えます。
  • <eh-namespace> を Event Hubs 名前空間の名前に置き換えます。
  • <eh-policy-name> を Event Hubs ポリシー キーのシークレットスコープ キーに置き換えます。
  • <eventhub> を Event Hubs インスタンスの名前に置き換えます。
  • <secret-scope-name> を、Event Hubs ポリシー キーを含む Databricks シークレットスコープの名前に置き換えます。

ベスト プラクティスとして、このパイプラインでは デフォルト DBFS ストレージ パスを使用せず、代わりに Azure Data Lake Storage Gen2 (ADLS Gen2) ストレージ アカウントを使用します。 ADLS Gen2 ストレージ アカウントの認証の構成の詳細については、「 パイプライン内のシークレットを使用してストレージ資格情報に安全にアクセスする」を参照してください。