Conecte-se ao Amazon Kinesis

Este artigo descreve como você pode usar a transmissão estruturada para ler e gravar dados no Amazon Kinesis.

A Databricks recomenda que você habilite endpoint S3 VPC para garantir que todo o tráfego S3 seja roteado na rede AWS.

Observação

Se você excluir e recriar uma transmissão do Kinesis, não poderá reutilizar nenhum diretório de ponto de verificação existente para reiniciar uma query de transmissão. Você deve excluir os diretórios de pontos de verificação e começar essas query do zero. Você pode reestilhaçar com transmissão estruturada aumentando o número de shards sem interromper ou reiniciar a transmissão.

Consulte Recomendações para trabalhar com o Kinesis.

Autentique-se com o Amazon Kinesis

A Databricks recomenda gerenciar sua conexão com o Kinesis usando um instance profile. Consulte instance profile.

Aviso

não são compatíveis com o modo de acesso compartilhado. Use o modo de acesso de usuário único ou um método de autenticação alternativo com o modo de acesso compartilhado. Consulte Limitações do modo de acesso à computação para o Unity Catalog.

Se você deseja utilizar chaves para acesso, poderá fornecê-las utilizando as opções awsAccessKey e awsSecretKey.

Você também pode assumir uma função IAM utilizando a opção roleArn. Opcionalmente, você pode especificar o ID externo com roleExternalId e um nome de sessão com roleSessionName. Para assumir uma função, você pode iniciar seu agrupamento com permissões para assumir a função ou fornecer chaves de acesso pelo awsAccessKey e awsSecretKey. Para autenticação de conta cruzada, a Databricks recomenda usar o roleArn para manter o papel assumido, que pode então ser assumido através da sua conta do Databricks AWS. Para obter mais informações sobre autenticação entre contas, consulte Delegate Access Across AWS Accounts Using IAM Roles (Delegar acesso em todas as contas AWS usando funções de IAM).

Observação

A origem do Kinesis exige permissões de ListShards, GetRecords e GetShardIterator. Se você encontrar Amazon: Access Denied exceções, verifique se seu usuário ou perfil tem essas permissões. Consulte Controle de acesso aos recursos do Amazon Kinesis Data Streams com o IAM para obter mais detalhes.

Esquema

O Kinesis retorna registros com o seguinte esquema:

Coluna

Tipo

partitionKey

string

dados

binário

transmissão

string

shardId

string

sequenceNumber

string

approximateArrivalTimestamp

timestamp

Para desserializar os dados na coluna data, você pode converter o campo em strings.

Início rápido

O notebook a seguir demonstra como executar o WordCount com a Transmissão estruturada com Kinesis.

Kinesis WordCount com notebook de transmissão estruturada

Abra o bloco de anotações em outra guia

Configurar opções do Kinesis

Importante

Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar Trigger.AvailableNow com Kinesis. Consulte Ingerir registros Kinesis como um lote incremental.

Observação

No Databricks Runtime 16.1 e no acima, o senhor pode usar streamARN para identificar as fontes do Kinesis.

Para todas as versões do Databricks Runtime, o senhor deve especificar streamName ou streamARN. Você só pode fornecer uma dessas opções.

Veja a seguir as configurações comuns para as fontes de dados do Kinesis:

Opção

Valor

Padrão

Descrição

streamName

Uma lista separada por vírgulas de nomes de transmissão.

Nenhuma

Os nomes dos transmissão para assinar.

StreamArn

Uma lista separada por vírgulas de Kinesis transmissão ARNs. Por exemplo, "arn:aws:kinesis:myarn1,arn:aws:kinesis:myarn2".

Nenhuma

O ARNs de transmissão a ser assinado. Disponível em Databricks Runtime 16.1 e acima.

região

Região para as transmissões a serem especificadas.

Região resolvida localmente

A região em que as transmissões são definidas.

endpoint

Região para o fluxo de dados do Kinesis.

Região resolvida localmente

O ponto final regional do Kinesis Data Streams.

