Pular para o conteúdo principal

Configurar uma transmissão

info

Visualização

Esse recurso está em Prévia Pública. Os administradores do espaço de trabalho podem controlar o acesso a esse recurso na página Pré-visualizações . Consulte Gerenciar prévias do Databricks.

Uma Transmissão representa uma fonte de dados de transmissão externa, como Apache Kafka. As transmissões armazenam detalhes de conexão, autenticação, esquemas e configuração de ingestão. Depois que uma transmissão é criada, ela pode ser referenciada usando definições de Feature View para criar recursos de transmissão em tempo real.

As transmissões têm nomes de três partes (catalog.schema.stream_name). O acesso a uma transmissão é regido por sua tabela de ingestão associada. Consulte Ingestão e preenchimento retroativo para obter detalhes.

Requisitos

  • Para executar comandos do Notebook: serverless ou um cluster de compute clássico executando Databricks Runtime 17.0 ML ou acima.
  • A versão 0.16.0 ou acima do pacote Python feature-engineering-client deve ser instalada.

Criar uma Transmissão

Use create_stream() para criar uma nova transmissão. Uma transmissão requer quatro componentes de configuração:

  • Configuração de origem : especifica a plataforma de transmissão (por exemplo, Kafka) e detalhes específicos da origem (como inscrição no tópico para Kafka).
  • **Configuração da conexão**: Especifica como conectar e autenticar-se na plataforma de transmissão, incluindo servidores de bootstrap e credenciais.
  • Configuração de esquema: Define a estrutura das key e dos valores da mensagem.
  • Configuração de ingestão: Especifica onde e como os dados de transmissão são ingeridos. Consulte Ingestão e preenchimento retroativo para detalhes.
Python
from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
KafkaStreamConfig,
KafkaSubscriptionMode,
StreamConnectionConfig,
DirectSchemas,
SchemaConfig,
IngestionConfig,
IngestionDestination,
StreamBackfillSource,
)

client = FeatureEngineeringClient()

stream = client.create_stream(
name="my_catalog.my_schema.my_stream",
source_config=KafkaStreamConfig(
subscription_mode=KafkaSubscriptionMode(subscribe="events-topic"),
),
connection_config=StreamConnectionConfig(
uc_connection_name="my-kafka-connection"
),
schema_config=DirectSchemas(
payload_schema=SchemaConfig(
json_schema=(
'{'
' "type": "object",'
' "properties": {'
' "transaction_id": {"type": "string"},'
' "user_id": {"type": "string"},'
' "amount": {"type": "number"},'
' "event_time": {"type": "string", "format": "date-time"}'
' }'
'}'
)
),
),
ingestion_config=IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
),
)

Conectando-se a fontes de transmissão

Antes de definir recursos de transmissão, conecte e teste uma conexão de pipeline de Lakeflow Spark Declarative Pipelines de transmissão ao seu broker Kafka. Consulte Transmissão em compute serverless e Conectar ao Apache Kafka.

Para transmissão gerenciada pela AWS (Amazon MSK), consulte Conectividade privada serverless ao Amazon MSK. Para obter detalhes sobre as opções de autenticação do Kafka, consulte Autenticação.

Autenticação

Conexão do Unity Catalog (recomendada)

Use uma conexão do Unity Catalog para autenticar em seu cluster Kafka. Esta é a abordagem recomendada para autenticação gerenciada. Para criar uma conexão, consulte Criar uma conexão.

Python
connection_config = StreamConnectionConfig(
uc_connection_name="my-kafka-connection"
)

mTLS Direto

Para autenticação mTLS direta, forneça arquivos de keystore e truststore armazenados em um volume do Unity Catalog, com senhas referenciadas por meio dos Secret Scopes do Databricks. Para obter mais informação sobre a autenticação SSL com Kafka, consulte Usar SSL para conectar o Databricks ao Kafka.

Python
from databricks.feature_engineering.entities import (
DirectMtlsConfig,
MtlsConfig,
SecretScopeReference,
)

connection_config = DirectMtlsConfig(
bootstrap_servers="broker1:9092,broker2:9092",
mtls_config=MtlsConfig(
keystore_location="/Volumes/my_catalog/my_schema/my_volume/keystore.jks",
keystore_password_ref=SecretScopeReference(
scope="my_scope", key="keystore_password"
),
key_password_ref=SecretScopeReference(
scope="my_scope", key="key_password"
),
truststore_location="/Volumes/my_catalog/my_schema/my_volume/truststore.jks",
truststore_password_ref=SecretScopeReference(
scope="my_scope", key="truststore_password"
),
),
)

SASL

