Pular para o conteúdo principal

Observabilidade em Databricks for Job, LakeFlow Declarative pipeline e LakeFlow Connect

monitorar o desempenho, o custo e a integridade de seus aplicativos de transmissão é essencial para criar um pipeline confiável e eficiente ETL. Databricks fornece um rico conjunto de recursos de observabilidade entre Jobs, LakeFlow Declarative pipeline e LakeFlow Connect para ajudar a diagnosticar gargalos, otimizar o desempenho e gerenciar o uso e os custos de recursos.

Este artigo descreve as práticas recomendadas nas seguintes áreas:

  • transmissão chave desempenho métricas
  • Esquemas de log de eventos e consultas de exemplo
  • monitoramento da consulta de transmissão
  • Observabilidade de custos usando tabelas do sistema
  • Exportação de logs e métricas para ferramentas externas

principais métricas para a observabilidade da transmissão

Ao operar o oleoduto de transmissão, monitore as seguintes key métricas:

Métrica

Propósito

Contrapressão

Monitora o número de arquivos e compensações (tamanhos). Ajuda a identificar gargalos e garante que o sistema possa lidar com os dados recebidos sem ficar para trás.

Taxa de transferência

Rastreia o número de mensagens processadas por micro-lotes. Avaliar a eficiência do pipeline e verificar se ele acompanha o ritmo da ingestão de dados.

Duração

Mede a duração média de um micro-lote. Indica a velocidade de processamento e ajuda a ajustar os intervalos de lotes.

Latência

Indica quantos gravos/mensagens são processados ao longo do tempo. Ajuda a entender os atrasos do pipeline de ponta a ponta e a otimizar para reduzir as latências.

utilização de clustering

Reflete o uso da CPU e da memória (%). Garante o uso eficiente de recursos e ajuda o agrupamento de escalas para atender às demandas de processamento.

Rede

Mede os dados transferidos e recebidos. Útil para identificar gargalos na rede e melhorar o desempenho da transferência de dados.

Ponto de verificação

Identifica dados processados e compensações. Garante a consistência e permite a tolerância a falhas durante falhas.

Custo

Mostra os custos horários, diários e mensais de um aplicativo de transmissão. Auxilia no orçamento e na otimização de recursos.

Linhagem

Exibe o conjunto de dados e as camadas criadas no aplicativo de transmissão. Facilita as transformações de dados, o acompanhamento, a garantia de qualidade e a depuração.

agrupamento logs e métricas

Databricks clustering logs e métricas fornecem percepções detalhadas sobre o desempenho e a utilização do clustering. Essas logs e métricas incluem informações sobre CPU, memória, E/S de disco, tráfego de rede e outras métricas do sistema. O monitoramento dessas métricas é crucial para otimizar o desempenho do clustering, gerenciar o recurso de forma eficiente e solucionar problemas.

Databricks clustering logs e métricas oferecem percepções detalhadas sobre o desempenho do clustering e a utilização do recurso. Isso inclui uso de CPU e memória, E/S de disco e tráfego de rede. O monitoramento dessas métricas é fundamental para:

  • Otimização do desempenho do clustering.
  • Gerenciar o recurso de forma eficiente.
  • Solução de problemas operacionais.

As métricas podem ser aproveitadas por meio da interface do usuário do Databricks ou exportadas para ferramentas de monitoramento pessoais. Veja o exemplo emNotebook: Datadog métricas.

Spark UI

O site Spark UI mostra informações detalhadas sobre o progresso do trabalho e dos estágios, incluindo o número de tarefas concluídas, pendentes e reprovadas. Isso ajuda você a entender o fluxo de execução e identificar gargalos.

Para aplicações de transmissão, a transmissão tab mostra métricas como taxa de entrada, taxa de processamento e lotes de duração. Ele ajuda o senhor a monitorar o desempenho do seu Job de transmissão e a identificar qualquer problema de ingestão de dados ou de processamento.

Para obter mais informações, consulte depuração com a UI Apache Spark.

computar métricas

O site compute métricas ajudará o senhor a entender a utilização do clustering. À medida que seu trabalho é executado, o senhor pode ver como ele escala e como seus recursos são afetados. Você poderá encontrar a pressão de memória que pode levar a falhas de OOM ou a pressão da CPU que pode causar longos atrasos. Aqui estão as métricas específicas que o senhor verá:

  • Distribuição da carga do servidor : a utilização da CPU de cada nó no último minuto.
  • Utilização da CPU : A porcentagem de tempo que a CPU passou em vários modos (por exemplo, usuário, sistema, parado e iowait).
  • Utilização de memória : uso total de memória por cada modo (por exemplo, usada, livre, em buffer e em cache).
  • Utilização de swap de memória : Memória total swap usage.
  • Espaço livre no sistema de arquivos: uso total do sistema de arquivos por cada ponto de montagem.
  • Taxa de transferência da rede : O número de bytes recebidos e transmitidos pela rede por cada dispositivo.
  • Número de nós ativos : O número de nós ativos em cada registro de data e hora para o site compute.

