Pular para o conteúdo principal

Desenvolver pipeline

O desenvolvimento e teste de código de pipeline diferem de outras cargas de trabalho do Apache Spark. Este artigo fornece uma visão geral da funcionalidade suportada, das melhores práticas e das considerações a serem feitas no desenvolvimento de código pipeline . Para obter mais recomendações e boas práticas, consulte Aplicando as melhores práticas de desenvolvimento software e DevOps ao pipeline.

nota

Você precisa adicionar o código-fonte à configuração pipeline para validar o código ou a execução de uma atualização. Consulte Configurar pipeline.

Quais arquivos são válidos para o código-fonte do pipeline?

O código do pipeline pode ser em Python ou SQL. É possível ter uma combinação de arquivos de código-fonte em Python e SQL para dar suporte a um único pipeline, mas cada arquivo só pode conter um idioma. Consulte Desenvolver código de pipeline com Python e Desenvolver código de pipeline com SQL.

Os arquivos de origem para o pipeline são armazenados em seu workspace. Os arquivos de espaço de trabalho representam scripts Python ou SQL criados no Editor LakeFlow Pipelines . Você também pode editar os arquivos localmente em seu IDE preferido e sincronizá-los com o workspace. Para obter informações sobre arquivos no workspace, consulte O que são arquivos workspace ?. Para obter informações sobre como editar com o Editor LakeFlow Pipelines , consulte Desenvolver e depurar pipelines ETL com o Editor LakeFlow Pipelines. Para obter informações sobre como criar código em um IDE local, consulte Desenvolver código pipeline em seu ambiente de desenvolvimento local.

Se você desenvolver código Python como módulos ou bibliotecas, você deve instalar e importar o código e, em seguida, chamar os métodos de um arquivo Python configurado como código-fonte. Consulte a documentação sobre gerenciamento de dependências Python para o pipeline.

nota

Se você precisar usar um comando SQL arbitrário em um Python Notebook, poderá usar o padrão de sintaxe spark.sql("<QUERY>") para executar SQL como código Python .

As funções Unity Catalog permitem que você registre funções Python arbitrárias definidas pelo usuário para uso em SQL. Veja Funções definidas pelo usuário (UDFs) no Unity Catalog.

Visão geral do recurso de desenvolvimento pipeline

O pipeline amplia e aproveita muitos recursos de desenvolvimento Databricks , além de introduzir novos recursos e conceitos. A tabela a seguir fornece uma breve visão geral dos conceitos e recursos que dão suporte ao desenvolvimento de código pipeline :

Recurso

Descrição

Modo de desenvolvimento

Executar o pipeline interativamente (optando por atualizar através do Editor LakeFlow Pipelines ) utilizará o modo de desenvolvimento . Nova execução de pipeline com modo de desenvolvimento desativado ao executar automaticamente por meio de um gatilho programático ou automatizado. Consulte o Modo de desenvolvimento.

Execução seca

Uma atualização de execução a seco verifica a exatidão do código-fonte do pipeline sem executar uma atualização em nenhuma tabela. Consulte Verificar se há erros em um pipeline sem esperar que as tabelas sejam atualizadas.

Editor LakeFlow Pipelines

Os arquivos Python e SQL configurados como código-fonte para o pipeline oferecem opções interativas para validar o código e executar atualizações. Consulte Desenvolver e depurar pipelines ETL com o Editor LakeFlow Pipelines.

Parâmetros

Utilize parâmetros no código-fonte e nas configurações do pipeline para simplificar os testes e a extensibilidade. Consulte Usar parâmetros com pipeline.

Databricks Asset Bundles

Databricks Ativo Bundles permitem mover configurações pipeline e código-fonte entre espaços de trabalho. Consulte Converter um pipeline em um projeto Databricks ativo Bundle.

Crie um conjunto de dados de amostra para desenvolvimento e teste

