Conecte-se ao Amazon Kinesis
Este artigo descreve como o senhor pode usar a transmissão estruturada para ler e gravar dados em Amazon Kinesis.
Databricks recomenda que o senhor ative o endpoint S3 VPC para garantir que todo o tráfego S3 seja roteado na rede AWS.
Se o senhor excluir e recriar uma transmissão Kinesis, não poderá reutilizar nenhum diretório de ponto de verificação existente para reiniciar uma consulta de transmissão. O senhor deve excluir os diretórios de ponto de verificação e iniciar essas consultas do zero. O senhor pode fazer o reshard com a transmissão estruturada aumentando o número de shards sem interromper ou reiniciar a transmissão.
Consulte Recomendações para trabalhar com o Kinesis.
Autenticação com o Amazon Kinesis
A Databricks recomenda gerenciar sua conexão com o Kinesis usando uma credencial de serviço da Databricks. Consulte gerenciar o acesso ao serviço de nuvem externo usando credenciais de serviço. Databricks As credenciais de serviço requerem Databricks Runtime 16.2 e acima.
Para usar uma credencial de serviço, faça o seguinte:
- Crie uma credencial de serviço Databricks usando um IAM role com as permissões necessárias para acessar Kinesis. Consulte a Etapa 1: Criar um IAM role.
- Forneça o nome da credencial de serviço usando a opção
serviceCredential
ao definir uma transmissão lida.
A fonte Kinesis requer as permissões ListShards
, GetRecords
e GetShardIterator
. Se o senhor encontrar Amazon: Access Denied
exceções, verifique se o site IAM role tem essas permissões. Consulte Controle de acesso a Amazon Kinesis Transmissão de dados recurso Usando IAM para obter mais detalhes.
Métodos alternativos de autenticação
Se as credenciais de serviço da Databricks não estiverem disponíveis, a Databricks fornece os seguintes métodos alternativos de autenticação.
perfil da instância
Anexar um instance profile durante a configuração do compute. Veja o perfil da instância.
não são compatíveis com o modo de acesso padrão (antigo modo de acesso compartilhado). Consulte as limitações do modo de acesso à computação para Unity Catalog.
Usar a chave diretamente
Para usar a chave de acesso, forneça-a durante a configuração da leitura usando as opções awsAccessKey
e awsSecretKey
.
Se estiver usando chave, o senhor deve armazená-las usando Databricks secrets. Veja Gerenciamento secreto.
Assumir IAM role
Algumas configurações do compute permitem que o senhor assuma um IAM role usando a opção roleArn
. Para assumir uma função, o senhor pode iniciar o clustering com permissões para assumir a função ou fornecer a chave de acesso por meio de awsAccessKey
e awsSecretKey
.
Esse método é compatível com a autenticação entreaccount. Para obter mais informações sobreaccount autenticação entre contas, consulte Delegar acesso entre AWS contas usando IAMa função.
Opcionalmente, você pode especificar a ID externa com roleExternalId
e um nome de sessão com roleSessionName
.
Esquema
O Kinesis retorna registros com o seguinte esquema:
Coluna | Tipo |
---|---|
partitionKey | string |
dados | binário |
transmissão | string |
shardId | string |
sequenceNumber | string |
approximateArrivalTimestamp | carimbo de data/hora |
Para desserializar os dados da coluna data
, o senhor pode converter o campo em uma cadeia de caracteres.
Início rápido
O notebook a seguir demonstra como executar o WordCount com a Transmissão estruturada com Kinesis.
Kinesis WordCount com o Notebook Transmissão Estruturada
Configurar as opções do Kinesis
Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar Trigger.AvailableNow
com Kinesis. Consulte Ingerir registros Kinesis como um lote incremental.
No Databricks Runtime 16.1 e no acima, o senhor pode usar streamARN
para identificar as fontes do Kinesis.
Para todas as versões do Databricks Runtime, o senhor deve especificar streamName
ou streamARN
. Você só pode fornecer uma dessas opções.
Veja a seguir as configurações comuns para as fontes de dados do Kinesis:
Opção | Valor | Padrão | Descrição |
---|---|---|---|
streamName | Uma lista separada por vírgulas de nomes de transmissão. | Nenhuma | Os nomes dos transmissão para assinar. |
StreamArn | Uma lista separada por vírgulas de Kinesis transmissão ARNs. Por exemplo, | Nenhuma | A ARNs de transmissão a ser assinada. Disponível em Databricks Runtime 16.1 e acima. |
região | Região para as transmissões a serem especificadas. | Região resolvida localmente | A região em que as transmissões são definidas. |
endpoint | Região para o fluxo de dados do Kinesis. | Região resolvida localmente | O ponto final regional do Kinesis Data Streams. |
initialPosition |
|
| Por onde começar a ler na transmissão. Especifique |
maxRecordsPerFetch | Um número inteiro positivo. | 10.000 | Quantos registros serão lidos por solicitação de API ao Kinesis. O número de registros retornados pode, na verdade, ser maior, dependendo do fato de os sub-registros terem sido agregados em um único registro com a Kinesis Producer Library. |
maxFetchRate | Número decimal positivo representando a taxa de dados em MB/s. | 1,0 (máx = 2,0) | Quão rápido é a pré-busca de dados por fragmento. Isso serve para limitar a taxa de buscas e evitar a limitação do Kinesis. 2,0 Mb/s é a taxa máxima permitida pelo Kinesis. |
minFetchPeriod | Uma string de duração, por exemplo, | 400ms (min = 200ms) | Quanto tempo esperar entre as tentativas consecutivas de pré-busca. O objetivo é limitar a frequência das buscas e evitar a limitação do Kinesis. 200 ms é o mínimo, pois o Kinesis permite no máximo 5 buscas por segundo. |
maxFetchDuration | Uma string de duração, por exemplo, | 10s | Por quanto tempo você deve armazenar em buffer os novos dados pré-configurados antes de disponibilizá-los para processamento. |
fetchBufferSize | Uma sequência de bytes, por exemplo, | 20 Gb | Quantos dados devem ser armazenados em buffer para o próximo gatilho. Isso é usado como uma condição de parada e não como um limite superior estrito, portanto mais dados podem ser armazenados em buffer do que o especificado para esse valor. |
shardsPerTask | Um número inteiro positivo. | 5 | Quantos fragmentos do Kinesis devem ser buscados previamente em paralelo por tarefa do Spark. O ideal é |
shardFetchInterval | Uma string de duração, por exemplo, | 1s | Com que frequência pesquisar o Kinesis para resharding. |
Credencial de serviço | String | Nenhum padrão. | O nome de sua credencial de serviço Databricks. Consulte gerenciar o acesso ao serviço de nuvem externo usando credenciais de serviço. |
awsAccessKey | String | Nenhum padrão. | Chave de acesso AWS. |
awsSecretKey | String | Nenhum padrão. | Chave de acesso secreta AWS correspondente à chave de acesso. |
roleArn | String | Nenhum padrão. | O nome do recurso da Amazon (ARN) do papel a ser assumido ao acessar o Kinesis. |
roleExternalId | String | Nenhum padrão. | Um valor opcional que pode ser usado ao delegar acesso à conta AWS. Consulte Como usar um ID externo. |
roleSessionName | String | Nenhum padrão. | Um identificador para a sessão de função assumida que identifica exclusivamente uma sessão quando a mesma função é assumida por diferentes princípios ou por diferentes motivos. |
coalesceThresholdBlockSize | Um número inteiro positivo. | 10.000.000 | O limite no qual ocorre a coalescência automática. Se o tamanho médio do bloco for menor que esse valor, os blocos pré-buscados serão agrupados em |
coalesceBinSize | Um número inteiro positivo. | 128.000.000 | O tamanho aproximado do bloco após a coalescência. |
consumerMode |
|
| Tipo de consumidor para executar a consulta de transmissão. Consulte Configurar Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão. Disponível em Databricks Runtime 11.3 LTS e acima. |
requireConsumerDeregistration |
|
| Se o registro do consumidor de fan-out avançado deve ser cancelado no término da consulta. Requer |
Máximo de fragmentos por descrição | Um número inteiro positivo (máx. 10000). | 100 | O número máximo de fragmentos a serem lidos por chamada de API ao listar fragmentos. |
Nome do consumidor | Nome único do consumidor a ser usado em todas as transmissões ou lista de nomes de consumidores separados por vírgula. | ID da consulta de transmissão | Nome do consumidor usado para registrar a consulta com Kinesis serviço no modo EFO. Disponível em Databricks Runtime 11.3 LTS e acima. |
Prefixo do nome do consumidor | Strings vazias ou strings de prefixo personalizadas. | bancos de dados_ | Prefixo usado junto com consumerName para registrar consumidores com o Kinesis serviço no modo EFO. Disponível em Databricks Runtime 16.0 e acima. |
Intervalo de atualização do consumidor | A duração é definida, por exemplo, "1s" para 1 segundo (máx. 3600s). | 300s | O intervalo em que o registro do consumidor do Kinesis EFO é verificado e atualizado. Disponível em Databricks Runtime 11.3 LTS e acima. |
ID de consumidor registrado | Uma lista separada por vírgulas de nomes de consumidores ou ARNs. | Nenhuma | Identificadores para consumidores existentes no modo EFO. Disponível em Databricks Runtime 16.1 e acima. |
Tipo de ID de consumidor registrado |
| Nenhuma | Especifica se as IDs do consumidor são nomes ou ARNs. Disponível em Databricks Runtime 16.1 e acima. |
Os valores padrão das opções foram escolhidos de modo que dois leitores (Spark ou outros) possam consumir simultaneamente uma transmissão do Kinesis sem atingir os limites de taxa do Kinesis. Se você tiver mais consumidores, deverá ajustar as opções de acordo. Por exemplo, pode ser necessário reduzir maxFetchRate
e aumentar minFetchPeriod
.
Monitoramento e alertas de baixa latência
Se você tiver um caso de uso de alerta, deverá usar latência menor. Para conseguir isso:
- Faça com que haja somente um consumidor (ou seja, somente sua consulta de transmissão e mais ninguém) do fluxo do Kinesis, para que possamos otimizar sua única consulta de transmissão para buscar o mais rápido possível sem esbarrar nos limites de taxa do Kinesis.
- Defina a opção
maxFetchDuration
como um valor pequeno (digamos, 200 ms) para começar a processar os dados obtidos o mais rápido possível. Se o senhor estiver usandoTrigger.AvailableNow
, isso aumenta as chances de não conseguir acompanhar os registros mais recentes na transmissão Kinesis. - Defina a opção
minFetchPeriod
para 210 ms para buscar com a maior frequência possível. - Configure a opção
shardsPerTask
ou configure o agrupamento de forma que# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask
. Isso garante que as tarefas de pré-busca em segundo plano e as tarefas de consulta de transmissão possam ser executadas simultaneamente.
Se você perceber que sua consulta está recebendo dados a cada 5 segundos, é provável que esteja atingindo os limites de taxa do Kinesis. Revise suas configurações.
Quais são as métricas relatadas pelo Kinesis?
Kinesis informa o número de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão para cada workspace. O senhor pode obter a média, o mínimo e o máximo do número de milissegundos entre todos os espaços de trabalho no processo de consulta de transmissão, como as métricas avgMsBehindLatest
, maxMsBehindLatest
e minMsBehindLatest
. Consulte o objeto de métricas de fontes (Kinesis).
Se o senhor estiver executando a transmissão em um Notebook, poderá ver métricas nos dados brutos tab no painel de progresso da consulta de transmissão, como no exemplo a seguir:
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}
Ingerir registros do Kinesis como lotes incrementais
Em Databricks Runtime 13.3 LTS e acima, Databricks suporta o uso de Trigger.AvailableNow
com Kinesis fonte de dados para semântica de lotes incrementais. A seguir, descrevemos a configuração básica:
- Quando um micro-batch lê gatilhos no modo disponível agora, a hora atual é registrada pelo cliente Databricks.
- O Databricks pesquisa o sistema de origem para todos os registros com carimbos de data/hora entre esse tempo registrado e o ponto de verificação anterior.
- O Databricks carrega esses registros com
Trigger.AvailableNow
semântica.
Databricks usa um mecanismo de melhor esforço para tentar consumir todos os registros existentes em Kinesis transmissão(ões) quando a consulta de transmissão é executada. Devido a pequenas diferenças potenciais nos carimbos de data e à falta de garantia de ordenação na fonte de dados, alguns registros podem não ser incluídos em um lote acionado. Os registros omitidos são processados como parte das próximas microlotes acionadas.
Se a consulta continuar falhando na busca de registros da transmissão Kinesis mesmo que haja registros, tente aumentar o valor maxFetchDuration
.
Consulte Configuração do processamento de lotes incrementais.
Escreva para a Kinesis
O seguinte trecho de código pode ser utilizado como um ForeachSink
para gravar dados no Kinesis. Exige um Dataset[(String, Array[Byte])]
.
O trecho de código a seguir fornece semântica pelo menos uma vez , não exatamente uma vez.
Kinesis Notebook Foreach Sink
Recomendações para trabalhar com o Kinesis
As consultas do Kinesis podem apresentar latência por vários motivos. Esta seção fornece recomendações para solucionar problemas de latência.
A origem do Kinesis executa Job do Spark em um thread em segundo plano para pré-buscar os dados do Kinesis periodicamente e armazená-los em cache na memória dos executores do Spark. A query transmitida processa os dados em cache após a conclusão de cada passo de pré-busca e disponibiliza os dados para processamento. A passo de pré-busca afeta significativamente a latência de ponta a ponta observada e a Taxa de transferência.
Reduza a latência de pré-busca
Para otimizar a latência mínima da consulta e o uso máximo do recurso, use o cálculo a seguir:
total number of CPU cores in the cluster (across all executors)
> = total number of Kinesis shards
/ shardsPerTask
.
minFetchPeriod
pode criar várias chamadas da API GetRecords para o fragmento do Kinesis até atingir ReadProvisionedThroughputExceeded
. Se ocorrer uma exceção, isso não é indicativo de um problema, pois o conector maximiza a utilização do fragmento do Kinesis.
Evite lentidões causadas por muitos erros de limite de taxa
O conector reduz a quantidade de dados lidos do Kinesis pela metade sempre que encontra um erro de limitação de taxa e registra esse evento no log com uma mensagem: "Hit rate limit. Sleeping for 5 seconds."
É comum ver esses erros quando uma transmissão está sendo recuperada, mas depois que ela é recuperada, o senhor não deve mais ver esses erros. Se isso acontecer, talvez o senhor precise ajustar o Kinesis (aumentando a capacidade) ou ajustar as opções de pré-busca.
Evite derramar dados no disco
Se o senhor tiver um pico repentino na transmissão Kinesis, a capacidade do buffer atribuído poderá ficar cheia e o buffer não será esvaziado com rapidez suficiente para que novos dados sejam adicionados.
Nesses casos, o site Spark transfere blocos do buffer para o disco e torna o processamento mais lento, o que afeta o desempenho da transmissão. Esse evento aparece no site log com uma mensagem como esta:
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
Para resolver esse problema, tente aumentar a capacidade de memória do clustering (adicione mais nós ou aumente a memória por nó) ou ajuste o parâmetro de configuração fetchBufferSize
.
Hanging S3 write tarefa
O senhor pode ativar a especulação Spark para encerrar a tarefa pendente que impediria o processamento da transmissão. Para garantir que a tarefa não seja encerrada de forma muito agressiva, ajuste cuidadosamente o quantil e o multiplicador para essa configuração. Um bom ponto de partida é definir spark.speculation.multiplier
como 3
e spark.speculation.quantile
como 0.95
.
Reduzir a latência associada ao checkpointing na transmissão com estado
Databricks recomenda o uso do site RocksDB com checkpointing de changelog para consultas de transmissão com estado. Consulte Habilitar o checkpoint do registro de alterações.
Configure o Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão
No Databricks Runtime 11.3e acima, o conector Databricks Runtime Kinesis fornece suporte para usar o recurso de fan-out aprimorado (EFO) do Amazon Kinesis.
O fan-out aprimorado do Kinesis é um recurso que oferece suporte a consumidores de fluxo de fan-out aprimorado com uma taxa de transferência dedicada de 2 MB/s por fragmento, por consumidor (máximo de 20 consumidores por fluxo do Kinesis), e registra a entrega no modo push no lugar do modo pull.
Por meio do site default, uma consulta de transmissão estruturada configurada no modo EFO se registra como um consumidor com uma taxa de transferência dedicada e um nome de consumidor exclusivo e um consumidor ARN (Amazon recurso Number) em Kinesis Data transmission.
Por default, Databricks usa o ID da consulta de transmissão com o prefixo databricks_
para nomear o novo consumidor. Opcionalmente, você pode especificar as opções consumerNamePrefix
ou consumerName
para substituir esse comportamento. O endereço consumerName
deve ser uma cadeia de caracteres composta de letras, números e caracteres especiais _ . -
.
Um consumidor EFO registrado incorre em cobranças adicionais no Amazon Kinesis. Para cancelar o registro do consumidor automaticamente na desmontagem da consulta, defina a opção requireConsumerDeregistration
como true
. A Databricks não pode garantir o cancelamento do registro em eventos como falhas de driver ou falhas de nó. Em caso de falha no trabalho, o site Databricks recomenda gerenciar os consumidores registrados diretamente para evitar cobranças excessivas no site Kinesis.
Opções avançadas de configuração do consumidor
Em Databricks Runtime 16.1 e acima, o senhor pode configurar as leituras no modo EFO usando os consumidores existentes por meio das opções registeredConsumerId
e registeredConsumerIdType
, como no exemplo a seguir:
- Python
- Scala
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)
val kinesis = spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
O senhor pode especificar os consumidores existentes usando o nome ou o ARN. O senhor deve fornecer um ID de consumidor para cada fonte de transmissão Kinesis especificada na configuração.
Gerenciamento de consumidores off-line usando um notebook Databricks
A Databricks oferece um utilitário de gerenciamento de consumidores para registrar, listar ou cancelar o registro de consumidores associados aos fluxos de dados do Kinesis. O seguinte código demonstra o uso desse utilitário em um notebook Databricks:
-
Em um novo caderno de dados anexado a um agrupamento ativo, crie um
AWSKinesisConsumerManager
fornecendo as informações de autenticação necessárias.Scalaimport com.databricks.sql.kinesis.AWSKinesisConsumerManager
val manager = AWSKinesisConsumerManager.newManager()
.option("serviceCredential", serviceCredentialName)
.option("region", kinesisRegion)
.create() -
Liste e exiba os consumidores.
Scalaval consumers = manager.listConsumers("<stream name>")
display(consumers) -
Registre o consumidor do fluxo informado.
Scalaval consumerARN = manager.registerConsumer("<stream name>", "<consumer name>")
-
Cancele o registro do consumidor da transmissão informada.
Scalamanager.deregisterConsumer("<stream name>", "<consumer name>")