Inscreva-se no Google Pub/Sub
Databricks fornece um conector integrado para assinar o Google Pub/Sub em Databricks Runtime 13.3 LTS e acima. Esse conector fornece uma semântica de processamento de exatamente uma vez para registros do assinante.
O Pub/Sub pode publicar registros duplicados, e os registros podem chegar ao assinante fora de ordem. O senhor deve escrever o código do Databricks para lidar com registros duplicados e fora de ordem.
Exemplo de sintaxe
O exemplo de código a seguir demonstra a sintaxe básica para configurar uma transmissão estruturada lida a partir do Pub/Sub:
val authOptions: Map[String, String] =
Map("clientId" -> clientId,
"clientEmail" -> clientEmail,
"privateKey" -> privateKey,
"privateKeyId" -> privateKeyId)
val query = spark.readStream
.format("pubsub")
// we will create a Pubsub subscription if none exists with this id
.option("subscriptionId", "mysub") // required
.option("topicId", "mytopic") // required
.option("projectId", "myproject") // required
.options(authOptions)
.load()
Para obter mais opções de configuração, consulte Configurar opções para leitura de transmissão Pub/Sub.
Configurar o acesso ao Pub/Sub
A tabela a seguir descreve as funções necessárias para as credenciais configuradas:
Papéis | Obrigatório ou opcional | Como é usado |
---|---|---|
| Obrigatório | Verificar se a inscrição existe e obter a inscrição |
| Obrigatório | Obter dados de uma inscrição |
| Opcional | Permite a criação de uma inscrição caso não exista e também permite o uso do site |
A Databricks recomenda o uso de segredos ao fornecer opções de autorização. As seguintes opções são necessárias para autorizar uma conexão:
clientEmail
clientId
privateKey
privateKeyId
Esquema Pub/Sub
O esquema da transmissão corresponde aos registros que são obtidos do Pub/Sub, conforme descrito na tabela a seguir:
campo | Tipo |
---|---|
|
|
|
|
|
|
|
|
Configurar opções de leitura de transmissão Pub/Sub
A tabela a seguir descreve as opções compatíveis com o Pub/Sub. Todas as opções são configuradas como parte de uma transmissão estruturada lida usando a sintaxe do site .option("<optionName>", "<optionValue>")
.
Algumas opções de configuração do Pub/Sub usam o conceito de fetches em vez de micro-lotes . Isso reflete detalhes internos de implementação e as opções funcionam de forma semelhante aos corolários em outros conectores de transmissão estruturada, exceto que os registros são obtidos e depois processados.
Opção | Valor padrão | Descrição |
---|---|---|
| Definido como a metade do número de executores presentes na inicialização da transmissão. | O número de Spark tarefas paralelas que buscam registros de uma inscrição. |
|
| Se |
| nenhum | Um limite flexível para o tamanho dos lotes a serem processados durante cada micro-lote acionado. |
| 1000 | O número de registros a serem buscados por tarefa antes de processar os registros. |
| 10 segundos | O tempo de duração de cada tarefa para buscar antes de processar os registros. Databricks recomenda usar o valor default. |
Semântica de processamento de lotes incrementais para Pub/Sub
O senhor pode usar o site Trigger.AvailableNow
para consumir os registros disponíveis das fontes Pub/Sub em lotes incrementais.
O Databricks registra o registro de data e hora quando o usuário inicia uma leitura com a configuração Trigger.AvailableNow
. Os registros processados pelos lotes incluem todos os dados obtidos anteriormente e todos os registros recém-publicados com um carimbo de data/hora menor do que o carimbo de data/hora da transmissão registrada.
Consulte Configuração do processamento de lotes incrementais.
monitoramento de transmissão métricas
As métricas de progresso da transmissão estruturada informam o número de registros obtidos e prontos para serem processados, o tamanho dos registros obtidos e prontos para serem processados e o número de duplicatas vistas desde o início da transmissão. A seguir, um exemplo dessas métricas:
"metrics" : {
"numDuplicatesSinceStreamStart" : "1",
"numRecordsReadyToProcess" : "1",
"sizeOfRecordsReadyToProcess" : "8"
}
Limitações
A execução especulativa (spark.speculation
) não é compatível com o Pub/Sub.