Databricks recomenda a criação de um conjunto de dados de desenvolvimento e teste para testar a lógica pipeline com dados esperados e registros potencialmente malformados ou corrompidos. Existem várias maneiras de criar conjuntos de dados que podem ser úteis para desenvolvimento e testes, incluindo as seguintes:

  • Selecione um subconjunto de dados de um dataset de produção.
  • Utilize dados anonimizados ou gerados artificialmente para fontes que contenham informações pessoais identificáveis. Para ver um tutorial que usa a biblioteca faker para gerar dados para teste, veja o tutorial: Construir um pipeline ETL usando captura de dados de alterações (CDC).
  • Crie dados de teste com resultados bem definidos com base na lógica de transformações posteriores.
  • Antecipe possíveis corrupções de dados, registros malformados e alterações de dados upstream criando registros que quebram as expectativas do esquema de dados.

Por exemplo, se você tiver um arquivo que define um dataset usando o seguinte código:

SQL
CREATE OR REFRESH STREAMING TABLE input_data
AS SELECT * FROM STREAM read_files(
"/production/data",
format => "json")

Você pode criar um dataset de amostra contendo um subconjunto de registros usando uma consulta como a seguinte:

SQL
CREATE OR REFRESH MATERIALIZED VIEW input_data AS
SELECT "2021/09/04" AS date, 22.4 as sensor_reading UNION ALL
SELECT "2021/09/05" AS date, 21.5 as sensor_reading

O exemplo a seguir demonstra a filtragem de dados publicados para criar um subconjunto de dados de produção para desenvolvimento ou teste:

SQL
CREATE OR REFRESH MATERIALIZED VIEW input_data AS SELECT * FROM prod.input_data WHERE date > current_date() - INTERVAL 1 DAY

Para usar esses diferentes conjuntos de dados, crie vários pipelines com o código-fonte implementando a lógica das transformações. Cada pipeline pode ler dados do dataset input_data mas é configurado para incluir o arquivo que cria o dataset específico para o ambiente.

Como os conjuntos de dados pipeline processam dados?

A tabela a seguir descreve como a visualização materializada, as tabelas de transmissão e a visualização processam dados:

Tipo de dataset

Como os registros são processados por meio de consultas definidas?

Tabela de transmissão

Cada registro é processado exatamente uma vez. Isso pressupõe uma origem somente de acréscimo.

Visualização materializada

Os registros são processados conforme necessário para retornar resultados precisos para o estado atual dos dados. A visão materializada deve ser usada para tarefas de processamento de dados, como transformações, agregações ou pré-computação de consultas lentas e computações usadas com frequência. Os resultados são armazenados em cache entre as atualizações.

View

Os registros são processados sempre que a exibição é consultada. Use modos de exibição para transformações intermediárias e verificações de qualidade de dados que não devem ser publicadas em conjuntos de dados públicos.

Declare seu primeiro conjunto de dados no pipeline.

O pipeline introduz uma nova sintaxe para Python e SQL. Para aprender o básico da sintaxe de pipeline, consulte Desenvolver código de pipeline com Python e Desenvolver código de pipeline com SQL.

nota

O pipeline separa as definições dataset do processamento de atualização, e o código-fonte pipeline não se destina à execução interativa.

Como configurar um pipeline?

As configurações de um pipeline se dividem em duas categorias principais:

  1. Configurações que definem uma coleção de arquivos (conhecidos como código-fonte ) que usam a sintaxe pipeline para declarar o conjunto de dados.
  2. Configurações que controlam a infraestrutura pipeline , o gerenciamento de dependências, como as atualizações são processadas e como as tabelas são salvas no workspace.

A maioria das configurações é opcional, mas algumas exigem atenção cuidadosa, especialmente ao configurar pipelines de produção. Isso inclui o seguinte:

  • Para disponibilizar dados fora do pipeline, você deve declarar um esquema de destino para publicar no Hive metastore ou um catálogo de destino e um esquema de destino para publicar no Unity Catalog.
  • As permissões de acesso aos dados são configuradas por meio do cluster usado para execução. Certifique-se de que seu cluster tenha permissões apropriadas configuradas para fonte de dados e o local de armazenamento de destino, se especificado.

Para obter detalhes sobre como usar Python e SQL para escrever código-fonte para pipelines, consulte a referência da linguagem SQL para pipelines e a referência da linguagem Python para pipelines declarativos doLakeFlow Spark.

Para obter mais informações sobre configurações e definições pipeline , consulte Configurar pipeline.

implementou seu primeiro pipeline e acionou atualizações

