Carregue e processe dados gradualmente com os fluxos do Delta Live Tables

Este artigo explica o que são fluxos e como utilizar fluxos em pipelines do Delta Live Tables para processar dados gradualmente de uma tabela de transmissão de origem para uma tabela de transmissão de destino. No Delta Live Tables, os fluxos são definidos de duas maneiras:

  1. Um fluxo é definido automaticamente quando você cria uma consulta que atualiza uma tabela de transmissão.

  2. O Delta Live Tables também oferece funcionalidade para definir explicitamente fluxos para processamento mais complexo, como anexar a uma tabela de transmissão a partir de várias fontes de transmissão.

Este artigo discute os fluxos implícitos criados quando você define uma consulta para atualizar uma tabela de transmissão e, em seguida, apresenta detalhes sobre a sintaxe para definir fluxos mais complexos.

O que é um fluxo?

No Delta Live Tables, um fluxo é uma consulta de transmissão que processa dados de origem gradualmente para atualizar uma tabela de transmissão de destino. A maioria dos datasets do Delta Live Tables que você cria em um pipeline define o fluxo como parte da consulta e não exige a definição explícita do fluxo. Por exemplo, você cria uma tabela de transmissão no Delta Live Tables em um único comando DDL em vez de utilizar instruções de tabela e fluxo separadas para criar a tabela de transmissão:

Observação

Este exemplo CREATE FLOW é apresentado somente para fins ilustrativos e contém palavras-chave que não são válidas na sintaxe do Delta Live Tables.

CREATE STREAMING TABLE raw_data
AS SELECT * FROM source_data("/path/to/source/data")

-- The above query is equivalent to the following statements:
CREATE STREAMING TABLE raw_data;

CREATE FLOW raw_data
AS INSERT INTO raw_data BY NAME
SELECT * FROM source_data("/path/to/source/data");

Além do fluxo padrão definido por uma consulta, as interfaces em Python e SQL do Delta Live Tables oferecem a funcionalidade de fluxo de acréscimo. O fluxo de acréscimo é compatível com processamentos que exigem a leitura de dados de várias fontes de transmissão para atualizar uma única tabela de transmissão. Por exemplo, utilize a funcionalidade fluxo de acréscimo quando tiver uma tabela e um fluxo de transmissão existentes e quiser adicionar uma nova fonte de transmissão que grave nessa tabela de transmissão existente.

Utilize o fluxo de acréscimo para gravar em uma tabela de transmissão a partir de vários transmissões de origem

Observação

Para usar o processamento de fluxo de acréscimo, seu pipeline deve ser configurado para usar o canal de visualização.

Utilize o decorador @append_flow na interface em Python ou a cláusula CREATE FLOW na interface em SQL para gravar em uma tabela de transmissão a partir de várias fontes de transmissão. Utilize o fluxo de acréscimo para tarefas de processamento como as seguintes:

  • Adicione fontes de transmissão que acrescentam dados a uma tabela de transmissão existente sem exigir uma atualização completa. Por exemplo, você pode ter uma tabela combinando dados regionais de cada região em que opera. À medida que novas regiões forem distribuídas, você poderá adicionar os dados da nova região à tabela sem executar uma atualização completa. Consulte Exemplo: Gravar em uma tabela de transmissão a partir de vários tópicos do Kafka.

  • Atualize uma tabela de transmissão anexando dados históricos ausentes (preenchimento). Por exemplo, você tem uma tabela de transmissão existente gravada por um tópico do Apache Kafka. Você tem também dados históricos armazenados em uma tabela que precisa ser inserida exatamente uma vez na tabela de transmissão e não pode transmitir os dados porque seu processamento inclui a execução de uma agregação complexa antes de inserir os dados. Consulte Exemplo: Executar um preenchimento de dados único.

  • Combine dados de diversas fontes e grave em uma única tabela de transmissão em vez de utilizar a cláusula UNION em uma consulta. O uso do processamento de fluxo de acréscimo em vez de UNION possibilita que você atualize a tabela de destino gradualmente sem executar uma atualização de atualização completa. Veja o exemplo: Usar processamento de fluxo de acréscimo em vez de UNION.

O destino da saída de registros pelo processamento do fluxo de acréscimo pode ser uma tabela existente ou uma nova tabela. Para consultas em Python, utilize a função create_transmissão_table() para criar uma tabela de destino.

Importante

  • Se você tiver que definir restrições de qualidade de dados com expectativas, defina as expectativas na tabela de destino como parte da função create_streaming_table() ou em uma definição de tabela existente. Você não pode definir expectativas na definição @append_flow .

  • Os fluxos são identificados por um nome de fluxo e esse nome é usado para identificar pontos de verificação de transmissão. O uso do nome do fluxo para identificar o ponto de verificação significa o seguinte:

    • Se um fluxo existente em um pipeline for renomeado, o ponto de verificação não será transferido e o fluxo renomeado será efetivamente um fluxo totalmente novo.

    • você não pode reutilizar um nome de fluxo em um pipeline porque o ponto de verificação existente não corresponderá à nova definição de fluxo.