initialPosition

latest, trim_horizon, earliest (alias de trim_horizon), at_timestamp.

latest

Por onde começar a ler na transmissão.

Especifique at_timestamp como strings JSON usando o formato default Java para carimbos de data/hora, como {"at_timestamp": "06/25/2020 10:23:45 PDT"}. A query de transmissão lê todas as alterações no carimbo de data/hora fornecido ou após ele (inclusive). Você pode especificar formatos explicitamente fornecendo um campo adicional nas strings JSON, como {"at_timestamp": "06/25/2020 10:23:45 PDT", "format": "MM/dd/yyyy HH:mm:ss ZZZ"}.

maxRecordsPerFetch

Um número inteiro positivo.

10.000

Quantos registros serão lidos por solicitação de API ao Kinesis. O número de registros retornados pode, na verdade, ser maior, dependendo do fato de os sub-registros terem sido agregados em um único registro com a Kinesis Producer Library.

maxFetchRate

Número decimal positivo representando a taxa de dados em MB/s.

1,0 (máx = 2,0)

Quão rápido é a pré-busca de dados por fragmento. Isso serve para limitar a taxa de buscas e evitar a limitação do Kinesis. 2,0 Mb/s é a taxa máxima permitida pelo Kinesis.

minFetchPeriod

Uma string de duração, por exemplo, 1s por 1 segundo.

400ms (min = 200ms)

Quanto tempo esperar entre as tentativas consecutivas de pré-busca. O objetivo é limitar a frequência das buscas e evitar a limitação do Kinesis. 200 ms é o mínimo, pois o Kinesis permite no máximo 5 buscas por segundo.

maxFetchDuration

Uma string de duração, por exemplo, 1m por 1 minuto.

10s

Por quanto tempo você deve armazenar em buffer os novos dados pré-configurados antes de disponibilizá-los para processamento.

fetchBufferSize

Uma sequência de bytes, por exemplo, 2gb ou 10mb.

20 Gb

Quantos dados devem ser armazenados em buffer para o próximo gatilho. Isso é usado como uma condição de parada e não como um limite superior estrito, portanto mais dados podem ser armazenados em buffer do que o especificado para esse valor.

shardsPerTask

Um número inteiro positivo.

5

Quantos fragmentos do Kinesis devem ser buscados previamente em paralelo por tarefa do Spark. O ideal é # cores in cluster >= # Kinesis shards / shardsPerTaskpara latência mínima de consulta e uso máximo de recursos.

shardFetchInterval

Uma string de duração, por exemplo, 2m por 2 minutos.

1s

Com que frequência pesquisar o Kinesis para resharding.

awsAccessKey

String

Nenhum padrão.

Chave de acesso AWS.

awsSecretKey

String

Nenhum padrão.

Chave de acesso secreta AWS correspondente à chave de acesso.

roleArn

String

Nenhum padrão.

O nome do recurso da Amazon (ARN) do papel a ser assumido ao acessar o Kinesis.

roleExternalId

String

Nenhum padrão.

Um valor opcional que pode ser usado ao delegar acesso à conta AWS. Consulte Como usar um ID externo.

roleSessionName

String

Nenhum padrão.

Um identificador para a sessão de função assumida que identifica exclusivamente uma sessão quando a mesma função é assumida por diferentes princípios ou por diferentes motivos.

coalesceThresholdBlockSize

Um número inteiro positivo.

10.000.000

O limite no qual ocorre a coalescência automática. Se o tamanho médio do bloco for menor que esse valor, os blocos pré-buscados serão agrupados em coalesceBinSize.

coalesceBinSize

Um número inteiro positivo.

128.000.000

O tamanho aproximado do bloco após a coalescência.

consumerMode

polling ou efo.

polling

Tipo de consumidor para executar a consulta de transmissão. Consulte Configurar Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão. Disponível em Databricks Runtime 11.3 LTS e acima.

requireConsumerDeregistration

true ou false.

false

