Pular para o conteúdo principal

Exemplos de fluxos em LakeFlow Declarative pipeline

Exemplo: Gravar em uma tabela de transmissão a partir de vários tópicos do site Kafka

Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:

Python
import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)

@dlt.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)

Para saber mais sobre a função de valor de tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.

Em Python, o senhor pode criar, de forma programática, vários fluxos direcionados a uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.

nota

Esse padrão tem os mesmos requisitos de usar um loop for para criar tabelas. O senhor deve passar explicitamente um valor Python para a função que define o fluxo. Consulte Criar tabelas em um loop for.

Python
import dlt

dlt.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

@dlt.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)

Exemplo: execução de um backfill de dados único

Se o senhor quiser executar uma consulta para anexar dados a uma tabela de transmissão existente, use append_flow.

Depois de acrescentar um conjunto de dados existentes, você tem várias opções:

  • Se você quiser que a consulta acrescente novos dados se eles chegarem ao diretório de preenchimento, deixe a consulta no lugar certo.
  • Se o senhor quiser que o backfill seja feito uma única vez e nunca mais seja executado, remova a consulta depois de executar o pipeline uma vez.
  • Se quiser que a consulta seja executada uma vez e só seja executada novamente nos casos em que os dados estiverem sendo totalmente atualizados, defina o parâmetro once como True no fluxo de acréscimo. No SQL, use INSERT INTO ONCE.

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:

Python
import dlt

@dlt.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")

@dlt.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")

Para obter um exemplo mais detalhado, consulte Backfilling data histórica com LakeFlow Declarative pipeline.

Exemplo: use o processamento de fluxo de acréscimo em vez de UNION

Em vez de usar uma consulta com uma cláusula UNION, o senhor pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. O uso de consultas de fluxo de acréscimo em vez de UNION permite que o senhor faça acréscimos em uma tabela de transmissão de várias fontes sem executar um refreshcompleto.

O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION:

Python
@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)

raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)

return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a consulta UNION por consultas de fluxo de acréscimo:

Python
dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")