Pular para o conteúdo principal

Exemplos de fluxos em DLT

Exemplo: Gravar em uma tabela de transmissão a partir de vários tópicos do site Kafka

Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:

Python
import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)

@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)

Para saber mais sobre a função de valor de tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.

Em Python, o senhor pode criar, de forma programática, vários fluxos direcionados a uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.

nota

Esse padrão tem os mesmos requisitos de usar um loop for para criar tabelas. O senhor deve passar explicitamente um valor Python para a função que define o fluxo. Consulte Criar tabelas em um loop for.

Python
import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)

Exemplo: execução de um backfill de dados único

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:

nota

Para garantir um verdadeiro preenchimento único quando a consulta de backfill fizer parte de um pipeline executado de forma agendada ou contínua, remova a consulta depois de executar o pipeline uma vez. Para acrescentar novos dados se eles chegarem no diretório de backfill, deixe a consulta no lugar.

Python
import dlt

@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")

Exemplo: use o processamento de fluxo de acréscimo em vez de UNION

Em vez de usar uma consulta com uma cláusula UNION, o senhor pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. O uso de consultas de fluxo de acréscimo em vez de UNION permite que o senhor faça acréscimos em uma tabela de transmissão de várias fontes sem executar um refreshcompleto.

O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION:

Python
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

raw_orders_eu =
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a consulta UNION por consultas de fluxo de acréscimo:

Python
dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")