Para obter mais informações, consulte os gráficos Monitor desempenho e Hardware métricas.

Tabelas do sistema

Monitoramento de custos

Databricks As tabelas do sistema fornecem uma abordagem estruturada para monitorar o custo e o desempenho do trabalho. Essas tabelas incluem:

  • Job detalhes da execução.
  • utilização de recursos.
  • Custos associados.

Use essas tabelas para entender a saúde operacional e o impacto financeiro.

Requisitos

Usar tabelas do sistema para monitoramento de custos:

  • Um administrador do account deve habilitar o system.lakeflow schema.
  • Os usuários devem:
    • Ser um administrador de metastore e um administrador de account, ou
    • Tenha as permissões USE e SELECT nos esquemas do sistema.

Exemplo de consulta: Trabalho mais caro (últimos 30 dias)

Essa consulta identifica o trabalho mais caro nos últimos 30 dias, auxiliando na análise e otimização de custos.

SQL
WITH list_cost_per_job AS (
SELECT
t1.workspace_id,
t1.usage_metadata.job_id,
COUNT(DISTINCT t1.usage_metadata.job_run_id) AS runs,
SUM(t1.usage_quantity * list_prices.pricing.default) AS list_cost,
FIRST(identity_metadata.run_as, true) AS run_as,
FIRST(t1.custom_tags, true) AS custom_tags,
MAX(t1.usage_end_time) AS last_seen_date
FROM system.billing.usage t1
INNER JOIN system.billing.list_prices list_prices ON
t1.cloud = list_prices.cloud AND
t1.sku_name = list_prices.sku_name AND
t1.usage_start_time >= list_prices.price_start_time AND
(t1.usage_end_time <= list_prices.price_end_time OR list_prices.price_end_time IS NULL)
WHERE
t1.billing_origin_product = "JOBS"
AND t1.usage_date >= CURRENT_DATE() - INTERVAL 30 DAY
GROUP BY ALL
),
most_recent_jobs AS (
SELECT
*,
ROW_NUMBER() OVER(PARTITION BY workspace_id, job_id ORDER BY change_time DESC) AS rn
FROM
system.lakeflow.jobs QUALIFY rn=1
)
SELECT
t2.name,
t1.job_id,
t1.workspace_id,
t1.runs,
t1.run_as,
SUM(list_cost) AS list_cost,
t1.last_seen_date
FROM list_cost_per_job t1
LEFT JOIN most_recent_jobs t2 USING (workspace_id, job_id)
GROUP BY ALL
ORDER BY list_cost DESC

LakeFlow Pipeline declarativo

O evento LakeFlow Declarative pipeline log captura um registro abrangente de todos os eventos pipeline, inclusive:

  • Auditoria logs.
  • Verificações de qualidade de dados.
  • Progresso do pipeline.
  • linhagem de dados.

O evento log é ativado automaticamente para todos os pipelines LakeFlow Declarative e pode ser acessado por meio de

  • UI do pipeline : visualize logs diretamente.
  • API DLT : Acesso programático.
  • Consulta direta : Consultar a tabela de eventos log.

Para obter mais informações, consulte o evento log schema for LakeFlow Declarative pipeline.

Exemplos de consultas

Essas consultas de exemplo ajudam a monitorar o desempenho e a integridade do pipeline, fornecendo key métricas como duração dos lotes, taxa de transferência, pressão de retorno e utilização de recursos.

Duração média dos lotes

Essa consulta calcula a duração média dos lotes processados pelo pipeline.

SQL
SELECT
(max_t - min_t) / batch_count as avg_batch_duration_seconds,
batch_count,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
count(*) as batch_count,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc

Taxa de transferência média

Essa consulta calcula a taxa média de transferência do site pipeline em termos de linhas processadas por segundo.

SQL
SELECT
(max_t - min_t) / total_rows as avg_throughput_rps,
total_rows,
min_t,
max_t,
date_hr,
message
FROM
-- /60 for minutes
(
SELECT
sum(
details:flow_progress:metrics:num_output_rows
) as total_rows,
unix_timestamp(
min(timestamp)
) as min_t,
unix_timestamp(
max(timestamp)
) as max_t,
date_format(timestamp, 'yyyy-MM-dd:HH') as date_hr,
message
FROM
event_log
WHERE
event_type = 'flow_progress'
AND level = 'METRICS'
GROUP BY
date_hr,
message
)
ORDER BY
date_hr desc

Contrapressão

Essa consulta mede a contrapressão do pipeline verificando o backlog de dados.

SQL
SELECT
timestamp,
DOUBLE(
details:flow_progress:metrics:backlog_bytes
) AS backlog_bytes,
DOUBLE(
details:flow_progress:metrics:backlog_files
) AS backlog_files
FROM
event_log
WHERE
event_type = 'flow_progress'

agrupamento e utilização de slots

