Pular para o conteúdo principal

Conecte-se ao Amazon Kinesis

Use transmissão estruturada para ler e gravar dados no Amazon Kinesis.

Databricks recomenda que você habilite o endpoint VPC S3 para que todo o tráfego S3 seja roteado na rede AWS .

nota

Se você excluir e recriar uma transmissão Kinesis , não poderá reutilizar nenhum diretório de ponto de verificação existente para reiniciar uma consulta de transmissão. Você deve excluir os diretórios de ponto de verificação e iniciar essas consultas do zero. Você pode reestruturar a transmissão aumentando o número de shards sem interromper ou reiniciar a transmissão.

Para obter recomendações sobre como solucionar problemas de latência de consulta, consulte Recomendações para reduzir a latência com o Kinesis.

Autenticação com o Amazon Kinesis

No Databricks Runtime 16.1 e versões superiores, Databricks recomenda que você gerencie as conexões com Kinesis usando uma credencial de serviço Databricks . Consulte Criar credenciais de serviço.

Para usar uma credencial de serviço, faça o seguinte:

  1. Crie uma credencial de serviço Databricks usando um IAM role com as permissões necessárias para acessar Kinesis. Consulte a Etapa 1: Criar um IAM role.
  2. Forneça o nome da credencial do serviço usando a opção serviceCredential ao definir uma leitura de transmissão.
nota

A fonte Kinesis requer permissões ListShards, GetRecords e GetShardIterator . Se você encontrar Amazon: Access Denied exceções, verifique se sua IAM role tem essas permissões. Consulte Controlando o acesso ao recurso de transmissão de dados Amazon Kinesis usando IAM.

Métodos alternativos de autenticação

No Databricks Runtime 16.0 e versões anteriores, as credenciais do serviço Databricks não estão disponíveis. O Databricks possui os seguintes métodos alternativos de autenticação:

perfil da instância

Anexar um instance profile durante a configuração do compute. Veja o perfil da instância.

Os perfis de instância não são suportados no modo de acesso padrão (anteriormente modo de acesso compartilhado). Consulte os requisitos e limitações compute padrão.

Use a chave de acesso diretamente

Defina as opções awsAccessKey e awsSecretKey .

Se usar chaves, armazene-as usando segredos Databricks . Veja Gestão Secreta.

Assumir IAM role

Algumas configurações compute permitem que você assuma uma IAM role usando a opção roleArn . Para assumir uma função, inicie seu cluster com permissões para assumir a função ou forneça a chave de acesso por meio de awsAccessKey e awsSecretKey.

Este método suporta autenticação entreaccount . Para obter mais informações, consulte Delegar acesso em toda a conta AWS usando a função IAM.

Opcionalmente, você pode especificar a ID externa com roleExternalId e um nome de sessão com roleSessionName.

Esquema

O Kinesis retorna registros com o seguinte esquema:

Coluna

Tipo

partitionKey

string

data

binário

stream

string

shardId

string

sequenceNumber

string

approximateArrivalTimestamp

carimbo de data/hora

Para desserializar os dados na coluna data , converta o campo para strings.

Início rápido

O notebook a seguir demonstra como executar o WordCount com a Transmissão estruturada com Kinesis.

Kinesis WordCount com o Notebook Transmissão Estruturada

Abrir notebook em uma nova aba

Configurar as opções do Kinesis

Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar Trigger.AvailableNow com Kinesis. Consulte Ingerir registros Kinesis como um lote incremental.

No Databricks Runtime 16.1 e versões superiores, você pode usar streamARN para identificar fontes Kinesis . Para todas as versões do Databricks Runtime, você deve especificar streamName ou streamARN, mas não ambos.

atenção

Não alterne entre streamName e streamARN para uma consulta de transmissão ativa. Databricks não permite alterar essas opções durante a transmissão. Reiniciar a consulta pode resultar em registros duplicados ou perda de dados. Para alternar de streamName para streamARN, inicie uma nova consulta de transmissão com um novo diretório de ponto de verificação.

Para obter a lista completa de opções, consulte Kinesis.

Monitoramento e alertas de baixa latência