Para processar dados com SDP, configure um pipeline. Após configurar um pipeline , você pode acionar uma atualização para calcular os resultados de cada dataset no seu pipeline. Para começar a usar o pipeline, consulte o tutorial: Construir um pipeline ETL usando captura de dados de alterações (CDC).

O que é uma atualização de pipeline?

Os pipelines implementam a infraestrutura e recalculam o estado dos dados quando você inicia uma atualização . Uma atualização executa as seguintes ações:

  • Inicia um cluster com a configuração correta.
  • Descobre todas as tabelas e visualizações definidas e verifica se há erros de análise, como nomes de colunas inválidos, dependências ausentes e erros de sintaxe.
  • Cria ou atualiza tabelas e visualizações com os dados mais recentes disponíveis.

O pipeline pode ser executado continuamente ou de forma programática, dependendo dos requisitos de custo e latência do seu caso de uso. Consulte execução de uma atualização pipeline.

Ingerir dados com pipeline

O pipeline suporta todas as fontes de dados disponíveis no Databricks.

Databricks recomenda o uso de tabelas de transmissão para a maioria dos casos de uso de ingestão. Para arquivos que chegam ao armazenamento de objetos cloud , Databricks recomenda Auto Loader. É possível ingerir dados diretamente com um pipeline a partir da maioria dos barramentos de mensagens.

Para obter mais informações sobre como configurar o acesso ao armazenamento cloud , consulte configuração de armazenamento em nuvem.

Para formatos não suportados pelo Auto Loader, você pode usar Python ou SQL para consultar qualquer formato suportado pelo Apache Spark. Consulte Carregar dados no pipeline.

Monitorar e aplicar a qualidade dos dados

Você pode usar expectativas para especificar controles de qualidade de dados sobre o conteúdo de um dataset. Ao contrário de uma restrição CHECK em um banco de dados tradicional que impede a adição de quaisquer registros que não atendam à restrição, as expectativas fornecem flexibilidade ao processar dados que não atendem aos requisitos de qualidade de dados. Essa flexibilidade permite que você processe e armazene dados que você espera que sejam confusos e dados que devem atender a requisitos de qualidade rigorosos. Veja gerenciar a qualidade dos dados com expectativas pipeline.

Qual a relação entre o pipeline declarativo LakeFlow Spark e Delta Lake ?

O SDP amplia a funcionalidade do Delta Lake. Como as tabelas criadas e gerenciadas pelo pipeline são tabelas Delta , elas possuem as mesmas garantias e recursos fornecidos pelo Delta Lake. Veja O que é Delta Lake no Databricks?.

O pipeline adiciona diversas propriedades de tabela, além das muitas propriedades de tabela que podem ser definidas no Delta Lake. Consulte a referência de propriedades do pipeline e a referência de propriedades da tabelaDelta.

Como as tabelas são criadas e gerenciadas pelo pipeline

Databricks gerencia automaticamente as tabelas criadas pelo pipeline, determinando como as atualizações precisam ser processadas para compute corretamente o estado atual de uma tabela e executando uma série de tarefas de manutenção e otimização.

Para a maioria das operações, você deve permitir que o pipeline processe todas as atualizações, inserções e exclusões em uma tabela de destino. Para obter detalhes e limitações, consulte Manter exclusões ou atualizações manuais.

Tarefa de manutenção realizada por oleoduto

Databricks executa tarefas de manutenção em tabelas gerenciadas por pipeline em uma cadência ideal, utilizando otimização preditiva. A manutenção pode melhorar o desempenho das consultas e reduzir custos, removendo versões antigas das tabelas. Isso inclui operações completas OPTIMIZE e vacuum . As tarefas de manutenção são executadas em um programa definido por otimização preditiva, e somente se uma atualização pipeline tiver sido executada desde a manutenção anterior.

Para entender com que frequência a otimização preditiva é executada e para entender os custos de manutenção, consulte Referência da tabela do sistema de otimização preditiva.

Limitações

Para obter uma lista de limitações, consulte Limitações do pipeline.

Para obter uma lista de requisitos e limitações específicos para o uso do pipeline com Unity Catalog, consulte Usar Unity Catalog com o pipeline.

Recurso adicional