Conecte-se ao Amazon Kinesis
Esta página descreve como usar a transmissão estruturada para ler e gravar dados no Amazon Kinesis.
Databricks recomenda que você habilite o endpoint VPC S3 para que todo o tráfego S3 seja roteado na rede AWS .
Se você excluir e recriar uma transmissão Kinesis , não poderá reutilizar nenhum diretório de ponto de verificação existente para reiniciar uma consulta de transmissão. Você deve excluir os diretórios de ponto de verificação e iniciar essas consultas do zero. Você pode reestruturar a transmissão aumentando o número de shards sem interromper ou reiniciar a transmissão.
Consulte Recomendações para trabalhar com o Kinesis.
Autenticação com o Amazon Kinesis
A Databricks recomenda gerenciar sua conexão com o Kinesis usando uma credencial de serviço da Databricks. Consulte Criar credenciais de serviço. Databricks As credenciais de serviço requerem Databricks Runtime 16.1 e acima.
Para usar uma credencial de serviço, faça o seguinte:
- Crie uma credencial de serviço Databricks usando um IAM role com as permissões necessárias para acessar Kinesis. Consulte a Etapa 1: Criar um IAM role.
- Forneça o nome da credencial do serviço usando a opção
serviceCredentialao definir uma leitura de transmissão.
A fonte Kinesis requer permissões ListShards, GetRecords e GetShardIterator . Se você encontrar Amazon: Access Denied exceções, verifique se sua IAM role tem essas permissões. Consulte Controlando o acesso ao recurso de transmissão de dados Amazon Kinesis usando IAM.
Métodos alternativos de autenticação
Caso as credenciais de serviço do Databricks não estejam disponíveis, o Databricks oferece os seguintes métodos alternativos de autenticação.
perfil da instância
Anexar um instance profile durante a configuração do compute. Veja o perfil da instância.
não são compatíveis com o modo de acesso padrão (antigo modo de acesso compartilhado). Consulte o Padrão compute requisitos e limitações.
Usar a chave diretamente
Defina as opções awsAccessKey e awsSecretKey .
Se usar chaves, armazene-as usando segredos Databricks . Veja Gestão Secreta.
Assumir IAM role
Algumas configurações compute permitem que você assuma uma IAM role usando a opção roleArn . Para assumir uma função, inicie seu cluster com permissões para assumir a função ou forneça a chave de acesso por meio de awsAccessKey e awsSecretKey.
Este método suporta autenticação entreaccount . Para obter mais informações, consulte Delegar acesso em toda a conta AWS usando a função IAM.
Opcionalmente, você pode especificar a ID externa com roleExternalId e um nome de sessão com roleSessionName.
Esquema
O Kinesis retorna registros com o seguinte esquema:
Coluna | Tipo |
|---|---|
| string |
| binário |
| string |
| string |
| string |
| carimbo de data/hora |
Para desserializar os dados na coluna data , converta o campo para strings.
Início rápido
O notebook a seguir demonstra como executar o WordCount com a Transmissão estruturada com Kinesis.
Kinesis WordCount com o Notebook Transmissão Estruturada
Configurar as opções do Kinesis
Em Databricks Runtime 13.3 LTS e acima, o senhor pode usar Trigger.AvailableNow com Kinesis. Consulte Ingerir registros Kinesis como um lote incremental.
No Databricks Runtime 16.1 e no acima, o senhor pode usar streamARN para identificar as fontes do Kinesis.
Para todas as versões do Databricks Runtime, o senhor deve especificar streamName ou streamARN. Você só pode fornecer uma dessas opções.
Não alterne entre streamName e streamARN para uma consulta de transmissão ativa. Databricks não permite alterar essas opções durante a transmissão, e reiniciar a consulta pode resultar em registros duplicados ou perda de dados. Se você precisar mudar de streamName para streamARN, inicie uma nova consulta de transmissão com um novo diretório de ponto de verificação.
Veja a seguir as configurações comuns para as fontes de dados do Kinesis:
Opção | Valor | Padrão | Descrição |
|---|---|---|---|
streamName | Uma lista separada por vírgulas de nomes de transmissão. | Nenhuma | Os nomes dos transmissão para assinar. |
StreamArn | Uma lista separada por vírgulas de Kinesis transmissão ARNs. Por exemplo, | Nenhuma | A ARNs de transmissão a ser assinada. Disponível em Databricks Runtime 16.1 e acima. |
região | Região para as transmissões a serem especificadas. | Região resolvida localmente | A região em que as transmissões são definidas. |
endpoint | Região para o fluxo de dados do Kinesis. | Região resolvida localmente | O ponto final regional do Kinesis Data Streams. |
initialPosition |
|
| Por onde começar a ler na transmissão. Especifique |
maxRecordsPerFetch | Um número inteiro positivo. | 10.000 | Quantos registros devem ser lidos por solicitação de API para o Kinesis? O número de registros retornados pode ser maior dependendo se os sub-registros foram agregados em um único registro usando a biblioteca Kinesis Producer. |
maxFetchRate | Número decimal positivo representando a taxa de dados em MB/s. | 1,0 (máx = 2,0) | Qual a velocidade de pré-busca de dados por fragmento? Essa opção limita a taxa de requisições e evita a limitação de requisições pelo Kinesis. 2,0 MB/s é a taxa máxima permitida pelo Kinesis. |
minFetchPeriod | Uma string de duração, por exemplo, | 400ms (min = 200ms) | Quanto tempo esperar entre tentativas consecutivas de pré-busca? Essa opção limita a frequência de requisições e evita a limitação de taxa do Kinesis. 200 ms é o mínimo, pois o Kinesis permite um máximo de 5 buscas por segundo. |
maxFetchDuration | Uma string de duração, por exemplo, | 10s | Por quanto tempo você deve armazenar em buffer os novos dados pré-configurados antes de disponibilizá-los para processamento. |
fetchBufferSize | Uma sequência de bytes, por exemplo, | 20 Gb | Quantidade de dados a serem armazenados em buffer para o próximo gatilho. Essa opção é uma condição de parada, não um limite superior estrito. É possível que mais dados sejam armazenados em buffer do que o especificado para este valor. |
shardsPerTask | Um número inteiro positivo. | 5 | De quantos shards do Kinesis devem ser pré-buscados em paralelo por tarefa do Spark? Idealmente, |
shardFetchInterval | Uma string de duração, por exemplo, | 1s | Com que frequência pesquisar o Kinesis para resharding. |
Credencial de serviço | String | Nenhum padrão. | O nome de sua credencial de serviço Databricks. Consulte Criar credenciais de serviço. |
awsAccessKey | String | Nenhum padrão. | Chave de acesso AWS. |
awsSecretKey | String | Nenhum padrão. | Chave de acesso secreta AWS correspondente à chave de acesso. |
roleArn | String | Nenhum padrão. | O nome do recurso da Amazon (ARN) do papel a ser assumido ao acessar o Kinesis. |
roleExternalId | String | Nenhum padrão. | Um valor opcional que pode ser usado ao delegar acesso à conta AWS. Consulte Como usar um ID externo. |
roleSessionName | String | Nenhum padrão. | Um identificador para a sessão de função assumida que identifica exclusivamente uma sessão quando a mesma função é assumida por diferentes princípios ou por diferentes motivos. |
coalesceThresholdBlockSize | Um número inteiro positivo. | 10.000.000 | O limite no qual ocorre a coalescência automática. Se o tamanho médio do bloco for menor que esse valor, os blocos pré-buscados serão agrupados em |
coalesceBinSize | Um número inteiro positivo. | 128.000.000 | O tamanho aproximado do bloco após a coalescência. |
consumerMode |
|
| Tipo de consumidor para executar a consulta de transmissão. Consulte Configurar Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão. Disponível em Databricks Runtime 11.3 LTS e acima. |
requireConsumerDeregistration |
|
| Se o registro do consumidor de fan-out avançado deve ser cancelado no término da consulta. Requer |
Máximo de fragmentos por descrição | Um número inteiro positivo (máx. 10000). | 100 | O número máximo de fragmentos a serem lidos por chamada de API ao listar fragmentos. |
Nome do consumidor | Nome único do consumidor a ser usado em todas as transmissões ou lista de nomes de consumidores separados por vírgula. | ID da consulta de transmissão | Nome do consumidor usado para registrar a consulta com Kinesis serviço no modo EFO. Disponível em Databricks Runtime 11.3 LTS e acima. |
Prefixo do nome do consumidor | Strings vazias ou strings de prefixo personalizadas. |
| Prefixo usado junto com consumerName para registrar consumidores com o Kinesis serviço no modo EFO. Disponível em Databricks Runtime 16.0 e acima. |
Intervalo de atualização do consumidor | A duração é definida, por exemplo, "1s" para 1 segundo (máx. 3600s). | 300s | O intervalo em que o registro do consumidor do Kinesis EFO é verificado e atualizado. Disponível em Databricks Runtime 11.3 LTS e acima. |
ID de consumidor registrado | Uma lista separada por vírgulas de nomes de consumidores ou ARNs. | Nenhuma | Identificadores para consumidores existentes no modo EFO. Disponível em Databricks Runtime 16.1 e acima. |
Tipo de ID de consumidor registrado |
| Nenhuma | Especifica se as IDs do consumidor são nomes ou ARNs. Disponível em Databricks Runtime 16.1 e acima. |
Os valores default das opções são configurados para que dois leitores (Spark ou outros) possam consumir simultaneamente uma transmissão Kinesis sem atingir os limites de taxa Kinesis . Se você tiver mais consumidores, precisará ajustar as opções de acordo. Por exemplo, você pode precisar reduzir maxFetchRate e aumentar minFetchPeriod.
Monitoramento e alertas de baixa latência
Os casos de uso de alertas exigem baixa latência. Para minimizar a latência:
- Verifique se sua consulta de transmissão é a única consumidora da transmissão Kinesis para otimizar o desempenho de busca e evitar os limites de taxa Kinesis .
- Defina a opção
maxFetchDurationcom um valor pequeno (por exemplo, 200 ms) para processar os dados obtidos o mais rápido possível. Essa opção representa um equilíbrio: ela prioriza uma velocidade de processamento mais rápida por lote em vez de garantir que os registros mais recentes sejam consumidos em cada lote. Por exemplo, se você usarTrigger.AvailableNow, um valor pequeno pode fazer com que sua consulta fique desatualizada em relação aos registros mais recentes na transmissão Kinesis . - Defina a opção
minFetchPeriodpara 210 ms para buscar com a maior frequência possível. - Defina a opção
shardsPerTaskou configure o cluster de forma que# cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask. Isso garante que a tarefa de pré-busca em segundo plano e a tarefa de consulta de transmissão sejam executadas simultaneamente.
Se sua consulta estiver recebendo dados a cada 5 segundos, você poderá atingir os limites de taxa do Kinesis. Revise suas configurações.
Quais são as métricas relatadas pelo Kinesis?
Kinesis informa o número de milissegundos que um consumidor ficou atrasado em relação ao início de uma transmissão para cada workspace. As métricas avgMsBehindLatest, maxMsBehindLatest e minMsBehindLatest fornecem a média, o mínimo e o máximo em milissegundos em todo o espaço de trabalho no processo de consulta de transmissão. Veja consultas de monitoramento transmissão estruturada no Databricks.
Se você estiver executando a transmissão em um Notebook, consulte as métricas na tab dados brutos no painel de progresso da consulta de transmissão. Eis um exemplo:
{
"sources": [
{
"description": "KinesisV2[stream]",
"metrics": {
"avgMsBehindLatest": "32000.0",
"maxMsBehindLatest": "32000",
"minMsBehindLatest": "32000"
}
}
]
}
Ingerir registros do Kinesis como lotes incrementais
Em Databricks Runtime 13.3 LTS e acima, Databricks suporta o uso de Trigger.AvailableNow com Kinesis fonte de dados para semântica de lotes incrementais. A seguir, descrevemos a configuração básica:
- Quando um micro-batch lê gatilhos no modo disponível agora, a hora atual é registrada pelo cliente Databricks.
- O Databricks pesquisa o sistema de origem para todos os registros com carimbos de data/hora entre esse tempo registrado e o ponto de verificação anterior.
- O Databricks carrega esses registros com
Trigger.AvailableNowsemântica.
Databricks usa um mecanismo de melhor esforço para tentar consumir todos os registros existentes em Kinesis transmissão(ões) quando a consulta de transmissão é executada. Devido a pequenas diferenças potenciais nos carimbos de data e à falta de garantia de ordenação na fonte de dados, alguns registros podem não ser incluídos em um lote acionado. Os registros omitidos são processados como parte das próximas microlotes acionadas.
Se a consulta continuar falhando na busca de registros da transmissão Kinesis mesmo que haja registros, tente aumentar o valor maxFetchDuration.
Veja AvailableNow: Processamento incremental de lotes.
Escreva para a Kinesis
O seguinte trecho de código pode ser utilizado como um ForeachSink para gravar dados no Kinesis. Exige um Dataset[(String, Array[Byte])].
O trecho de código a seguir fornece semântica pelo menos uma vez , não exatamente uma vez.
Kinesis Notebook Foreach Sink
Recomendações para trabalhar com o Kinesis
As consultas do Kinesis podem apresentar latência por vários motivos. Esta seção fornece recomendações para solucionar problemas de latência.
A origem do Kinesis executa Job do Spark em um thread em segundo plano para pré-buscar os dados do Kinesis periodicamente e armazená-los em cache na memória dos executores do Spark. A query transmitida processa os dados em cache após a conclusão de cada passo de pré-busca e disponibiliza os dados para processamento. A passo de pré-busca afeta significativamente a latência de ponta a ponta observada e a Taxa de transferência.
Reduza a latência de pré-busca
Para otimizar a latência mínima da consulta e o uso máximo do recurso, use o cálculo a seguir:
total number of CPU cores in the cluster (across all executors) > = total number of Kinesis shards / shardsPerTask.
minFetchPeriod pode criar várias chamadas da API GetRecords para o fragmento do Kinesis até atingir ReadProvisionedThroughputExceeded. Se ocorrer uma exceção, isso não é indicativo de um problema, pois o conector maximiza a utilização do fragmento do Kinesis.
Evite lentidões causadas por muitos erros de limite de taxa
O conector reduz a quantidade de dados lidos do Kinesis pela metade sempre que encontra um erro de limitação de taxa e registra esse evento no log com uma mensagem: "Hit rate limit. Sleeping for 5 seconds."
É comum ver esses erros quando uma transmissão está sendo recuperada, mas depois que ela é recuperada, o senhor não deve mais ver esses erros. Se isso acontecer, talvez o senhor precise ajustar o Kinesis (aumentando a capacidade) ou ajustar as opções de pré-busca.
Evite derramar dados no disco
Se o senhor tiver um pico repentino na transmissão Kinesis, a capacidade do buffer atribuído poderá ficar cheia e o buffer não será esvaziado com rapidez suficiente para que novos dados sejam adicionados.
Nesses casos, o site Spark transfere blocos do buffer para o disco e torna o processamento mais lento, o que afeta o desempenho da transmissão. Esse evento aparece no site log com uma mensagem como esta:
./log4j.txt:879546:20/03/02 17:15:04 INFO BlockManagerInfo: Updated kinesis_49290928_1_ef24cc00-abda-4acd-bb73-cb135aed175c on disk on 10.0.208.13:43458 (current size: 88.4 MB, original size: 0.0 B)
Para resolver esse problema, tente aumentar a capacidade de memória do clustering (adicione mais nós ou aumente a memória por nó) ou ajuste o parâmetro de configuração fetchBufferSize.
Hanging S3 write tarefa
O senhor pode ativar a especulação Spark para encerrar a tarefa pendente que impediria o processamento da transmissão. Para garantir que a tarefa não seja encerrada de forma muito agressiva, ajuste cuidadosamente o quantil e o multiplicador para essa configuração. Um bom ponto de partida é definir spark.speculation.multiplier como 3 e spark.speculation.quantile como 0.95.
Reduzir a latência associada ao checkpointing na transmissão com estado
Databricks recomenda o uso do site RocksDB com checkpointing de changelog para consultas de transmissão com estado. Consulte Habilitar o checkpoint do registro de alterações.
Configure o Kinesis enhanced fan-out (EFO) para leituras de consulta de transmissão
No Databricks Runtime 11.3e acima, o conector Databricks Runtime Kinesis fornece suporte para usar o recurso de fan-out aprimorado (EFO) do Amazon Kinesis.
O fan-out aprimorado do Kinesis é um recurso que oferece suporte a consumidores de fluxo de fan-out aprimorado com uma taxa de transferência dedicada de 2 MB/s por fragmento, por consumidor (máximo de 20 consumidores por fluxo do Kinesis), e registra a entrega no modo push no lugar do modo pull.
Por meio do site default, uma consulta de transmissão estruturada configurada no modo EFO se registra como um consumidor com uma taxa de transferência dedicada e um nome de consumidor exclusivo e um consumidor ARN (Amazon recurso Number) em Kinesis Data transmission.
Por default, Databricks usa o ID da consulta de transmissão com o prefixo databricks_ para nomear o novo consumidor. Opcionalmente, você pode especificar as opções consumerNamePrefix ou consumerName para substituir esse comportamento. O endereço consumerName deve ser uma cadeia de caracteres composta de letras, números e caracteres especiais _ . -.
Ao reiniciar a consulta, a fonte Kinesis usa o modo de polling para reproduzir os últimos lotes não confirmados, caso existam. Após a transmissão reproduzir os lotes não confirmados, a fonte retorna ao modo EFO para as leituras subsequentes.
Um consumidor EFO registrado incorre em cobranças adicionais no Amazon Kinesis. Para cancelar o registro do consumidor automaticamente na desmontagem da consulta, defina a opção requireConsumerDeregistration como true. A Databricks não pode garantir o cancelamento do registro em eventos como falhas de driver ou falhas de nó. Em caso de falha no trabalho, o site Databricks recomenda gerenciar os consumidores registrados diretamente para evitar cobranças excessivas no site Kinesis.
Opções avançadas de configuração do consumidor
Em Databricks Runtime 16.1 e acima, o senhor pode configurar as leituras no modo EFO usando os consumidores existentes por meio das opções registeredConsumerId e registeredConsumerIdType, como no exemplo a seguir:
- Python
- Scala
df = (spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
)
val kinesis = spark.readStream
.format("kinesis")
.option("streamName", "mystreamname1,mystreamname2")
.option("registeredConsumerId", "consumer1,consumer2")
.option("registeredConsumerIdType", "name")
.load()
O senhor pode especificar os consumidores existentes usando o nome ou o ARN. O senhor deve fornecer um ID de consumidor para cada fonte de transmissão Kinesis especificada na configuração.
Gerenciamento de consumidores off-line usando um notebook Databricks
A Databricks oferece um utilitário de gerenciamento de consumidores para registrar, listar ou cancelar o registro de consumidores associados aos fluxos de dados do Kinesis. O seguinte código demonstra o uso desse utilitário em um notebook Databricks:
-
Em um novo caderno de dados anexado a um agrupamento ativo, crie um
AWSKinesisConsumerManagerfornecendo as informações de autenticação necessárias.Scalaimport com.databricks.sql.kinesis.AWSKinesisConsumerManager
val manager = AWSKinesisConsumerManager.newManager()
.option("serviceCredential", serviceCredentialName)
.option("region", kinesisRegion)
.create() -
Liste e exiba os consumidores.
Scalaval consumers = manager.listConsumers("<stream name>")
display(consumers) -
Registre o consumidor do fluxo informado.
Scalaval consumerARN = manager.registerConsumer("<stream name>", "<consumer name>") -
Cancele o registro do consumidor da transmissão informada.
Scalamanager.deregisterConsumer("<stream name>", "<consumer name>")