Pular para o conteúdo principal

Use o Azure Event Hubs como uma fonte de dados DLT

Este artigo explica como usar a DLT para processar mensagens do Azure Event Hubs. O senhor não pode usar o conector Transmission Estructurada Event Hubs porque essa biblioteca não está disponível como parte do Databricks Runtime, e a DLT não permite que o senhor use a biblioteca JVM de terceiros.

Como a DLT pode se conectar aos Hubs de Eventos do Azure?

Azure Os Event Hubs fornecem um endpoint compatível com Apache Kafka que o senhor pode usar com o conector de transmissão estruturada Kafka, disponível em Databricks Runtime, para processar mensagens dos Event Hubs Azure. Para obter mais informações sobre Azure Event Hubs e Apache Kafka compatibilidade, consulte Use Azure Event Hubs from Apache Kafka applications.

As etapas a seguir descrevem a conexão de um pipeline DLT a uma instância existente do Event Hubs e o consumo de eventos de um tópico. Para concluir essas etapas, você precisa dos seguintes valores de conexão dos Hubs de Eventos:

  • O nome do namespace dos Hubs de Eventos.
  • O nome da instância do Event Hub no namespace Event Hubs.
  • Um nome de política de acesso compartilhado e uma política key para Event Hubs. Em default, uma política RootManageSharedAccessKey é criada para cada namespace do Event Hubs. Essa política tem permissões manage, send e listen. Se o seu pipeline só lê de Event Hubs, a Databricks recomenda a criação de uma nova política com permissão de escuta apenas.

Para obter mais informações sobre as cadeias de conexão do Event Hubs, consulte Obter uma cadeia de conexão do Event Hubs.

nota
  • Azure O Event Hubs oferece as opções OAuth 2.0 e de assinatura de acesso compartilhado (SAS) para autorizar o acesso ao seu recurso seguro. Essas instruções usam autenticação baseada em SAS.
  • Se o senhor obtiver as cadeias de conexão dos Event Hubs no portal Azure, elas poderão não conter o valor EntityPath. O valor EntityPath é necessário somente ao usar o conector do Event Hubs de transmissão estruturada. Para usar o conector de transmissão estruturada Kafka, é necessário fornecer apenas o nome do tópico.

Armazene a política key em um segredo Databricks

Como a política key é uma informação sensível, a Databricks recomenda não codificar o valor no seu código pipeline. Em vez disso, use os segredos do Databricks para armazenar e gerenciar o acesso ao key.

O exemplo a seguir usa o Databricks CLI para criar um escopo secreto e armazenar o key nesse escopo secreto. Em seu código pipeline, use a função dbutils.secrets.get() com scope-name e shared-policy-name para recuperar o valor key.

Bash
databricks --profile <profile-name> secrets create-scope <scope-name>

databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>

Para obter mais informações sobre Databricks secrets, consulte Secret management.

Crie um Notebook e adicione o código pipeline para consumir eventos

O exemplo a seguir lê eventos de IoT de um tópico, mas o senhor pode adaptar o exemplo aos requisitos do seu aplicativo. Como prática recomendada, a Databricks recomenda o uso das configurações do pipeline DLT para configurar as variáveis do aplicativo. Seu código de pipeline usa a função spark.conf.get() para recuperar valores. Para obter mais informações sobre como usar as configurações do pipeline para parametrizar o pipeline, consulte Usar parâmetros com o pipeline DLT.

Python
import dlt
import pyspark.sql.types as T
from pyspark.sql.functions import *

# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")

EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)

EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration

KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}

# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)

# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)

@dlt.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dlt.expect("valid_topic", "topic IS NOT NULL")
@dlt.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)

Criar o pipeline

Crie um novo pipeline com as seguintes configurações, substituindo os valores de espaço reservado pelos valores apropriados para seu ambiente.

JSON
{
"clusters": [
{
"spark_conf": {
"spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
},
"num_workers": 4
}
],
"development": true,
"continuous": false,
"channel": "CURRENT",
"edition": "ADVANCED",
"photon": false,
"libraries": [
{
"notebook": {
"path": "<path-to-notebook>"
}
}
],
"name": "dlt_eventhub_ingestion_using_kafka",
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
},
"target": "<target-database-name>"
}

Substituir

  • <container-name> com o nome de um contêiner de armazenamento Azure account .
  • <storage-account-name> com o nome de um armazenamento ADLS Gen2 account.
  • <eh-namespace> com o nome do seu namespace Event Hubs.
  • <eh-policy-name> com o escopo secreto key para a política de hubs de eventos key.
  • <eventhub> com o nome da sua instância do Event Hubs.
  • <secret-scope-name> com o nome do Databricks Secret Scope que contém a política de Event Hubs key.

Como prática recomendada, este pipeline não usa o caminho de armazenamento default DBFS , mas sim um Azure Data Lake Storage Gen2 (ADLS Gen2) account. Para obter mais informações sobre como configurar a autenticação para um armazenamento ADLS Gen2 account, consulte Acessar com segurança as credenciais de armazenamento com segredos em um pipeline.