A autenticação SASL (tanto SASL/SCRAM quanto SASL/PLAIN) não é compatível durante a pré-visualização.

Modos de inscrição

O modo de inscrição especifica como a Transmissão seleciona tópicos do Kafka para consumir. Três modos são suportados:

Mode

Descrição

Exemplo

subscribe

Lista de nomes de tópicos separados por vírgulas.

KafkaSubscriptionMode(subscribe="topic1,topic2")

subscribe_pattern

Padrão Java regex correspondente aos nomes de tópico

KafkaSubscriptionMode(subscribe_pattern="events-.*")

assign

JSON especificando as atribuições de tópicos-partições

KafkaSubscriptionMode(assign='{"my-topic": [0, 1, 2]}')

Configuração do esquema

Defina a estrutura de keys e values de mensagens usando o formato JSON Schema. Para fontes Kafka, payload_schema corresponde ao value da mensagem Kafka (o value no modelo key-value do Kafka) e key_schema corresponde à key da mensagem Kafka. Pelo menos um de payload_schema ou key_schema deve ser fornecido.

Python
schema_config = DirectSchemas(
payload_schema=SchemaConfig(
json_schema=(
'{'
' "type": "object",'
' "properties": {'
' "user_id": {"type": "string"},'
' "amount": {"type": "number"},'
' "event_time": {"type": "string"}'
' }'
'}'
)
),
key_schema=SchemaConfig(
json_schema='{"type": "string"}'
),
)

Se nenhum esquema for fornecido para uma key ou payload, ele será tratado como uma strings simples.

Ingestão e preenchimento retroativo

O parâmetro ingestion_config configura como os dados de transmissão são capturados e armazenados para treinamento e disponibilização.

O acesso a uma transmissão é governado pela tabela de ingestão:

  • SELECT na tabela de ingestão concede acesso de leitura à Transmissão.
  • MANAGE na tabela de ingestão concede acesso de exclusão.

Para mais informações sobre privilégios de tabela, consulte Tabela e referência de privilégios do Unity Catalog.

Pipeline de ingestão

Quando uma transmissão é criada, o Databricks começa um pipeline de ingestão gerenciado que lê continuamente mensagens do tópico do Kafka e as grava em uma tabela Delta (a tabela de ingestão). O pipeline começa a partir do último deslocamento do Kafka e é executado continuamente, capturando apenas novas mensagens que chegam após a criação da transmissão. Esta tabela de ingestão é usada para treinamento com recursos de transmissão. Quando uma transmissão é excluída, seu pipeline de ingestão e sua tabela de ingestão também são excluídos.

Destino de ingestão

O ingestion_destination especifica o nome da tabela Delta de três partes onde os dados de transmissão são gravados.

Python
ingestion_config = IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
)

Esquema de tabela de ingestão

A tabela de ingestão contém os dados da mensagem junto com as colunas de metadados:

Coluna

Tipo

Descrição

key

Varia (de key_schema)

A key de mensagem do Kafka, estruturada de acordo com o esquema fornecido.

value

Varia (de payload_schema)

O valor da mensagem do Kafka (payload), estruturado de acordo com o esquema fornecido.

stream_record_timestamp

TIMESTAMP

O registro do carimbo de data/hora. Para dados de preenchimento progressivo, este é o carimbo de data/hora de ingestão do broker Kafka. Para dados de preenchimento retroativo, isso é fornecido pelo cliente.

kafka_topic

STRING

O tópico do Kafka do qual o registro foi consumido.

kafka_partition

INT

A partição do Kafka da qual o registro foi consumido.

kafka_offset

LONG

O offset do Kafka do registro dentro de sua partição.

record_source

STRING

Ou "stream" (preenchimento progressivo a partir da transmissão Kafka ao vivo) ou "backfill" (a partir da fonte de preenchimento retroativo).

Fonte de preenchimento retroativo

Como o pipeline de preenchimento antecipado começa a partir do deslocamento mais recente do Kafka, ele não captura mensagens que existiam antes da transmissão ser criada. Para fornecer cobertura de data histórica para treinamento, configure uma fonte de preenchimento retroativo opcional.

Quando uma fonte de preenchimento retroativo é configurada, o Databricks executa um Job único MERGE INTO que copia as linhas de preenchimento retroativo para a tabela de ingestão com record_source="backfill". A execução do MERGE só ocorre depois que o verificador de sobreposição confirma que a origem do preenchimento e a transmissão de preenchimento progressivo têm carimbos de data/hora sobrepostos (consulte Sobreposição entre o preenchimento e os dados da transmissão em tempo real). Se a condição de sobreposição não for atendida dentro de 2 dias, o merge realiza a execução de qualquer forma para evitar o bloqueio indefinido.