Se o registro do consumidor de fan-out avançado deve ser cancelado no término da consulta. Requer efo para consumerMode. Disponível em Databricks Runtime 11.3 LTS e acima.

Máximo de fragmentos por descrição

Um número inteiro positivo (máx. 10000).

100

O número máximo de fragmentos a serem lidos por chamada de API ao listar fragmentos.

Nome do consumidor

Nome único do consumidor a ser usado em todas as transmissões ou lista de nomes de consumidores separados por vírgula.

ID da consulta de transmissão

Nome do consumidor usado para registrar a consulta com Kinesis serviço no modo EFO. Disponível em Databricks Runtime 11.3 LTS e acima.

Prefixo do nome do consumidor

Strings vazias ou strings de prefixo personalizadas.

bancos de dados_

Prefixo usado junto com consumerName para registrar consumidores com o Kinesis serviço no modo EFO. Disponível em Databricks Runtime 16.0 e acima.

Intervalo de atualização do consumidor

A duração é definida, por exemplo, "1s" para 1 segundo (máx. 3600s).

300s

O intervalo em que o registro do consumidor do Kinesis EFO é verificado e atualizado. Disponível em Databricks Runtime 11.3 LTS e acima.

ID de consumidor registrado

Uma lista separada por vírgulas de nomes de consumidores ou ARNs.

Nenhuma

Identificadores para consumidores existentes no modo EFO. Disponível em Databricks Runtime 16.1 e acima.

Tipo de ID de consumidor registrado

name ou ARN.

Nenhuma

Especifica se as IDs do consumidor são nomes ou ARNs. Disponível em Databricks Runtime 16.1 e acima.

Observação

Os valores padrão das opções foram escolhidos de modo que dois leitores (Spark ou outros) possam consumir simultaneamente uma transmissão do Kinesis sem atingir os limites de taxa do Kinesis. Se você tiver mais consumidores, deverá ajustar as opções de acordo. Por exemplo, pode ser necessário reduzir maxFetchRate e aumentar minFetchPeriod.

Monitoramento e alerta de baixa latência

Se você tiver um caso de uso de alerta, deverá usar latência menor. Para conseguir isso:

  • Faça com que haja somente um consumidor (ou seja, somente sua consulta de transmissão e mais ninguém) do fluxo do Kinesis, para que possamos otimizar sua única consulta de transmissão para buscar o mais rápido possível sem esbarrar nos limites de taxa do Kinesis.

  • Defina a opção maxFetchDuration com um valor pequeno (digamos, 200 ms) para começar a processar os dados buscados o mais rápido possível. Se você estiver usando Trigger.AvailableNow, isso aumenta as chances de não conseguir acompanhar os registros mais recentes na transmissão do Kinesis.

  • Defina a opção minFetchPeriod como 210 ms para buscar com a maior frequência possível.

  • Configure a opção shardsPerTask ou configure o agrupamento de forma que # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. Isso garante que as tarefas de pré-busca em segundo plano e as tarefas de consulta de transmissão possam ser executadas simultaneamente.

Se você perceber que sua consulta está recebendo dados a cada 5 segundos, é provável que esteja atingindo os limites de taxa do Kinesis. Revise suas configurações.

Quais métricas o Kinesis reporta?

O Kinesis informa o número de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão para cada workspace. Você pode obter a média, o mínimo e o máximo do número de milissegundos entre todos os workspace no processo de consulta de transmissão como as métricas avgMsBehindLatest, maxMsBehindLatest e minMsBehindLatest. Consulte o objeto de métricas de fontes (Kinesis).

Se você estiver executando a transmissão em um Notebook, poderá ver as métricas na dados brutos no tab query painel de progresso transmissão, como no exemplo a seguir:

{
  "sources" : [ {
    "description" : "KinesisV2[stream]",
    "metrics" : {
      "avgMsBehindLatest" : "32000.0",
      "maxMsBehindLatest" : "32000",
      "minMsBehindLatest" : "32000"
    },
  } ]
}

