Pular para o conteúdo principal

Conecte-se ao Amazon Kinesis

Use transmissão estruturada para ler e gravar dados no Amazon Kinesis.

Databricks recomenda que você habilite o endpoint VPC S3 para que todo o tráfego S3 seja roteado na rede AWS .

nota

Se você excluir e recriar uma transmissão Kinesis , não poderá reutilizar nenhum diretório de ponto de verificação existente para reiniciar uma consulta de transmissão. Você deve excluir os diretórios de ponto de verificação e iniciar essas consultas do zero. Você pode reestruturar a transmissão aumentando o número de shards sem interromper ou reiniciar a transmissão.

Para obter recomendações sobre como solucionar problemas de latência de consulta, consulte Recomendações para reduzir a latência com o Kinesis.

Autenticação com o Amazon Kinesis

No Databricks Runtime 16.1 e versões superiores, Databricks recomenda que você gerencie as conexões com Kinesis usando uma credencial de serviço Databricks . Consulte Criar credenciais de serviço.

Para usar uma credencial de serviço, faça o seguinte:

  1. Crie uma credencial de serviço Databricks usando um IAM role com as permissões necessárias para acessar Kinesis. Consulte a Etapa 1: Criar um IAM role.
  2. Forneça o nome da credencial do serviço usando a opção serviceCredential ao definir uma leitura de transmissão.
nota

A fonte Kinesis requer permissões ListShards, GetRecords e GetShardIterator . Se você encontrar Amazon: Access Denied exceções, verifique se sua IAM role tem essas permissões. Consulte Controlando o acesso ao recurso de transmissão de dados Amazon Kinesis usando IAM.

Métodos alternativos de autenticação

No Databricks Runtime 16.0 e versões anteriores, as credenciais do serviço Databricks não estão disponíveis. O Databricks possui os seguintes métodos alternativos de autenticação:

perfil da instância

Anexar um instance profile durante a configuração do compute. Veja o perfil da instância.

Os perfis de instância não são suportados no modo de acesso padrão (anteriormente modo de acesso compartilhado). Consulte os requisitos e limitações compute padrão.

Use a chave de acesso diretamente

Defina as opções awsAccessKey e awsSecretKey .

Se usar chaves, armazene-as usando segredos Databricks . Veja Gestão Secreta.

Assumir IAM role

Algumas configurações compute permitem que você assuma uma IAM role usando a opção roleArn . Para assumir uma função, inicie seu cluster com permissões para assumir a função ou forneça a chave de acesso por meio de awsAccessKey e awsSecretKey.

Este método suporta autenticação entreaccount . Para obter mais informações, consulte Delegar acesso em toda a conta AWS usando a função IAM.

Opcionalmente, você pode especificar a ID externa com roleExternalId e um nome de sessão com roleSessionName.

Esquema

O Kinesis retorna registros com o seguinte esquema:

Coluna

Tipo

partitionKey

string

data

binário

stream

string

shardId

string

sequenceNumber

string

approximateArrivalTimestamp

carimbo de data/hora

Para desserializar os dados na coluna data , converta o campo para strings.

Início rápido

O notebook a seguir demonstra como executar o WordCount com a Transmissão estruturada com Kinesis.

Kinesis WordCount com o Notebook Transmissão Estruturada

Abrir notebook em uma nova aba

Configurar as opções do Kinesis

Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar Trigger.AvailableNow com Kinesis. Consulte Ingerir registros Kinesis como um lote incremental.

No Databricks Runtime 16.1 e versões superiores, você pode usar streamARN para identificar fontes Kinesis . Para todas as versões do Databricks Runtime, você deve especificar streamName ou streamARN, mas não ambos.

atenção

Não alterne entre streamName e streamARN para uma consulta de transmissão ativa. Databricks não permite alterar essas opções durante a transmissão. Reiniciar a consulta pode resultar em registros duplicados ou perda de dados. Para alternar de streamName para streamARN, inicie uma nova consulta de transmissão com um novo diretório de ponto de verificação.

Veja a seguir as configurações comuns para as fontes de dados do Kinesis:

