Exemplos de fluxos no pipeline declarativo LakeFlow
Exemplo: Escrever em uma tabela de transmissão a partir de vários tópicos Kafka
Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target
e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:
- Python
- SQL
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
# Kafka stream from multiple topics
@dp.append_flow(target = "kafka_target")
def topic1():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic1")
.load()
)
@dp.append_flow(target = "kafka_target")
def topic2():
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", "topic2")
.load()
)
CREATE OR REFRESH STREAMING TABLE kafka_target;
CREATE FLOW
topic1
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');
CREATE FLOW
topic2
AS INSERT INTO
kafka_target BY NAME
SELECT * FROM
read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');
Para saber mais sobre a função com valor de tabela read_kafka()
usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.
Em Python, você pode criar programaticamente vários fluxos que têm como alvo uma única tabela. O exemplo a seguir mostra esse padrão para uma lista de tópicos do Kafka.
Este padrão tem os mesmos requisitos que usar um loop for
para criar tabelas. Você deve passar explicitamente um valor Python para a função que define o fluxo. Veja Criar tabelas em um loop for
.
from pyspark import pipelines as dp
dp.create_streaming_table("kafka_target")
topic_list = ["topic1", "topic2", "topic3"]
for topic_name in topic_list:
@dp.append_flow(target = "kafka_target", name=f"{topic_name}_flow")
def topic_flow(topic=topic_name):
return (
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,...")
.option("subscribe", topic)
.load()
)
Exemplo: execução de um preenchimento único de dados
Se você quiser executar uma consulta para anexar dados a uma tabela de transmissão existente, use append_flow
.
Depois de anexar um conjunto de dados existentes, você tem várias opções:
- Se você quiser que a consulta anexe novos dados se eles chegarem ao diretório de preenchimento, deixe a consulta no local.
- Se você quiser que isso seja um preenchimento único e nunca mais seja executado, remova a consulta após executar o pipeline uma vez.
- Se você quiser que a consulta seja executada uma vez e somente executada novamente nos casos em que os dados estão sendo totalmente atualizados, defina o parâmetro
once
comoTrue
no fluxo de acréscimo. Em SQL, useINSERT INTO ONCE
.
Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:
- Python
- SQL
from pyspark import pipelines as dp
@dp.table()
def csv_target():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/sourceDir")
@dp.append_flow(
target = "csv_target",
once = True)
def backfill():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format","csv")
.load("path/to/backfill/data/dir")
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
read_files(
"path/to/sourceDir",
"csv"
);
CREATE FLOW
backfill
AS INSERT INTO ONCE
csv_target BY NAME
SELECT * FROM
read_files(
"path/to/backfill/data/dir",
"csv"
);
Para um exemplo mais detalhado, consulte Histórico de preenchimento de dados com o pipeline declarativo LakeFlow.
Exemplo: use o processamento de fluxo de acréscimo em vez de UNION
Em vez de usar uma consulta com uma cláusula UNION
, você pode usar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. Usar consultas de fluxo de acréscimo em vez de UNION
permite que você anexe dados a uma tabela de transmissão de várias fontes sem executar uma refreshcompleta.
O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION
:
@dp.create_table(name="raw_orders")
def unioned_raw_orders():
raw_orders_us = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
)
raw_orders_eu = (
spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
)
return raw_orders_us.union(raw_orders_eu)
Os exemplos a seguir substituem a consulta UNION
por consultas de fluxo de acréscimo:
- Python
- SQL
dp.create_streaming_table("raw_orders")
@dp.append_flow(target="raw_orders")
def raw_orders_us():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/us")
@dp.append_flow(target="raw_orders")
def raw_orders_eu():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/eu")
# Additional flows can be added without the full refresh that a UNION query would require:
@dp.append_flow(target="raw_orders")
def raw_orders_apac():
return spark.readStream
.format("cloudFiles")
.option("cloudFiles.format", "csv")
.load("/path/to/orders/apac")
CREATE OR REFRESH STREAMING TABLE raw_orders;
CREATE FLOW
raw_orders_us
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/us",
format => "csv"
);
CREATE FLOW
raw_orders_eu
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/eu",
format => "csv"
);
-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
raw_orders_apac
AS INSERT INTO
raw_orders BY NAME
SELECT * FROM
STREAM read_files(
"/path/to/orders/apac",
format => "csv"
);