Essa consulta tem percepções sobre a utilização de clustering ou slots usados pelo pipeline.

SQL
SELECT
date_trunc("hour", timestamp) AS hour,
AVG (
DOUBLE (
details:cluster_resources:num_task_slots
)
) AS num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:avg_num_task_slots
)
) AS avg_num_task_slots,
AVG (
DOUBLE (
details:cluster_resources:num_executors
)
) AS num_executors,
AVG (
DOUBLE (
details:cluster_resources:avg_task_slot_utilization
)
) AS avg_utilization,
AVG (
DOUBLE (
details:cluster_resources:avg_num_queued_tasks
)
) AS queue_size
FROM
event_log
WHERE
details : cluster_resources : avg_num_queued_tasks IS NOT NULL
AND origin.update_id = '${latest_update_id}'
GROUP BY
1;

Empregos

O senhor pode monitorar as consultas de transmissão no Job por meio do Query Listener de transmissão.

Anexe um ouvinte à sessão Spark para ativar a transmissão do Query Listener no Databricks. Esse ouvinte monitorará o progresso e as métricas de suas consultas de transmissão. Ele pode ser usado para enviar métricas para ferramentas de monitoramento externas ou log para análise posterior.

Exemplo: Exportar métricas para ferramentas de monitoramento externas

::: nota

Isso está disponível em Databricks Runtime 11.3 LTS e acima para Python e Scala.

:::

O senhor pode exportar as métricas de transmissão para um serviço externo para alertas ou painéis de controle usando a interface StreamingQueryListener.

Aqui está um exemplo básico de como implementar um ouvinte:

Python
from pyspark.sql.streaming import StreamingQueryListener

class MyListener(StreamingQueryListener):
def onQueryStarted(self, event):
print("Query started: ", event.id)

def onQueryProgress(self, event):
print("Query made progress: ", event.progress)

def onQueryTerminated(self, event):
print("Query terminated: ", event.id)

spark.streams.addListener(MyListener())

Exemplo: Usar o ouvinte de consulta no Databricks

Abaixo está um exemplo de um evento StreamingQueryListener log para uma consulta de transmissão Kafka para Delta Lake:

JSON
{
"id": "210f4746-7caa-4a51-bd08-87cabb45bdbe",
"runId": "42a2f990-c463-4a9c-9aae-95d6990e63f4",
"timestamp": "2024-05-15T21:57:50.782Z",
"batchId": 0,
"batchDuration": 3601,
"numInputRows": 20,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 5.55401277422938,
"durationMs": {
"addBatch": 1544,
"commitBatch": 686,
"commitOffsets": 27,
"getBatch": 12,
"latestOffset": 577,
"queryPlanning": 105,
"triggerExecution": 3600,
"walCommit": 34
},
"stateOperators": [
{
"operatorName": "symmetricHashJoin",
"numRowsTotal": 20,
"numRowsUpdated": 20,
"allUpdatesTimeMs": 473,
"numRowsRemoved": 0,
"allRemovalsTimeMs": 0,
"commitTimeMs": 277,
"memoryUsedBytes": 13120,
"numRowsDroppedByWatermark": 0,
"numShufflePartitions": 5,
"numStateStoreInstances": 20,
"customMetrics": {
"loadedMapCacheHitCount": 0,
"loadedMapCacheMissCount": 0,
"stateOnCurrentVersionSizeBytes": 5280
}
}
],
"sources": [
{
"description": "KafkaV2[Subscribe[topic-1]]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"avgOffsetsBehindLatest": "0.0",
"estimatedTotalBytesBehindLatest": "0.0",
"maxOffsetsBehindLatest": "0",
"minOffsetsBehindLatest": "0"
}
},
{
"description": "DeltaSource[file:/tmp/spark-1b7cb042-bab8-4469-bb2f-733c15141081]",
"numInputRows": 10,
"inputRowsPerSecond": 0.0,
"processedRowsPerSecond": 2.77700638711469,
"metrics": {
"numBytesOutstanding": "0",
"numFilesOutstanding": "0"
}
}
]
}

Para obter mais exemplos, consulte: Exemplos.

Consultar o progresso das métricas

As métricas de progresso de consultas são essenciais para monitorar o desempenho e a integridade de suas consultas de transmissão. Essas métricas incluem o número de linhas de entrada, taxas de processamento e várias durações relacionadas à execução da consulta. O senhor pode observar essas métricas anexando um StreamingQueryListener à sessão do Spark. O ouvinte emitirá eventos contendo essas métricas no final de cada período de transmissão.

Por exemplo, o senhor pode acessar as métricas usando o mapa StreamingQueryProgress.observedMetrics no método onQueryProgress do ouvinte. Isso permite que o senhor acompanhe e analise o desempenho de suas consultas de transmissão em tempo real.

Python
class MyListener(StreamingQueryListener):
def onQueryProgress(self, event):
print("Query made progress: ", event.progress.observedMetrics)