Opção

Valor

Padrão

Descrição

streamName

Uma lista separada por vírgulas de nomes de transmissão.

Nenhuma

Os nomes dos transmissão para assinar.

streamARN

Uma lista separada por vírgulas de Kinesis transmissão ARNs. Por exemplo, "arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2".

Nenhuma

A ARNs de transmissão a ser assinada. Disponível em Databricks Runtime 16.1 e acima.

region

Região para as transmissões a serem especificadas.

Região resolvida localmente

A região em que as transmissões são definidas.

endpoint

Região para o fluxo de dados do Kinesis.

Região resolvida localmente

O ponto final regional do Kinesis Data Streams.

initialPosition

latest, trim_horizon, earliest (alias de trim_horizon), at_timestamp.

latest

Por onde começar a ler na transmissão.

Especifique at_timestamp como uma cadeia de caracteres JSON usando o formato Java default para carimbos de data/hora, como {"at_timestamp": "06/25/2020 10:23:45 PDT"}. A consulta de transmissão lê todas as alterações em ou após o registro de data e hora fornecido (inclusive). O senhor pode especificar explicitamente os formatos fornecendo um campo adicional nas cadeias de caracteres JSON, como {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}.

maxRecordsPerFetch

Um número inteiro positivo.

10.000

Número de registros a serem lidos por solicitação de API ao Kinesis. O número de registros retornados pode ser maior dependendo se os sub-registros foram agregados em um único registro usando a biblioteca Kinesis Producer.

maxFetchRate

Número decimal positivo representando a taxa de dados em MB/s.

1,0 (máx = 2,0)

Qual a velocidade de pré-busca de dados por fragmento? Essa opção limita a taxa de requisições e evita a limitação de requisições pelo Kinesis. 2,0 MB/s é a taxa máxima permitida pelo Kinesis.

minFetchPeriod

Uma string de duração, por exemplo, 1s por 1 segundo.

400ms (min = 200ms)

A duração do tempo de espera entre tentativas consecutivas de pré-busca. Essa opção limita a frequência de requisições e evita a limitação de taxa do Kinesis. 200 ms é o mínimo, pois o Kinesis permite um máximo de 5 buscas por segundo.

maxFetchDuration

Uma string de duração, por exemplo, 1m por 1 minuto.

10s

A duração serve para armazenar em buffer os novos dados pré-buscados antes de disponibilizá-los para processamento.

fetchBufferSize

Uma sequência de bytes, por exemplo, 2gb ou 10mb.

20 Gb

Quantidade de dados a serem armazenados em buffer para o próximo gatilho. Essa opção é uma condição de parada, não um limite superior estrito. É possível que mais dados sejam armazenados em buffer do que o especificado para este valor.

shardsPerTask

Um número inteiro positivo.

5

O número de fragmentos do Kinesis a serem pré-buscados em paralelo por tarefa do Spark. Idealmente, # cores in cluster >= # Kinesis shards / shardsPerTask para latência mínima de consulta e uso máximo de recursos.

shardFetchInterval

Uma string de duração, por exemplo, 2m por 2 minutos.

1s

O intervalo em que o Kinesis deve ser consultado para refragmentação.

serviceCredential

String

Nenhum padrão.

O nome de sua credencial de serviço Databricks. Consulte Criar credenciais de serviço.

awsAccessKey

String

Nenhum padrão.

Chave de acesso AWS.

awsSecretKey

String

Nenhum padrão.

Chave de acesso secreta AWS correspondente à chave de acesso.

roleArn

String

Nenhum padrão.

O nome do recurso da Amazon (ARN) do papel a ser assumido ao acessar o Kinesis.

roleExternalId

String

Nenhum padrão.

Um valor opcional que pode ser usado ao delegar acesso à conta AWS. Consulte Como usar um ID externo.

roleSessionName

String

Nenhum padrão.

Um identificador para a sessão de função assumida que identifica exclusivamente uma sessão quando a mesma função é assumida por diferentes princípios ou por diferentes motivos.

coalesceThresholdBlockSize

Um número inteiro positivo.

