Ler e gravar dados Avro transmitidos

O Apache Avro é um sistema de serialização de dados comumente usado no mundo da transmissão. Uma solução típica é colocar dados no formato Avro no Apache Kafka, metadados no Confluent Schema Registry e, em seguida, executar query com uma estrutura de transmissão que se conecta ao Kafka e ao Schema Registry.

O Databricks dá suporte às funções from_avro e to_avro para criar pipelines de transmissão com dados Avro no Kafka e metadados no Schema Registry. A função to_avro codifica uma coluna como binário no formato Avro e from_avro decodifica dados binários Avro em uma coluna. Ambas as funções transformam uma coluna em outra coluna, e o tipo de dados SQL de entrada/saída pode ser um tipo complexo ou um tipo primitivo.

Observação

As funções from_avro e to_avro :

  • Estão disponíveis em Python, Scala e Java.

  • Pode ser passado para funções SQL tanto em lotes quanto query transmitidas.

Veja também o arquivo Avro fonte de dados.

Exemplo de esquema especificado manualmente

Semelhante a from_json e to_json, você pode usar from_avro e to_avro com qualquer coluna binária. Você pode especificar o esquema Avro manualmente, como no exemplo a seguir:

import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
    from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
  .select(
    to_avro($"key").as("key"),
    to_avro($"value").as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .start()

jsonFormatSchema exemplo

Você também pode especificar um esquema como strings JSON. Por exemplo, se /tmp/user.avsc for:

{
  "namespace": "example.avro",
  "type": "record",
  "name": "User",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "favorite_color", "type": ["string", "null"]}
  ]
}

Você pode criar strings JSON:

from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Em seguida, use o esquema em from_avro:

# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
  .select(from_avro("value", jsonFormatSchema).alias("user"))\
  .where('user.favorite_color == "red"')\
  .select(to_avro("user.name").alias("value"))

Exemplo com registro de esquema

Se seus clusters tiverem um serviço Schema Registry, from_avro poderá trabalhar com ele para que você não precise especificar o esquema Avro manualmente.

O exemplo a seguir demonstra a leitura de um tópico Kafka “t”, assumindo que a chave e o valor já estão registrados no Schema Registry como assuntos “t-key” e “t-value” dos tipos STRING e INT:

import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr).as("value"))

Para to_avro, o esquema Avro de saída default pode não corresponder ao esquema do assunto de destino no serviço Schema Registry pelos seguintes motivos:

  • O mapeamento do tipo Spark SQL para o esquema Avro não é um para um. Consulte Tipos suportados para Spark SQL -> conversão Avro.

  • Se o esquema Avro de saída convertido for do tipo de registro, o nome do registro será topLevelRecord e não haverá namespace por default.

Se o esquema de saída default de to_avro corresponder ao esquema do assunto de destino, você poderá fazer o seguinte:

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Caso contrário, você deve fornecer o esquema do assunto de destino na função to_avro :

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Autenticar em um Registro de Esquema Confluente externo

No Databricks Runtime 12.1e acima, você pode autenticar em um Registro de Esquema Confluente externo. Os exemplos a seguir demonstram como configurar suas opções de registro de esquema para incluir credenciais de autenticação e key de API.

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro($"key", "t-key", schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    from_avro($"value", "t-value", schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key"),
    from_avro(
      data = col("value"),
      options = schema_registry_options,
      subject = "t-value",
      schemaRegistryAddress = schema_registry_address
    ).alias("value")
  )
)

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(
      data = col("key"),
      subject = lit("t-key"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("key"),
    to_avro(
      data = col("value"),
      subject = lit("t-value"),
      schemaRegistryAddress = schema_registry_address,
      options = schema_registry_options
    ).alias("value")
  )
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...

# The converted data is saved to Kafka as a Kafka topic "t".
data_df
  .select(
    to_avro(col("key"), lit("t-key"), schema_registry_address, schema_registry_options).alias("key"),
    to_avro(col("value"), lit("t-value"), schema_registry_address, schema_registry_options, avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Usar arquivos truststore e keystore nos volumes do Unity Catalog

No Databricks Runtime 14.3 LTS e acima, o senhor pode usar arquivos truststore e keystore nos volumes do Unity Catalog para autenticar em um Confluent Schema Registry. Atualize a configuração do exemplo anterior usando a seguinte sintaxe:

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
      "confluent.schema.registry.ssl.key.password" -> "keyPassword")

Use o modo evolução do esquema com from_avro

No Databricks Runtime 14.2 e acima, você pode usar o modo de evolução do esquema com from_avro. A ativação do modo de evolução do esquema faz com que o Job gere um UnknownFieldException após detectar a evolução do esquema. A Databricks recomenda configurar o Job com o modo evolução do esquema para reiniciar automaticamente em caso de falha da tarefa. Consulte Configurar Job de transmissão estruturada para reiniciar consultas de transmissão em caso de falha.

A evolução do esquema é útil se você espera que o esquema dos seus dados de origem evolua ao longo do tempo e ingira todos os campos da sua fonte de dados. Se a sua query já especifica explicitamente quais campos query na sua fonte de dados, os campos adicionados serão ignorados independentemente da evolução do esquema.

Use a opção avroSchemaEvolutionMode para ativar a evolução do esquema. A tabela a seguir descreve as opções para o modo de evolução do esquema:

Opção

Comportamento

none

default. Ignora a evolução do esquema e o Job continua.

restart

Lança um UnknownFieldException ao detectar a evolução do esquema. Requer uma reinicialização Job .

Observação

Você pode alterar esta configuração entre Job de transmissão e reutilizar o mesmo checkpoint. Desabilitar a evolução do esquema pode resultar na eliminação de colunas.

Configurar o modo de análise

Você pode configurar o modo de análise para determinar se deseja falhar ou emitir registros nulos quando o modo de evolução do esquema estiver desabilitado e o esquema evoluir de uma forma não compatível com versões anteriores. Com configurações default , from_avro falha quando observa alterações de esquema incompatíveis.

Use a opção mode para especificar o modo de análise. A tabela a seguir descreve a opção do modo de análise:

Opção

Comportamento

FAILFAST

default. Um erro de análise gera um SparkException com um errorClass de MALFORMED_AVRO_MESSAGE.

PERMISSIVE

Um erro de análise é ignorado e um registro nulo é emitido.

Observação

Com a evolução do esquema ativada, FAILFAST só gera exceções se um registro estiver corrompido.

Exemplo usando evolução do esquema e configurando modo de análise

O exemplo a seguir demonstra a ativação da evolução do esquema e a especificação do modo de análise FAILFAST com um registro de esquema confluente:

import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
      "avroSchemaEvolutionMode" -> "restart",
      "mode" -> "FAILFAST")

val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    // We read the "key" binary column from the subject "t-key" in the schema
    // registry at schemaRegistryAddr. We provide schemaRegistryOptions,
    // which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
    // to fail the query if the schema for the subject t-key evolves.
    from_avro(
            $"key",
            "t-key",
            schemaRegistryAddr,
            schemaRegistryOptions.asJava).as("key"))
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro

schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
  "confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
  "confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
  "avroSchemaEvolutionMode": "restart",
  "mode": "FAILFAST",
}

df = (spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("subscribe", "t")
  .load()
  .select(
    from_avro(
      data = col("key"),
      options = schema_registry_options,
      subject = "t-key",
      schemaRegistryAddress = schema_registry_address
    ).alias("key")
  )
)