Use Azure Event Hubs como um pipeline declarativo LakeFlow fonte de dados
Este artigo explica como usar o pipeline declarativo LakeFlow para processar mensagens dos Hubs de Eventos Azure . Você não pode usar o conector Event Hubs de transmissão estruturada porque esta biblioteca não está disponível como parte do Databricks Runtime e o pipeline LakeFlow Declarative não permite que você use bibliotecas JVM de terceiros.
Como o pipeline declarativo LakeFlow pode se conectar ao Azure Event Hubs?
Azure Event Hubs fornece um endpoint compatível com Apache Kafka que você pode usar com o conector Kafka de transmissão estruturada, disponível no Databricks Runtime, para processar mensagens do Azure Event Hubs. Para obter mais informações sobre a compatibilidade Azure Event Hubs e Apache Kafka , consulte Usar Azure Event Hubs de aplicativos Apache Kafka.
Os passos a seguir descrevem como conectar o pipeline declarativo LakeFlow a uma instância existente do Event Hubs e consumir eventos de um tópico. Para concluir esses passos, você precisa dos seguintes valores de conexão do Event Hubs:
- O nome do namespace Event Hubs.
- O nome da instância do Event Hub no namespace Event Hubs.
- Um nome de política de acesso compartilhado e key de política para Hubs de Eventos. Por default, uma política
RootManageSharedAccessKey
é criada para cada namespace do Event Hubs. Esta política tem permissõesmanage
,send
elisten
. Se o seu pipeline só lê de Hubs de Eventos, a Databricks recomenda criar uma nova política com permissão somente de escuta.
Para obter mais informações sobre as strings de conexão do Event Hubs, consulte Obter strings de conexão do Event Hubs.
- Azure Event Hubs fornece opções de OAuth 2.0 e assinatura de acesso compartilhado (SAS) para autorizar o acesso ao seu recurso seguro. Estas instruções usam autenticação baseada em SAS.
- Se você obtiver as strings de conexão dos Hubs de Eventos do portal Azure , elas podem não conter o valor
EntityPath
. O valorEntityPath
é necessário somente ao usar o conector Event Hubs de transmissão estruturada. O uso do conector Kafka de transmissão estruturada requer o fornecimento apenas do nome do tópico.
Armazene a key de política em um segredo Databricks
Como a key de política é uma informação sensível, Databricks recomenda não codificar o valor no código pipeline . Em vez disso, use os segredos Databricks para armazenar e gerenciar o acesso à key.
O exemplo a seguir usa o Databricks CLI para criar um Escopo Secreto e armazenar a key nesse Escopo Secreto. No código do seu pipeline , use a função dbutils.secrets.get()
com scope-name
e shared-policy-name
para recuperar o valor key .
databricks --profile <profile-name> secrets create-scope <scope-name>
databricks --profile <profile-name> secrets put-secret <scope-name> <shared-policy-name> --string-value <shared-policy-key>
Para obter mais informações sobre segredos Databricks , consulte Gerenciamento de segredos.
Crie um pipeline e adicione código para consumir eventos
O exemplo a seguir lê eventos de IoT de um tópico, mas você pode adaptar o exemplo aos requisitos do seu aplicativo. Como prática recomendada, Databricks recomenda usar as configurações do pipeline declarativo LakeFlow para configurar variáveis do aplicativo. O código do pipeline então usa a função spark.conf.get()
para recuperar valores.
from pyspark import pipelines as dp
import pyspark.sql.types as T
from pyspark.sql.functions import *
# Event Hubs configuration
EH_NAMESPACE = spark.conf.get("iot.ingestion.eh.namespace")
EH_NAME = spark.conf.get("iot.ingestion.eh.name")
EH_CONN_SHARED_ACCESS_KEY_NAME = spark.conf.get("iot.ingestion.eh.accessKeyName")
SECRET_SCOPE = spark.conf.get("io.ingestion.eh.secretsScopeName")
EH_CONN_SHARED_ACCESS_KEY_VALUE = dbutils.secrets.get(scope = SECRET_SCOPE, key = EH_CONN_SHARED_ACCESS_KEY_NAME)
EH_CONN_STR = f"Endpoint=sb://{EH_NAMESPACE}.servicebus.windows.net/;SharedAccessKeyName={EH_CONN_SHARED_ACCESS_KEY_NAME};SharedAccessKey={EH_CONN_SHARED_ACCESS_KEY_VALUE}"
# Kafka Consumer configuration
KAFKA_OPTIONS = {
"kafka.bootstrap.servers" : f"{EH_NAMESPACE}.servicebus.windows.net:9093",
"subscribe" : EH_NAME,
"kafka.sasl.mechanism" : "PLAIN",
"kafka.security.protocol" : "SASL_SSL",
"kafka.sasl.jaas.config" : f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username=\"$ConnectionString\" password=\"{EH_CONN_STR}\";",
"kafka.request.timeout.ms" : spark.conf.get("iot.ingestion.kafka.requestTimeout"),
"kafka.session.timeout.ms" : spark.conf.get("iot.ingestion.kafka.sessionTimeout"),
"maxOffsetsPerTrigger" : spark.conf.get("iot.ingestion.spark.maxOffsetsPerTrigger"),
"failOnDataLoss" : spark.conf.get("iot.ingestion.spark.failOnDataLoss"),
"startingOffsets" : spark.conf.get("iot.ingestion.spark.startingOffsets")
}
# PAYLOAD SCHEMA
payload_ddl = """battery_level BIGINT, c02_level BIGINT, cca2 STRING, cca3 STRING, cn STRING, device_id BIGINT, device_name STRING, humidity BIGINT, ip STRING, latitude DOUBLE, lcd STRING, longitude DOUBLE, scale STRING, temp BIGINT, timestamp BIGINT"""
payload_schema = T._parse_datatype_string(payload_ddl)
# Basic record parsing and adding ETL audit columns
def parse(df):
return (df
.withColumn("records", col("value").cast("string"))
.withColumn("parsed_records", from_json(col("records"), payload_schema))
.withColumn("iot_event_timestamp", expr("cast(from_unixtime(parsed_records.timestamp / 1000) as timestamp)"))
.withColumn("eh_enqueued_timestamp", expr("timestamp"))
.withColumn("eh_enqueued_date", expr("to_date(timestamp)"))
.withColumn("etl_processed_timestamp", col("current_timestamp"))
.withColumn("etl_rec_uuid", expr("uuid()"))
.drop("records", "value", "key")
)
@dp.create_table(
comment="Raw IOT Events",
table_properties={
"quality": "bronze",
"pipelines.reset.allowed": "false" # preserves the data in the delta table if you do full refresh
},
partition_cols=["eh_enqueued_date"]
)
@dp.expect("valid_topic", "topic IS NOT NULL")
@dp.expect("valid records", "parsed_records IS NOT NULL")
def iot_raw():
return (
spark.readStream
.format("kafka")
.options(**KAFKA_OPTIONS)
.load()
.transform(parse)
)
Criar o pipeline
Crie um novo pipeline com um arquivo de origem Python e insira o código acima.
O código faz referência aos parâmetros configurados. Use a seguinte configuração JSON, substituindo os valores do espaço reservado por valores apropriados para seu ambiente (veja a lista após o JSON). Você pode definir os parâmetros usando a interface do usuário de configurações ou editando o JSON de configurações diretamente. Para obter mais informações sobre como usar as configurações pipeline para parametrizar seu pipeline, consulte Usar parâmetros com o pipeline declarativo LakeFlow.
Este arquivo de configurações também define o local de armazenamento para uma account de armazenamento Azure Data Lake Storage (ADLS). Como prática recomendada, esse pipeline não usa o caminho de armazenamento DBFS default , mas sim uma account de armazenamento ADLS . Para obter mais informações sobre como configurar a autenticação para uma account de armazenamento ADLS , consulte Acessar credenciais de armazenamento com segurança com segredos em um pipeline.
{
"storage": "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/iot/",
"configuration": {
"iot.ingestion.eh.namespace": "<eh-namespace>",
"iot.ingestion.eh.accessKeyName": "<eh-policy-name>",
"iot.ingestion.eh.name": "<eventhub>",
"io.ingestion.eh.secretsScopeName": "<secret-scope-name>",
"iot.ingestion.spark.maxOffsetsPerTrigger": "50000",
"iot.ingestion.spark.startingOffsets": "latest",
"iot.ingestion.spark.failOnDataLoss": "false",
"iot.ingestion.kafka.requestTimeout": "60000",
"iot.ingestion.kafka.sessionTimeout": "30000"
}
}
Substitua os seguintes espaços reservados:
<container-name>
com o nome de um contêiner account armazenamento Azure .<storage-account-name>
com o nome de uma account de armazenamento ADLS .<eh-namespace>
com o nome do namespace do seu Event Hubs.<eh-policy-name>
com a key Secret Scope para a key de política do Event Hubs.<eventhub>
com o nome da sua instância do Event Hubs.<secret-scope-name>
com o nome do Databricks Secret Scope que contém a key de política do Event Hubs.