10.000.000

O limite no qual ocorre a coalescência automática. Se o tamanho médio do bloco for menor que esse valor, os blocos pré-buscados serão agrupados em coalesceBinSize.

coalesceBinSize

Um número inteiro positivo.

128.000.000

O tamanho aproximado do bloco após a coalescência.

consumerMode

polling ou efo.

polling

Tipo de consumidor para executar a consulta de transmissão. Consulte Configurar Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão. Disponível em Databricks Runtime 11.3 LTS e acima.

requireConsumerDeregistration

true ou false.

false

Se o registro do consumidor de fan-out avançado deve ser cancelado no término da consulta. Requer efo para consumerMode. Disponível em Databricks Runtime 11.3 LTS e acima.

maxShardsPerDescribe

Um número inteiro positivo (máx. 10000).

100

O número máximo de fragmentos a serem lidos por chamada de API ao listar fragmentos.

consumerName

Nome único do consumidor a ser usado com todas as transmissões ou lista de nomes de consumidores separados por vírgulas.

ID da consulta de transmissão

Nome do consumidor usado para registrar a consulta com Kinesis serviço no modo EFO. Disponível em Databricks Runtime 11.3 LTS e acima.

consumerNamePrefix

Strings vazias ou strings de prefixo personalizadas.

databricks_

Prefixo usado junto com consumerName para registrar consumidores com o Kinesis serviço no modo EFO. Disponível em Databricks Runtime 16.0 e acima.

consumerRefreshInterval

A duração é definida, por exemplo, "1s" para 1 segundo (máx. 3600s).

300s

O intervalo em que o registro do consumidor do Kinesis EFO é verificado e atualizado. Disponível em Databricks Runtime 11.3 LTS e acima.

registeredConsumerId

Uma lista separada por vírgulas de nomes de consumidores ou ARNs.

Nenhuma

Identificadores para consumidores existentes no modo EFO. Disponível em Databricks Runtime 16.1 e acima.

registeredConsumerIdType

name ou ARN.

Nenhuma

Especifica se as IDs do consumidor são nomes ou ARNs. Disponível em Databricks Runtime 16.1 e acima.

Os valores de opção default permitem que dois leitores (Spark ou outros) consumam simultaneamente uma transmissão Kinesis sem atingir os limites de taxa Kinesis . Se você tiver mais consumidores, ajuste as opções de acordo. Por exemplo, você pode precisar reduzir maxFetchRate e aumentar minFetchPeriod.

Monitoramento e alertas de baixa latência

Os casos de uso de alertas exigem baixa latência. Para minimizar a latência:

  • Verifique se sua consulta de transmissão é a única consumidora da transmissão Kinesis para otimizar o desempenho de busca e evitar os limites de taxa Kinesis .
  • Defina a opção maxFetchDuration para um valor pequeno, como 200ms, para processar os dados obtidos o mais rápido possível. Essa opção representa um equilíbrio: ela prioriza uma velocidade de processamento mais rápida por lote em vez de garantir que os registros mais recentes sejam consumidos em cada lote. Por exemplo, se você usar Trigger.AvailableNow, um valor pequeno pode fazer com que sua consulta fique desatualizada em relação aos registros mais recentes na transmissão Kinesis .
  • Defina a opção minFetchPeriod para 210 ms para buscar com a maior frequência possível.
  • Defina a opção shardsPerTask ou configure o cluster de forma que # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. Isso garante que a tarefa de pré-busca em segundo plano e a tarefa de consulta de transmissão sejam executadas simultaneamente.

Se sua consulta estiver recebendo dados a cada 5 segundos, você pode estar excedendo os limites de taxa do Kinesis. Revise suas configurações.

Monitorar métricas do Kinesis

Kinesis informa o número de milissegundos que um consumidor fica atrasado em relação ao início de uma transmissão para cada workspace. As métricas avgMsBehindLatest, maxMsBehindLatest e minMsBehindLatest fornecem a média, o mínimo e o máximo em milissegundos em todo o espaço de trabalho no processo de consulta de transmissão. Veja consultas de monitoramento transmissão estruturada no Databricks.