Os casos de uso de alertas exigem baixa latência. Para minimizar a latência:

  • Verifique se sua consulta de transmissão é a única consumidora da transmissão Kinesis para otimizar o desempenho de busca e evitar os limites de taxa Kinesis .
  • Defina a opção maxFetchDuration para um valor pequeno, como 200ms, para processar os dados obtidos o mais rápido possível. Essa opção representa um equilíbrio: ela prioriza uma velocidade de processamento mais rápida por lote em vez de garantir que os registros mais recentes sejam consumidos em cada lote. Por exemplo, se você usar Trigger.AvailableNow, um valor pequeno pode fazer com que sua consulta fique desatualizada em relação aos registros mais recentes na transmissão Kinesis .
  • Defina a opção minFetchPeriod para 210 ms para buscar com a maior frequência possível.
  • Defina a opção shardsPerTask ou configure o cluster de forma que # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. Isso garante que a tarefa de pré-busca em segundo plano e a tarefa de consulta de transmissão sejam executadas simultaneamente.

Se sua consulta estiver recebendo dados a cada 5 segundos, você pode estar excedendo os limites de taxa do Kinesis. Revise suas configurações.

Monitorar métricas do Kinesis

Kinesis informa o número de milissegundos que um consumidor fica atrasado em relação ao início de uma transmissão para cada workspace. As métricas avgMsBehindLatest, maxMsBehindLatest e minMsBehindLatest fornecem a média, o mínimo e o máximo em milissegundos em todo o espaço de trabalho no processo de consulta de transmissão. Veja consultas de monitoramento transmissão estruturada no Databricks.

Se você estiver executando a transmissão em um Notebook, consulte as métricas na tab dados brutos no painel de progresso da consulta de transmissão. Eis um exemplo:

JSON
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}

Ingerir registros do Kinesis como lotes incrementais

Em Databricks Runtime 13.3 LTS e acima, Databricks suporta o uso de Trigger.AvailableNow com Kinesis fonte de dados para semântica de lotes incrementais. A seguir, descrevemos a configuração básica:

  1. Quando um micro-batch lê gatilhos no modo disponível agora, a hora atual é registrada pelo cliente Databricks.
  2. O Databricks pesquisa o sistema de origem para todos os registros com carimbos de data/hora entre esse tempo registrado e o ponto de verificação anterior.
  3. O Databricks carrega esses registros com Trigger.AvailableNow semântica.

Databricks usa um mecanismo de melhor esforço para tentar consumir todos os registros que existem na transmissão Kinesis durante a execução da consulta de transmissão. Devido a pequenas diferenças potenciais nos registros de data e hora e à falta de garantia na ordenação na fonte de dados, um lote acionado pode não incluir alguns registros. Os registros omitidos são processados nos próximos microlotes acionados.

nota

Se a consulta continuar falhando na busca de registros da transmissão Kinesis mesmo que haja registros, tente aumentar o valor maxFetchDuration.

Veja AvailableNow: Processamento incremental de lotes.

Escreva para a Kinesis

Use o seguinte trecho de código como um ForeachSink para gravar dados no Kinesis. Exige um Dataset[(String, Array[Byte])].

nota

O trecho de código a seguir fornece semântica pelo menos uma vez , não exatamente uma vez.

Kinesis Notebook Foreach Sink

Abrir notebook em uma nova aba

Recomendações para reduzir a latência com o Kinesis

Esta seção contém recomendações para solucionar diversas causas de latência na transmissão Kinesis .

A tarefa Spark de execução da fonte Kinesis é executada em uma thread em segundo plano para buscar periodicamente os dados Kinesis e, em seguida, armazená-los em cache na memória executor Spark . Após a conclusão de cada pré-busca ou etapa, a consulta de transmissão pode processar os dados armazenados em cache. A pré-busca ou passo afeta significativamente a latência ponta a ponta observada e a taxa de transferência.

Reduza a latência de pré-busca

Para otimizar a latência mínima da consulta e o uso máximo do recurso, use o cálculo a seguir:

total number of CPU cores in the cluster (across all executors) > = total number of Kinesis shards / shardsPerTask.

importante

minFetchPeriod pode criar várias chamadas de API GetRecords para o shard do Kinesis até que ele chegue a ReadProvisionedThroughputExceeded. Caso ocorra uma exceção, isso pode não ser um problema, pois o conector maximiza a utilização do shard do Kinesis.

Evite lentidões causadas por muitos erros de limite de taxa

O conector reduz a quantidade de dados lidos do Kinesis pela metade sempre que encontra um erro de limitação de taxa e registra esse evento no log com uma mensagem: "Hit rate limit. Sleeping for 5 seconds."