A tabela de preenchimento retroativo deve incluir uma coluna stream_record_timestamp do tipo TIMESTAMP no fuso horário UTC. Outras colunas de metadados do Kafka (kafka_topic, kafka_partition, kafka_offset) são passadas se presentes na fonte de preenchimento retroativo, ou definidas como NULL caso contrário.

Python
from databricks.feature_engineering.entities import StreamBackfillSource

ingestion_config = IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
backfill_source=StreamBackfillSource(
delta_table_name="my_catalog.my_schema.historical_events"
),
)

Sobreposição entre dados de preenchimento retroativo e transmissão ao vivo

Antes de executar um MERGE entre o preenchimento de dados e a tabela de ingestão, uma verificação de sobreposição compara os carimbos de data/hora nas duas tabelas:

  • **Máximo de preenchimento retroativo**: O máximo stream_record_timestamp na fonte de preenchimento retroativo.
  • Ingestão mín. : O stream_record_timestamp mínimo de linhas (record_source="stream") na tabela de ingestão.

O MERGE prossegue quando o carimbo de data/hora mais recente do preenchimento excede o carimbo de data/hora mais antigo da tabela de ingestão em pelo menos 1 hora. Essa sobreposição garante que não haja lacunas na tabela de ingestão. Se a condição de sobreposição não for atendida em 2 dias, a execução do merge ocorre de qualquer forma para evitar o bloqueio por tempo indeterminado.

Como o pipeline de ingestão começa a partir do offset Kafka mais recente, ele captura apenas as mensagens que chegam após a criação da transmissão. Sua fonte de preenchimento retroativo deve conter dados que se estendam para o intervalo de tempo de ingestão — não apenas até a hora de criação da transmissão.

Por exemplo, se você criar uma transmissão às 15:00, o pipeline de preenchimento progressivo começa a ler mensagens das 15:00 em diante. Sua fonte de preenchimento retroativo deve incluir dados com carimbos de data/hora até, pelo menos, às 16:00 (1 hora após o começar do preenchimento progressivo) para satisfazer a verificação de sobreposição. Isso significa que você deve atualizar sua tabela de preenchimento retroativo depois das 16:00 para garantir que a tabela de ingestão não tenha lacunas.

Eliminação de duplicação

Use deduplication_columns para especificar caminhos de coluna para identificar linhas duplicadas durante a ingestão entre o preenchimento retroativo e o preenchimento progressivo de dados de transmissão. Use a notação de ponto para campos aninhados (por exemplo, "value.user_id").

Escolha as colunas de desduplicação com base em seus dados:

  • Se cada registro em sua transmissão contiver um identificador exclusivo (por exemplo, value.transaction_id), use essa coluna para deduplicação.
  • Se sua fonte de preenchimento retroativo incluir as colunas kafka_partition e kafka_offset, use-as para identificar exclusivamente cada registro.
  • Se nenhuma coluna de deduplicação for especificada, a key de deduplicação default será a combinação completa de key, value e stream_record_timestamp. Isso não é recomendado, pois a correspondência rigorosa de critérios pode facilmente levar a duplicatas.
Python
ingestion_config = IngestionConfig(
ingestion_destination=IngestionDestination(
delta_table_name="my_catalog.my_schema.events_ingestion"
),
deduplication_columns=["value.transaction_id"],
)

Gerenciar transmissões

Obter uma transmissão

Python
stream = client.get_stream(name="my_catalog.my_schema.my_stream")

Listar transmissões

Python
streams = client.list_streams(
catalog_name="my_catalog",
schema_name="my_schema",
max_results=50,
include_schemas=False,
)

Defina include_schemas=True para incluir detalhes completos do esquema. Esquemas podem ser grandes e isso pode resultar em uma operação de longa duração. Para recuperar esquemas individualmente, use get_stream.

Excluir uma transmissão

A exclusão de uma transmissão também exclui seu pipeline de ingestão e tabela de ingestão.

atenção

Quaisquer modelos ou recursos que referenciam a transmissão excluída não terão mais acesso aos dados da transmissão subjacente. Crie uma cópia da tabela de ingestão antes da exclusão se precisar desses dados, mas não precisar mais da transmissão.

Python
client.delete_stream(name="my_catalog.my_schema.my_stream")

Notebook de exemplo

Para um exemplo completo que cria uma Transmissão, define recursos de transmissão e é implantado em um endpoint de serving, consulte o Notebook a seguir:

Notebook de início rápido de view de recurso de transmissão

Abrir notebook em uma nova aba