Veja a seguir a sintaxe de @append_flow

import dlt

dlt.create_streaming_table("<target-table-name>") # Required only if the target table doesn't exist.

@dlt.append_flow(
  target = "<target-table-name>",
  name = "<flow-name>", # optional, defaults to function name
  spark_conf = {"<key>" : "<value", "<key" : "<value>"}, # optional
  comment = "<comment>") # optional
def <function-name>():
  return (<streaming query>)
CREATE OR REFRESH STREAMING TABLE append_target; -- Required only if the target table doesn't exist.

CREATE FLOW
  flow_name
AS INSERT INTO
  target_table BY NAME
SELECT * FROM
  source;

Exemplo: escrever em uma tabela de transmissão a partir de vários tópicos do Kafka

Os exemplos a seguir criam uma tabela de transmissão chamada kafka_target e grava nessa tabela de transmissão a partir de dois tópicos de Kafka:

import dlt

dlt.create_streaming_table("kafka_target")

# Kafka stream from multiple topics
@dlt.append_flow(target = "kafka_target")
def topic1():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic1")
      .load()
  )

@dlt.append_flow(target = "kafka_target")
def topic2():
  return (
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "host1:port1,...")
      .option("subscribe", "topic2")
      .load()
  )
CREATE OR REFRESH STREAMING TABLE kafka_target;

CREATE FLOW
  topic1
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic1');

CREATE FLOW
  topic2
AS INSERT INTO
  kafka_target BY NAME
SELECT * FROM
  read_kafka(bootstrapServers => 'host1:port1,...', subscribe => 'topic2');

Para saber mais sobre a função de valor de tabela read_kafka() usada nas consultas SQL, consulte read_kafka na referência da linguagem SQL.

Exemplo: executar um preenchimento único de dados

Os exemplos a seguir executam uma consulta para acrescentar dados históricos a uma tabela de transmissão:

Observação

Para garantir um verdadeiro preenchimento único quando a consulta de backfill fizer parte de um pipeline executado de forma agendada ou contínua, remova a consulta depois de executar o pipeline uma vez. Para acrescentar novos dados se eles chegarem no diretório de backfill, deixe a consulta no lugar.

import dlt

@dlt.table()
def csv_target():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/sourceDir")

@dlt.append_flow(target = "csv_target")
def backfill():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format","csv")
    .load("path/to/backfill/data/dir")
CREATE OR REFRESH STREAMING TABLE csv_target
AS SELECT * FROM
  cloud_files(
    "path/to/sourceDir",
    "csv"
  );

CREATE FLOW
  backfill
AS INSERT INTO
  csv_target BY NAME
SELECT * FROM
  cloud_files(
    "path/to/backfill/data/dir",
    "csv"
  );

Exemplo: utilize o processamento de fluxo de acréscimo em vez de UNION

Em vez de utilizar uma consulta com uma cláusula UNION, você pode utilizar consultas de fluxo de acréscimo para combinar várias fontes e gravar em uma única tabela de transmissão. O uso de consultas de fluxo de acréscimo em vez de UNION possibilita que você anexe a uma tabela de transmissão de várias fontes sem executar uma atualização completa.

O exemplo de Python a seguir inclui uma consulta que combina várias fontes de dados com uma cláusula UNION:

@dlt.create_table(name="raw_orders")
def unioned_raw_orders():
  raw_orders_us =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/us")

  raw_orders_eu =
    spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .load("/path/to/orders/eu")

  return raw_orders_us.union(raw_orders_eu)

Os exemplos a seguir substituem a consulta UNION por consultas de fluxo de acréscimo:

dlt.create_streaming_table("raw_orders")

@dlt.append_flow(target="raw_orders")
def raw_oders_us():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/us")

@dlt.append_flow(target="raw_orders")
def raw_orders_eu():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/eu")

# Additional flows can be added without the full refresh that a UNION query would require:
@dlt.append_flow(target="raw_orders")
def raw_orders_apac():
  return spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .load("/path/to/orders/apac")
CREATE OR REFRESH STREAMING TABLE raw_orders;

CREATE FLOW
  raw_orders_us
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/us",
    "csv"
  );

CREATE FLOW
  raw_orders_eu
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/eu",
    "csv"
  );

-- Additional flows can be added without the full refresh that a UNION query would require:
CREATE FLOW
  raw_orders_apac
AS INSERT INTO
  raw_orders BY NAME
SELECT * FROM
  cloud_files(
    "/path/to/orders/apac",
    "csv"
  );