Ingerir registros do Kinesis como um lote incremental

Em Databricks Runtime 13.3 LTS e acima, Databricks suporta o uso de Trigger.AvailableNow com Kinesis fonte de dados para semântica de lotes incrementais. A seguir, descrevemos a configuração básica:

  1. Quando um micro-batch lê gatilhos no modo disponível agora, a hora atual é registrada pelo cliente Databricks.

  2. O Databricks pesquisa o sistema de origem para todos os registros com carimbos de data/hora entre esse tempo registrado e o ponto de verificação anterior.

  3. O Databricks carrega esses registros com Trigger.AvailableNow semântica.

O Databricks usa um mecanismo de melhor esforço para tentar consumir todos os registros existentes na(s) transmissão(ões) do Kinesis quando a query de transmissão é executada. Devido a pequenas diferenças potenciais nos carimbos de data/hora e à falta de garantia na ordenação na fonte de dados, alguns registros podem não ser incluídos em lotes acionados. Os registros omitidos são processados como parte dos próximos microlotes acionados.

Observação

Se a query continuar falhando ao buscar registros da transmissão do Kinesis mesmo que haja registros, tente aumentar o valor maxFetchDuration.

Consulte Configurando o processamento de lotes incrementais.

Escreva para a Kinesis

O seguinte trecho de código pode ser utilizado como um ForeachSink para gravar dados no Kinesis. Exige um Dataset[(String, Array[Byte])].

Observação

O trecho de código a seguir fornece semântica pelo menos uma vez, não exatamente uma vez.

Notebook Kinesis Foreach Sink

Abra o bloco de anotações em outra guia

Recomendações para trabalhar com Kinesis

query do Kinesis pode apresentar latência por vários motivos. Esta seção fornece recomendações para solucionar problemas de latência.

A origem do Kinesis executa Job do Spark em um thread em segundo plano para pré-buscar os dados do Kinesis periodicamente e armazená-los em cache na memória dos executores do Spark. A query transmitida processa os dados em cache após a conclusão de cada passo de pré-busca e disponibiliza os dados para processamento. A passo de pré-busca afeta significativamente a latência de ponta a ponta observada e a Taxa de transferência.

Reduza a latência de pré-busca

Para otimizar a latência mínima query e o uso máximo de recursos, use o seguinte cálculo:

total number of CPU cores in the cluster (across all executors) >= total number of Kinesis shards / shardsPerTask.

Importante

minFetchPeriod pode criar várias chamadas de API GetRecords para o fragmento do Kinesis até atingir ReadProvisionedThroughputExceeded. Se ocorrer uma exceção, isso não indica um problema, pois o conector maximiza a utilização do fragmento do Kinesis.

Evite lentidão causada por muitos erros de limite de taxa

O conector reduz pela metade a quantidade de dados lidos do Kinesis cada vez que encontra um erro de limitação de taxa e registra esse evento nos logs com uma mensagem: "Hit rate limit. Sleeping for 5 seconds."

É comum ver esses erros enquanto uma transmissão está sendo travada, mas depois disso, você não deverá mais ver esses erros. Se fizer isso, talvez seja necessário ajustar do lado do Kinesis (aumentando a capacidade) ou ajustar as opções de pré-busca.

Evite derramar dados no disco

Se você tiver um aumento repentino na transmissão do Kinesis, a capacidade do buffer atribuída poderá ficar cheia e o buffer não ser esvaziado rápido o suficiente para que novos dados sejam adicionados.

Nesses casos, o Spark derrama blocos do buffer para o disco e retarda o processamento, o que afeta o desempenho da transmissão. Este evento aparece nos logs com uma mensagem como esta:

./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)

Para resolver esse problema, tente aumentar a capacidade de memória clusters (adicione mais nós ou aumente a memória por nó) ou ajuste o parâmetro de configuração fetchBufferSize.

Pendurando S3 escrever tarefa

