Pular para o conteúdo principal

Exemplos de fluxos no pipeline declarativo LakeFlow

Exemplo: Escrever em uma tabela de transmissão a partir de vários tópicos Kafka

Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:

Python
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)

@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)

Para saber mais sobre a função com valor de tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.

Em Python, você pode criar programaticamente vários fluxos que têm como alvo uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.

nota

Este padrão tem os mesmos requisitos que usar um loop for para criar tabelas. Você deve passar explicitamente um valor Python para a função que define o fluxo. Veja Criar tabelas em um loop for.

Python
from pyspark import pipelines as dp

dp.create_streaming_table("kafka_target")

topic_list = ["topic1", "topic2", "topic3"]

for topic_name in topic_list:

@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)

Exemplo: execução de um preenchimento único de dados

Se você quiser executar uma consulta para anexar dados a uma tabela de transmissão existente, use append_flow.

Depois de anexar um conjunto de dados existentes, você tem várias opções:

  • Se você quiser que a consulta anexe novos dados se eles chegarem ao diretório de preenchimento, deixe a consulta no local.
  • Se você quiser que isso seja um preenchimento único e nunca mais seja executado, remova a consulta após executar o pipeline uma vez.
  • Se você quiser que a consulta seja executada uma vez e somente executada novamente nos casos em que os dados estão sendo totalmente atualizados, defina o parâmetro once como True no fluxo de acréscimo. Em SQL, use INSERT INTO ONCE.

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:

Python
from pyspark import pipelines as dp

@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")

@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")

Para um exemplo mais detalhado, consulte Histórico de preenchimento de dados com o pipeline declarativo LakeFlow.

Exemplo: use o processamento de fluxo de acréscimo em vez de UNION

Em vez de usar uma consulta com uma cláusula UNION , você pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. Usar consultas de fluxo de acréscimo em vez de UNION permite que você anexe dados a uma tabela de transmissão de várias fontes sem executar uma refreshcompleta.

O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION:

Python
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)

raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)

return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a consulta UNION por consultas de fluxo de acréscimo:

Python
dp.create_streaming_table("raw_orders")

@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")

@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")