Se você estiver executando a transmissão em um Notebook, consulte as métricas na tab dados brutos no painel de progresso da consulta de transmissão. Eis um exemplo:

JSON
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}

Ingerir registros do Kinesis como lotes incrementais

Em Databricks Runtime 13.3 LTS e acima, Databricks suporta o uso de Trigger.AvailableNow com Kinesis fonte de dados para semântica de lotes incrementais. A seguir, descrevemos a configuração básica:

  1. Quando um micro-batch lê gatilhos no modo disponível agora, a hora atual é registrada pelo cliente Databricks.
  2. O Databricks pesquisa o sistema de origem para todos os registros com carimbos de data/hora entre esse tempo registrado e o ponto de verificação anterior.
  3. O Databricks carrega esses registros com Trigger.AvailableNow semântica.

Databricks usa um mecanismo de melhor esforço para tentar consumir todos os registros que existem na transmissão Kinesis durante a execução da consulta de transmissão. Devido a pequenas diferenças potenciais nos registros de data e hora e à falta de garantia na ordenação na fonte de dados, um lote acionado pode não incluir alguns registros. Os registros omitidos são processados nos próximos microlotes acionados.

nota

Se a consulta continuar falhando na busca de registros da transmissão Kinesis mesmo que haja registros, tente aumentar o valor maxFetchDuration.

Veja AvailableNow: Processamento incremental de lotes.

Escreva para a Kinesis

O seguinte trecho de código pode ser utilizado como um ForeachSink para gravar dados no Kinesis. Exige um Dataset[(String, Array[Byte])].

nota

O trecho de código a seguir fornece semântica pelo menos uma vez , não exatamente uma vez.

Kinesis Notebook Foreach Sink

Abrir notebook em uma nova aba

Recomendações para reduzir a latência com o Kinesis

Esta seção contém recomendações para solucionar diversas causas de latência na transmissão Kinesis .

A tarefa Spark de execução da fonte Kinesis é executada em uma thread em segundo plano para buscar periodicamente os dados Kinesis e, em seguida, armazená-los em cache na memória executor Spark . Após a conclusão de cada pré-busca ou etapa, a consulta de transmissão pode processar os dados armazenados em cache. A pré-busca ou passo afeta significativamente a latência ponta a ponta observada e a taxa de transferência.

Reduza a latência de pré-busca

Para otimizar a latência mínima da consulta e o uso máximo do recurso, use o cálculo a seguir:

total number of CPU cores in the cluster (across all executors) > = total number of Kinesis shards / shardsPerTask.

importante

minFetchPeriod pode criar várias chamadas de API GetRecords para o shard do Kinesis até que ele chegue a ReadProvisionedThroughputExceeded. Caso ocorra uma exceção, isso pode não ser um problema, pois o conector maximiza a utilização do shard do Kinesis.

Evite lentidões causadas por muitos erros de limite de taxa

O conector reduz a quantidade de dados lidos do Kinesis pela metade sempre que encontra um erro de limitação de taxa e registra esse evento no log com uma mensagem: "Hit rate limit. Sleeping for 5 seconds."

Você pode se deparar com esses erros enquanto uma transmissão está sendo processada. Se você observar esses erros após uma transmissão ser concluída, talvez seja necessário ajustar a carga de trabalho, aumentando a capacidade Kinesis na AWS ou ajustando as opções de pré-busca no Spark.

Evite derramar dados no disco

Se houver um aumento repentino no volume de dados em sua transmissão Kinesis , a capacidade do buffer alocado pode se esgotar e não ser liberada com rapidez suficiente para adicionar novos dados. Spark está gravando dados do buffer em disco, o que torna o processamento da transmissão mais lento, e um evento aparece no log com uma mensagem como a seguinte:

Bash
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

Para evitar o transbordamento, aumente a capacidade de memória do cluster adicionando mais nós ou aumentando a memória por nó, ou reduza o parâmetro de configuração fetchBufferSize.

Tarefa de gravação S3 suspensa