Você pode ativar a especulação do Spark para encerrar a tarefa suspensa que impediria o processamento da transmissão de prosseguir. Para garantir que a tarefa não seja encerrada de forma muito agressiva, ajuste cuidadosamente o quantil e o multiplicador para esta configuração. Um bom ponto de partida é definir spark.speculation.multiplier como 3 e spark.speculation.quantile como 0.95.

Reduza a latência associada ao checkpoint na transmissão stateful

Databricks recomenda usar RocksDB com checkpoint de changelog para query de transmissão com estado. Consulte Habilitar ponto de verificação do changelog.

Configurar a distribuição avançada do Kinesis (EFO) para leituras de consulta de streaming

No Databricks Runtime 11.3e acima, o conector Databricks Runtime Kinesis fornece suporte para usar o recurso de fan-out aprimorado (EFO) do Amazon Kinesis.

O fan-out aprimorado do Kinesis é um recurso que oferece suporte a consumidores de fluxo de fan-out aprimorado com uma taxa de transferência dedicada de 2 MB/s por fragmento, por consumidor (máximo de 20 consumidores por fluxo do Kinesis), e registra a entrega no modo push no lugar do modo pull.

Por meio do site default, uma consulta de transmissão estruturada configurada no modo EFO se registra como um consumidor com uma taxa de transferência dedicada e um nome de consumidor exclusivo e um consumidor ARN (Amazon recurso Number) em Kinesis Data transmission.

Por default, Databricks usa o ID da consulta de transmissão com o prefixo databricks_ para nomear o novo consumidor. Opcionalmente, você pode especificar as opções consumerNamePrefix ou consumerName para substituir esse comportamento. O endereço consumerName deve ser uma cadeia de caracteres composta de letras, números e caracteres especiais _ . -.

Importante

Um consumidor registrado do EFO incorre em taxas adicionais na Amazon Kinesis. Para cancelar o registro do consumidor automaticamente na desativação da consulta, defina a opção requireConsumerDeregistration como true. O Databricks não pode garantir o cancelamento do registro em eventos como falhas no driver ou falhas nos nós. Em caso de falha no trabalho, a Databricks recomenda gerenciar os consumidores registrados diretamente para evitar cobranças excessivas de Kinesis.

Opções avançadas de configuração do consumidor

Em Databricks Runtime 16.1 e acima, o senhor pode configurar as leituras no modo EFO usando os consumidores existentes por meio das opções registeredConsumerId e registeredConsumerIdType, como no exemplo a seguir:

df = (spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()
)
val kinesis = spark.readStream
  .format("kinesis")
  .option("streamName", "mystreamname1,mystreamname2")
  .option("registeredConsumerId", "consumer1,consumer2")
  .option("registeredConsumerIdType", "name")
  .load()

O senhor pode especificar os consumidores existentes usando o nome ou o ARN. O senhor deve fornecer um ID de consumidor para cada fonte de transmissão Kinesis especificada na configuração.

Gerenciamento de consumidores off-line com um notebook Databricks

A Databricks oferece um utilitário de gerenciamento de consumidores para registrar, listar ou cancelar o registro de consumidores associados aos fluxos de dados do Kinesis. O seguinte código demonstra o uso desse utilitário em um notebook Databricks:

  1. Em um novo caderno de dados anexado a um agrupamento ativo, crie um AWSKinesisConsumerManager fornecendo as informações de autenticação necessárias.

    import com.databricks.sql.kinesis.AWSKinesisConsumerManager
    
    val manager = AWSKinesisConsumerManager.newManager()
    .option("awsAccessKey", awsAccessKeyId)
    .option("awsSecretKey", awsSecretKey)
    .option("region", kinesisRegion)
    .create()
    
  2. Liste e exiba os consumidores.

    val consumers = manager.listConsumers("<stream name>")
    display(consumers)
    
  3. Registre o consumidor do fluxo informado.

    val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
    
  4. Cancele o registro do consumidor da transmissão informada.

    manager.deregisterConsumer("<stream name>", "<consumer name>")