Conecte-se ao Amazon Kinesis
Este artigo descreve como você pode usar a transmissão estruturada para ler e gravar dados no Amazon Kinesis.
A Databricks recomenda que você habilite endpoint S3 VPC para garantir que todo o tráfego S3 seja roteado na rede AWS.
Observação
Se você excluir e recriar uma transmissão do Kinesis, não poderá reutilizar nenhum diretório de ponto de verificação existente para reiniciar uma query de transmissão. Você deve excluir os diretórios de pontos de verificação e começar essas query do zero. Você pode reestilhaçar com transmissão estruturada aumentando o número de shards sem interromper ou reiniciar a transmissão.
Consulte Recomendações para trabalhar com o Kinesis.
Autentique-se com o Amazon Kinesis
A Databricks recomenda gerenciar sua conexão com o Kinesis usando um instance profile. Consulte instance profile.
Aviso
não são compatíveis com o modo de acesso compartilhado. Use o modo de acesso de usuário único ou um método de autenticação alternativo com o modo de acesso compartilhado. Consulte Limitações do modo de acesso à computação para o Unity Catalog.
Se você deseja utilizar chaves para acesso, poderá fornecê-las utilizando as opções awsAccessKey
e awsSecretKey
.
Você também pode assumir uma função IAM utilizando a opção roleArn
. Opcionalmente, você pode especificar o ID externo com roleExternalId
e um nome de sessão com roleSessionName
. Para assumir uma função, você pode iniciar seu agrupamento com permissões para assumir a função ou fornecer chaves de acesso pelo awsAccessKey
e awsSecretKey
. Para autenticação de conta cruzada, a Databricks recomenda usar o roleArn
para manter o papel assumido, que pode então ser assumido através da sua conta do Databricks AWS. Para obter mais informações sobre autenticação entre contas, consulte Delegate Access Across AWS Accounts Using IAM Roles (Delegar acesso em todas as contas AWS usando funções de IAM).
Observação
A origem do Kinesis exige permissões de ListShards
, GetRecords
e GetShardIterator
. Se você encontrar Amazon: Access Denied
exceções, verifique se seu usuário ou perfil tem essas permissões. Consulte Controle de acesso aos recursos do Amazon Kinesis Data Streams com o IAM para obter mais detalhes.
Esquema
O Kinesis retorna registros com o seguinte esquema:
Coluna |
Tipo |
---|---|
partitionKey |
string |
dados |
binário |
transmissão |
string |
shardId |
string |
sequenceNumber |
string |
approximateArrivalTimestamp |
timestamp |
Para desserializar os dados na coluna data
, você pode converter o campo em strings.
Início rápido
O notebook a seguir demonstra como executar o WordCount com a Transmissão estruturada com Kinesis.
Configurar opções do Kinesis
Importante
Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar Trigger.AvailableNow
com Kinesis. Consulte Ingerir registros Kinesis como um lote incremental.
Observação
No Databricks Runtime 16.1 e no acima, o senhor pode usar streamARN
para identificar as fontes do Kinesis.
Para todas as versões do Databricks Runtime, o senhor deve especificar streamName
ou streamARN
. Você só pode fornecer uma dessas opções.
Veja a seguir as configurações comuns para as fontes de dados do Kinesis:
Opção |
Valor |
Padrão |
Descrição |
---|---|---|---|
streamName |
Uma lista separada por vírgulas de nomes de transmissão. |
Nenhuma |
Os nomes dos transmissão para assinar. |
StreamArn |
Uma lista separada por vírgulas de Kinesis transmissão ARNs. Por exemplo, |
Nenhuma |
O ARNs de transmissão a ser assinado. Disponível em Databricks Runtime 16.1 e acima. |
região |
Região para as transmissões a serem especificadas. |
Região resolvida localmente |
A região em que as transmissões são definidas. |
endpoint |
Região para o fluxo de dados do Kinesis. |
Região resolvida localmente |
O ponto final regional do Kinesis Data Streams. |
initialPosition |
|
|
Por onde começar a ler na transmissão. Especifique |
maxRecordsPerFetch |
Um número inteiro positivo. |
10.000 |
Quantos registros serão lidos por solicitação de API ao Kinesis. O número de registros retornados pode, na verdade, ser maior, dependendo do fato de os sub-registros terem sido agregados em um único registro com a Kinesis Producer Library. |
maxFetchRate |
Número decimal positivo representando a taxa de dados em MB/s. |
1,0 (máx = 2,0) |
Quão rápido é a pré-busca de dados por fragmento. Isso serve para limitar a taxa de buscas e evitar a limitação do Kinesis. 2,0 Mb/s é a taxa máxima permitida pelo Kinesis. |
minFetchPeriod |
Uma string de duração, por exemplo, |
400ms (min = 200ms) |
Quanto tempo esperar entre as tentativas consecutivas de pré-busca. O objetivo é limitar a frequência das buscas e evitar a limitação do Kinesis. 200 ms é o mínimo, pois o Kinesis permite no máximo 5 buscas por segundo. |
maxFetchDuration |
Uma string de duração, por exemplo, |
10s |
Por quanto tempo você deve armazenar em buffer os novos dados pré-configurados antes de disponibilizá-los para processamento. |
fetchBufferSize |
Uma sequência de bytes, por exemplo, |
20 Gb |
Quantos dados devem ser armazenados em buffer para o próximo gatilho. Isso é usado como uma condição de parada e não como um limite superior estrito, portanto mais dados podem ser armazenados em buffer do que o especificado para esse valor. |
shardsPerTask |
Um número inteiro positivo. |
5 |
Quantos fragmentos do Kinesis devem ser buscados previamente em paralelo por tarefa do Spark. O ideal é |
shardFetchInterval |
Uma string de duração, por exemplo, |
1s |
Com que frequência pesquisar o Kinesis para resharding. |
awsAccessKey |
String |
Nenhum padrão. |
Chave de acesso AWS. |
awsSecretKey |
String |
Nenhum padrão. |
Chave de acesso secreta AWS correspondente à chave de acesso. |
roleArn |
String |
Nenhum padrão. |
O nome do recurso da Amazon (ARN) do papel a ser assumido ao acessar o Kinesis. |
roleExternalId |
String |
Nenhum padrão. |
Um valor opcional que pode ser usado ao delegar acesso à conta AWS. Consulte Como usar um ID externo. |
roleSessionName |
String |
Nenhum padrão. |
Um identificador para a sessão de função assumida que identifica exclusivamente uma sessão quando a mesma função é assumida por diferentes princípios ou por diferentes motivos. |
coalesceThresholdBlockSize |
Um número inteiro positivo. |
10.000.000 |
O limite no qual ocorre a coalescência automática. Se o tamanho médio do bloco for menor que esse valor, os blocos pré-buscados serão agrupados em |
coalesceBinSize |
Um número inteiro positivo. |
128.000.000 |
O tamanho aproximado do bloco após a coalescência. |
consumerMode |
|
|
Tipo de consumidor para executar a consulta de transmissão. Consulte Configurar Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão. Disponível em Databricks Runtime 11.3 LTS e acima. |
requireConsumerDeregistration |
|
|
Se o registro do consumidor de fan-out avançado deve ser cancelado no término da consulta. Requer |
Máximo de fragmentos por descrição |
Um número inteiro positivo (máx. 10000). |
100 |
O número máximo de fragmentos a serem lidos por chamada de API ao listar fragmentos. |
Nome do consumidor |
Nome único do consumidor a ser usado em todas as transmissões ou lista de nomes de consumidores separados por vírgula. |
ID da consulta de transmissão |
Nome do consumidor usado para registrar a consulta com Kinesis serviço no modo EFO. Disponível em Databricks Runtime 11.3 LTS e acima. |
Prefixo do nome do consumidor |
Strings vazias ou strings de prefixo personalizadas. |
bancos de dados_ |
Prefixo usado junto com consumerName para registrar consumidores com o Kinesis serviço no modo EFO. Disponível em Databricks Runtime 16.0 e acima. |
Intervalo de atualização do consumidor |
A duração é definida, por exemplo, "1s" para 1 segundo (máx. 3600s). |
300s |
O intervalo em que o registro do consumidor do Kinesis EFO é verificado e atualizado. Disponível em Databricks Runtime 11.3 LTS e acima. |
ID de consumidor registrado |
Uma lista separada por vírgulas de nomes de consumidores ou ARNs. |
Nenhuma |
Identificadores para consumidores existentes no modo EFO. Disponível em Databricks Runtime 16.1 e acima. |
Tipo de ID de consumidor registrado |
|
Nenhuma |
Especifica se as IDs do consumidor são nomes ou ARNs. Disponível em Databricks Runtime 16.1 e acima. |
Observação
Os valores padrão das opções foram escolhidos de modo que dois leitores (Spark ou outros) possam consumir simultaneamente uma transmissão do Kinesis sem atingir os limites de taxa do Kinesis. Se você tiver mais consumidores, deverá ajustar as opções de acordo. Por exemplo, pode ser necessário reduzir maxFetchRate
e aumentar minFetchPeriod
.
Monitoramento e alerta de baixa latência
Se você tiver um caso de uso de alerta, deverá usar latência menor. Para conseguir isso:
Faça com que haja somente um consumidor (ou seja, somente sua consulta de transmissão e mais ninguém) do fluxo do Kinesis, para que possamos otimizar sua única consulta de transmissão para buscar o mais rápido possível sem esbarrar nos limites de taxa do Kinesis.
Defina a opção
maxFetchDuration
com um valor pequeno (digamos, 200 ms) para começar a processar os dados buscados o mais rápido possível. Se você estiver usandoTrigger.AvailableNow
, isso aumenta as chances de não conseguir acompanhar os registros mais recentes na transmissão do Kinesis.Defina a opção
minFetchPeriod
como 210 ms para buscar com a maior frequência possível.Configure a opção
shardsPerTask
ou configure o agrupamento de forma que# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask
. Isso garante que as tarefas de pré-busca em segundo plano e as tarefas de consulta de transmissão possam ser executadas simultaneamente.
Se você perceber que sua consulta está recebendo dados a cada 5 segundos, é provável que esteja atingindo os limites de taxa do Kinesis. Revise suas configurações.
Quais métricas o Kinesis reporta?
O Kinesis informa o número de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão para cada workspace. Você pode obter a média, o mínimo e o máximo do número de milissegundos entre todos os workspace no processo de consulta de transmissão como as métricas avgMsBehindLatest
, maxMsBehindLatest
e minMsBehindLatest
. Consulte o objeto de métricas de fontes (Kinesis).
Se você estiver executando a transmissão em um Notebook, poderá ver as métricas na dados brutos no tab query painel de progresso transmissão, como no exemplo a seguir:
{
"sources" : [ {
"description" : "KinesisV2[stream]",
"metrics" : {
"avgMsBehindLatest" : "32000.0",
"maxMsBehindLatest" : "32000",
"minMsBehindLatest" : "32000"
},
} ]
}
Ingerir registros do Kinesis como um lote incremental
Em Databricks Runtime 13.3 LTS e acima, Databricks suporta o uso de Trigger.AvailableNow
com Kinesis fonte de dados para semântica de lotes incrementais. A seguir, descrevemos a configuração básica:
Quando um micro-batch lê gatilhos no modo disponível agora, a hora atual é registrada pelo cliente Databricks.
O Databricks pesquisa o sistema de origem para todos os registros com carimbos de data/hora entre esse tempo registrado e o ponto de verificação anterior.
O Databricks carrega esses registros com
Trigger.AvailableNow
semântica.
O Databricks usa um mecanismo de melhor esforço para tentar consumir todos os registros existentes na(s) transmissão(ões) do Kinesis quando a query de transmissão é executada. Devido a pequenas diferenças potenciais nos carimbos de data/hora e à falta de garantia na ordenação na fonte de dados, alguns registros podem não ser incluídos em lotes acionados. Os registros omitidos são processados como parte dos próximos microlotes acionados.
Observação
Se a query continuar falhando ao buscar registros da transmissão do Kinesis mesmo que haja registros, tente aumentar o valor maxFetchDuration
.
Consulte Configurando o processamento de lotes incrementais.
Escreva para a Kinesis
O seguinte trecho de código pode ser utilizado como um ForeachSink
para gravar dados no Kinesis. Exige um Dataset[(String, Array[Byte])]
.
Observação
O trecho de código a seguir fornece semântica pelo menos uma vez, não exatamente uma vez.
Recomendações para trabalhar com Kinesis
query do Kinesis pode apresentar latência por vários motivos. Esta seção fornece recomendações para solucionar problemas de latência.
A origem do Kinesis executa Job do Spark em um thread em segundo plano para pré-buscar os dados do Kinesis periodicamente e armazená-los em cache na memória dos executores do Spark. A query transmitida processa os dados em cache após a conclusão de cada passo de pré-busca e disponibiliza os dados para processamento. A passo de pré-busca afeta significativamente a latência de ponta a ponta observada e a Taxa de transferência.
Reduza a latência de pré-busca
Para otimizar a latência mínima query e o uso máximo de recursos, use o seguinte cálculo:
total number of CPU cores in the cluster (across all executors)
>= total number of Kinesis shards
/ shardsPerTask
.
Importante
minFetchPeriod
pode criar várias chamadas de API GetRecords para o fragmento do Kinesis até atingir ReadProvisionedThroughputExceeded
. Se ocorrer uma exceção, isso não indica um problema, pois o conector maximiza a utilização do fragmento do Kinesis.
Evite lentidão causada por muitos erros de limite de taxa
O conector reduz pela metade a quantidade de dados lidos do Kinesis cada vez que encontra um erro de limitação de taxa e registra esse evento nos logs com uma mensagem: "Hit rate limit. Sleeping for 5 seconds."
É comum ver esses erros enquanto uma transmissão está sendo travada, mas depois disso, você não deverá mais ver esses erros. Se fizer isso, talvez seja necessário ajustar do lado do Kinesis (aumentando a capacidade) ou ajustar as opções de pré-busca.
Evite derramar dados no disco
Se você tiver um aumento repentino na transmissão do Kinesis, a capacidade do buffer atribuída poderá ficar cheia e o buffer não ser esvaziado rápido o suficiente para que novos dados sejam adicionados.
Nesses casos, o Spark derrama blocos do buffer para o disco e retarda o processamento, o que afeta o desempenho da transmissão. Este evento aparece nos logs com uma mensagem como esta:
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
Para resolver esse problema, tente aumentar a capacidade de memória clusters (adicione mais nós ou aumente a memória por nó) ou ajuste o parâmetro de configuração fetchBufferSize
.
Pendurando S3 escrever tarefa
Você pode ativar a especulação do Spark para encerrar a tarefa suspensa que impediria o processamento da transmissão de prosseguir. Para garantir que a tarefa não seja encerrada de forma muito agressiva, ajuste cuidadosamente o quantil e o multiplicador para esta configuração. Um bom ponto de partida é definir spark.speculation.multiplier
como 3
e spark.speculation.quantile
como 0.95
.
Reduza a latência associada ao checkpoint na transmissão stateful
Databricks recomenda usar RocksDB com checkpoint de changelog para query de transmissão com estado. Consulte Habilitar ponto de verificação do changelog.
Configurar a distribuição avançada do Kinesis (EFO) para leituras de consulta de streaming
No Databricks Runtime 11.3e acima, o conector Databricks Runtime Kinesis fornece suporte para usar o recurso de fan-out aprimorado (EFO) do Amazon Kinesis.
O fan-out aprimorado do Kinesis é um recurso que oferece suporte a consumidores de fluxo de fan-out aprimorado com uma taxa de transferência dedicada de 2 MB/s por fragmento, por consumidor (máximo de 20 consumidores por fluxo do Kinesis), e registra a entrega no modo push no lugar do modo pull.
Por meio do site default, uma consulta de transmissão estruturada configurada no modo EFO se registra como um consumidor com uma taxa de transferência dedicada e um nome de consumidor exclusivo e um consumidor ARN (Amazon recurso Number) em Kinesis Data transmission.
Por default, Databricks usa o ID da consulta de transmissão com o prefixo databricks_
para nomear o novo consumidor. Opcionalmente, você pode especificar as opções consumerNamePrefix
ou consumerName
para substituir esse comportamento. O endereço consumerName
deve ser uma cadeia de caracteres composta de letras, números e caracteres especiais _ . -
.
Importante
Um consumidor registrado do EFO incorre em taxas adicionais na Amazon Kinesis. Para cancelar o registro do consumidor automaticamente na desativação da consulta, defina a opção requireConsumerDeregistration
como true
. O Databricks não pode garantir o cancelamento do registro em eventos como falhas no driver ou falhas nos nós. Em caso de falha no trabalho, a Databricks recomenda gerenciar os consumidores registrados diretamente para evitar cobranças excessivas de Kinesis.
Opções avançadas de configuração do consumidor
Em Databricks Runtime 16.1 e acima, o senhor pode configurar as leituras no modo EFO usando os consumidores existentes por meio das opções registeredConsumerId
e registeredConsumerIdType
, como no exemplo a seguir:
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)
val kinesis = spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
O senhor pode especificar os consumidores existentes usando o nome ou o ARN. O senhor deve fornecer um ID de consumidor para cada fonte de transmissão Kinesis especificada na configuração.
Gerenciamento de consumidores off-line com um notebook Databricks
A Databricks oferece um utilitário de gerenciamento de consumidores para registrar, listar ou cancelar o registro de consumidores associados aos fluxos de dados do Kinesis. O seguinte código demonstra o uso desse utilitário em um notebook Databricks:
Em um novo caderno de dados anexado a um agrupamento ativo, crie um
AWSKinesisConsumerManager
fornecendo as informações de autenticação necessárias.import com.databricks.sql.kinesis.AWSKinesisConsumerManager val manager = AWSKinesisConsumerManager.newManager() .option("awsAccessKey", awsAccessKeyId) .option("awsSecretKey", awsSecretKey) .option("region", kinesisRegion) .create()
Liste e exiba os consumidores.
val consumers = manager.listConsumers("<stream name>") display(consumers)
Registre o consumidor do fluxo informado.
val consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
Cancele o registro do consumidor da transmissão informada.
manager.deregisterConsumer("<stream name>", "<consumer name>")