transmissão estructurada checkpoints
Os pontos de controle e o write-ahead logs trabalham juntos para oferecer garantias de processamento para cargas de trabalho de transmissão estruturada. O ponto de verificação rastreia as informações que identificam a consulta, incluindo informações de estado e registros processados. Quando o senhor exclui os arquivos em um diretório de ponto de verificação ou muda para um novo local de ponto de verificação, a próxima execução da consulta começa do zero.
Cada consulta deve ter uma localização de ponto de verificação diferente. Várias consultas nunca devem compartilhar o mesmo local.
Ativar o checkpointing para consultas de transmissão estruturada
O senhor deve especificar a opção checkpointLocation
antes de executar uma consulta de transmissão, como no exemplo a seguir:
(df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
)
df.writeStream
.option("checkpointLocation", "/Volumes/catalog/schema/volume/path")
.toTable("catalog.schema.table")
Observação
Alguns coletores, como a saída para display()
no Notebook e o coletor memory
, geram automaticamente um local de ponto de verificação temporário se o usuário omitir essa opção. Esses locais temporários de pontos de verificação não garantem nenhuma tolerância a falhas ou garantias de consistência de dados e podem não ser limpos adequadamente. A Databricks recomenda sempre especificar um local de ponto de verificação para esses sumidouros.
Recuperação após alterações em uma consulta de transmissão estruturada
Há limitações quanto às alterações permitidas em uma consulta de transmissão entre reinicializações a partir do mesmo local de ponto de verificação. Aqui estão algumas mudanças que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:
O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito está bem definida depende da consulta e da alteração.
O termo não permitido significa que você não deve fazer a alteração especificada, pois é provável que a consulta reiniciada falhe com erros imprevisíveis.
sdf
representa uma transmissão DataFrame/conjunto de dados gerado comsparkSession.readStream
.
Tipos de alterações nas consultas de transmissão estruturada
Alterações no número ou tipo (ou seja, fonte diferente) das fontes de entrada: isso não é permitido.
Mudanças nos parâmetros das fontes de entrada: se isso é permitido e se a semântica da alteração está bem definida depende da fonte e da consulta. Aqui estão alguns exemplos.
É permitida a adição, exclusão e modificação dos limites de taxa:
spark.readStream.format("kafka").option("subscribe", "article")
para
spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
Geralmente, não são permitidas alterações em artigos e arquivos inscritos, pois os resultados são imprevisíveis:
spark.readStream.format("kafka").option("subscribe", "article")
paraspark.readStream.format("kafka").option("subscribe", "newarticle")
Alterações no intervalo de acionamento: O senhor pode alterar os acionadores entre lotes incrementais e intervalos de tempo. Consulte Alteração dos intervalos de acionamento entre as execuções.
Mudanças no tipo de coletor de saída: Alterações entre algumas combinações específicas de coletores são permitidas. Isso precisa ser verificado caso a caso. Aqui estão alguns exemplos.
O sink de arquivo para o sink do Kafka é permitido. O Kafka verá apenas os novos dados.
O sink do Kafka para o sink do arquivo não é permitido.
O Kafka sink foi alterado para foreach, ou vice-versa.
Mudanças nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração está bem definida depende do coletor e da consulta. Aqui estão alguns exemplos.
Alterações no diretório de saída de um coletor de arquivos não são permitidas:
sdf.writeStream.format("parquet").option("path", "/somePath")
tosdf.writeStream.format("parquet").option("path", "/anotherPath")
Alterações no tópico de saída são permitidas:
sdf.writeStream.format("kafka").option("topic", "topic1")
parasdf.writeStream.format("kafka").option("topic", "topic2")
Alterações no coletor foreach definido pelo usuário (ou seja, o código
ForeachWriter
) são permitidas, mas a semântica da alteração depende do código.
Alterações na projeção / filtro / operações semelhantes a mapas: Alguns casos são permitidos. Por exemplo:
A adição/exclusão de filtros é permitida:
sdf.selectExpr("a")
asdf.where(...).selectExpr("a").filter(...)
.Mudanças nas projeções com o mesmo esquema de saída são permitidas:
sdf.selectExpr("stringColumn AS json").writeStream
asdf.select(to_json(...).as("json")).writeStream
.Alterações em projeções com esquemas de saída diferentes são permitidas condicionalmente:
sdf.selectExpr("a").writeStream
asdf.selectExpr("b").writeStream
só é permitida se o coletor de saída permitir a alteração do esquema de"a"
para"b"
.
Alterações em operações com estado: Algumas operações em consultas de transmissão precisam manter dados de estado para atualizar continuamente o resultado. A transmissão estruturada faz automaticamente o checkpoint dos dados de estado em um armazenamento tolerante a falhas (por exemplo, DBFS, AWS S3, Azure Blob storage) e os restaura após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo nas reinicializações. Isso significa que quaisquer alterações (ou seja, adições, exclusões ou modificações de esquema) nas operações de estado de uma consulta de transmissão não são permitidas entre as reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre as reinicializações para garantir a recuperação do estado:
agregação de transmissão: Por exemplo,
sdf.groupBy("a").agg(...)
. Não é permitida nenhuma alteração no número ou tipo de chave de agrupamento ou agregados.transmissão de deduplicação: Por exemplo,
sdf.dropDuplicates("a")
. Não é permitida nenhuma alteração no número ou tipo de chave de agrupamento ou agregados.transmissão-transmissão join: Por exemplo,
sdf1.join(sdf2, ...)
(ou seja, ambas as entradas são geradas comsparkSession.readStream
). Alterações no esquema ou nas colunas de junção equiparável não são permitidas. Não são permitidas alterações no tipo de join (externo ou interno). Outras alterações na condição join são mal definidas.Operações arbitrárias com estado: Por exemplo,
sdf.groupByKey(...).mapGroupsWithState(...)
ousdf.groupByKey(...).flatMapGroupsWithState(...)
. Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração na função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser oferecer suporte a mudanças no esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexas em bytes usando um esquema de codificação/decodificação que ofereça suporte à migração de esquemas. Por exemplo, se você salvar seu estado como bytes codificados em Avro, poderá alterar o esquema Avro-State entre as reinicializações da consulta, pois isso restaura o estado binário.