Habilite a especulação Spark para encerrar tarefas suspensas que impediriam o processamento da transmissão. Para garantir que as tarefas não sejam encerradas de forma muito agressiva, ajuste cuidadosamente o quantil e o multiplicador para essa configuração. A Databricks recomenda que você defina spark.speculation.multiplier como 3 e spark.speculation.quantile como 0.95 e ajuste conforme necessário.

Reduzir a latência do checkpoint em sistemas com estado

Databricks recomenda o uso do site RocksDB com checkpointing de changelog para consultas de transmissão com estado. Consulte Habilitar o checkpoint do registro de alterações.

Configure o Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão

No Databricks Runtime 11.3e acima, o conector Databricks Runtime Kinesis fornece suporte para usar o recurso de fan-out aprimorado (EFO) do Amazon Kinesis.

Kinesis enhanced fan-out fornece uma taxa de transferência dedicada de 2 MB/s por shard por consumidor (máximo de 20 consumidores por transmissão) e entrega registros no modo push em vez do modo pull.

Por default, uma consulta de transmissão estruturada configurada com modo EFO se registra como um consumidor com Taxa de transferência dedicada e um nome de consumidor exclusivo e ARN (nome de recurso Amazon ) do consumidor no Kinesis Data transmissão.

Por default, Databricks usa o ID da consulta de transmissão com o prefixo databricks_ para nomear o novo consumidor. Você pode, opcionalmente, especificar as opções consumerNamePrefix ou consumerName para substituir esse comportamento. O consumerName deve ser uma cadeia que contém apenas letras, números e os caracteres especiais _ . -.

Ao reiniciar a consulta, a fonte Kinesis usa o modo de polling para reproduzir os últimos lotes não confirmados, caso existam. Após a transmissão reproduzir os lotes não confirmados, a fonte retorna ao modo EFO para as leituras subsequentes.

importante

Um consumidor EFO registrado incorre em cobranças adicionais no Amazon Kinesis. Para cancelar o registro do consumidor automaticamente na desmontagem da consulta, defina a opção requireConsumerDeregistration como true. A Databricks não pode garantir o cancelamento do registro em eventos como falhas de driver ou falhas de nó. Em caso de falha no trabalho, o site Databricks recomenda gerenciar os consumidores registrados diretamente para evitar cobranças excessivas no site Kinesis.

Opções avançadas de configuração do consumidor

No Databricks Runtime 16.1 e versões superiores, configure as leituras no modo EFO para consumidores existentes usando as opções registeredConsumerId e registeredConsumerIdType:

Python
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)

O senhor pode especificar os consumidores existentes usando o nome ou o ARN. O senhor deve fornecer um ID de consumidor para cada fonte de transmissão Kinesis especificada na configuração.

Gerenciamento de consumidores off-line usando um notebook Databricks

Use as utilidades AWSKinesisConsumerManager para registrar, listar ou cancelar o registro de consumidores para transmissão de dados Kinesis programaticamente, em vez de configurar manualmente os consumidores no console da sua account AWS . Por exemplo, use as utilidades para criar um consumidor para uma nova transmissão ou, se você planeja interromper permanentemente uma transmissão, use as utilidades para excluir o consumidor na AWS.

O gerenciador de consumidores `utilities` está disponível apenas em Scala com o modo de acesso dedicado definido para o parâmetro compute . Consulte Modos de acesso.

Para usar essas utilidades em um Notebook Databricks :

  1. Em um novo Notebook Databricks anexado a um cluster ativo, crie um AWSKinesisConsumerManager com as informações de autenticação necessárias.

    Scala
    import com.databricks.sql.kinesis.AWSKinesisConsumerManager

    val manager = AWSKinesisConsumerManager.newManager()
    .option("serviceCredential", serviceCredentialName)
    .option("region", kinesisRegion)
    .create()
  2. Liste e exiba os consumidores.

    Scala
    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
  3. registrar um consumidor para determinada transmissão.

    Scala
    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
  4. Cancelar o cadastro de um consumidor para determinada transmissão.

    Scala
    manager.deregisterConsumer("<stream name>", "<consumer name>")