Você pode se deparar com esses erros enquanto uma transmissão está sendo processada. Se você observar esses erros após uma transmissão ser concluída, talvez seja necessário ajustar a carga de trabalho, aumentando a capacidade Kinesis na AWS ou ajustando as opções de pré-busca no Spark.

Evite o despejo em disco

Se houver um aumento repentino no volume de dados em sua transmissão Kinesis , a capacidade do buffer alocado pode se esgotar e não ser liberada com rapidez suficiente para adicionar novos dados. Spark está gravando dados do buffer em disco, o que torna o processamento da transmissão mais lento, e um evento aparece no log com uma mensagem como a seguinte:

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

Para evitar o transbordamento, aumente a capacidade de memória do cluster adicionando mais nós ou aumentando a memória por nó, ou reduza o parâmetro de configuração fetchBufferSize.

Tarefa de gravação S3 suspensa

Habilite a especulação Spark para encerrar tarefas suspensas que impediriam o processamento da transmissão. Para garantir que as tarefas não sejam encerradas de forma muito agressiva, ajuste cuidadosamente o quantil e o multiplicador para essa configuração. A Databricks recomenda que você defina spark.speculation.multiplier como 3 e spark.speculation.quantile como 0.95 e ajuste conforme necessário.

Reduzir a latência do checkpoint em sistemas com estado

Databricks recomenda o uso do site RocksDB com checkpointing de changelog para consultas de transmissão com estado. Consulte Habilitar o checkpoint do registro de alterações.

Configure o Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão

No Databricks Runtime 11.3e acima, o conector Databricks Runtime Kinesis fornece suporte para usar o recurso de fan-out aprimorado (EFO) do Amazon Kinesis.

Kinesis enhanced fan-out fornece uma taxa de transferência dedicada de 2 MB/s por shard por consumidor (máximo de 20 consumidores por transmissão) e entrega registros no modo push em vez do modo pull.

Por default, uma consulta de transmissão estruturada configurada com modo EFO se registra como um consumidor com Taxa de transferência dedicada e um nome de consumidor exclusivo e ARN (nome de recurso Amazon ) do consumidor no Kinesis Data transmissão.

Por default, Databricks usa o ID da consulta de transmissão com o prefixo databricks_ para nomear o novo consumidor. Você pode, opcionalmente, especificar as opções consumerNamePrefix ou consumerName para substituir esse comportamento. O consumerName deve ser uma cadeia que contém apenas letras, números e os caracteres especiais _ . -.

Ao reiniciar a consulta, a fonte Kinesis usa o modo de polling para reproduzir os últimos lotes não confirmados, caso existam. Após a transmissão reproduzir os lotes não confirmados, a fonte retorna ao modo EFO para as leituras subsequentes.

importante

Um consumidor EFO registrado incorre em cobranças adicionais no Amazon Kinesis. Para cancelar o registro do consumidor automaticamente na desmontagem da consulta, defina a opção requireConsumerDeregistration como true. A Databricks não pode garantir o cancelamento do registro em eventos como falhas de driver ou falhas de nó. Em caso de falha no trabalho, o site Databricks recomenda gerenciar os consumidores registrados diretamente para evitar cobranças excessivas no site Kinesis.

Gerenciamento de consumidores off-line usando um notebook Databricks

Use as utilidades AWSKinesisConsumerManager para registrar, listar ou cancelar o registro de consumidores para transmissão de dados Kinesis programaticamente, em vez de configurar manualmente os consumidores no console da sua account AWS . Por exemplo, use as utilidades para criar um consumidor para uma nova transmissão ou, se você planeja interromper permanentemente uma transmissão, use as utilidades para excluir o consumidor na AWS.

O gerenciador de consumidores `utilities` está disponível apenas em Scala com o modo de acesso dedicado definido para o parâmetro compute . Consulte Modos de acesso.

Para usar essas utilidades em um Notebook Databricks :

  1. Em um novo Notebook Databricks anexado a um cluster ativo, crie um AWSKinesisConsumerManager com as informações de autenticação necessárias.

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    val manager = AWSKinesisConsumerManager.newManager()
    .option("serviceCredential", serviceCredentialName)
    .option("region", kinesisRegion)
    .create()
  2. Liste e exiba os consumidores.

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. registrar um consumidor para determinada transmissão.

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. Cancelar o cadastro de um consumidor para determinada transmissão.

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")