Pular para o conteúdo principal

Transmissão de leitura e gravação Avro data

Apache Avro é um sistema de serialização de dados comumente usado no mundo da transmissão. Uma solução típica é colocar os dados no formato Avro em Apache Kafka, metadados no Confluent Schema Registry e, em seguida, executar consultas com uma estrutura de transmissão que se conecta a Kafka e ao Schema Registry.

Databricks suporta as funções from_avro e to_avro para criar pipeline de transmissão com dados Avro em Kafka e metadados no Schema Registry. A função to_avro codifica uma coluna como binária no formato Avro e from_avro decodifica dados binários Avro em uma coluna. Ambas as funções transformam uma coluna em outra coluna, e o tipo de dados SQL de entrada/saída pode ser um tipo complexo ou um tipo primitivo.

nota

As funções from_avro e to_avro:

  • Estão disponíveis em Python, Scala e Java.
  • Pode ser passado para as funções SQL em consultas de lotes e transmissão.

Consulte também Avro file fonte de dados.

Exemplo de esquema especificado manualmente

Semelhante a from_json e to_json, você pode usar from_avro e to_avro com qualquer coluna binária. O senhor pode especificar o esquema Avro manualmente, como no exemplo a seguir:

Scala
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder

// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))

// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Exemplo de JSONFormatSchema

O senhor também pode especificar um esquema como uma cadeia de caracteres JSON. Por exemplo, se /tmp/user.avsc for:

JSON
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "favorite_color", "type": ["string", "null"] }
]
}

O senhor pode criar um JSON strings:

Python
from pyspark.sql.avro.functions import from_avro, to_avro

jsonFormatSchema = open("/tmp/user.avsc", "r").read()

Em seguida, use o esquema em from_avro:

Python
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.

output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))

Exemplo com Schema Registry

Se o seu clustering tiver um serviço Schema Registry, from_avro poderá trabalhar com ele para que o senhor não precise especificar o esquema Avro manualmente.

O exemplo a seguir demonstra a leitura de um tópico "t" do site Kafka, supondo que o key e o valor já estejam registrados no Schema Registry como assuntos "t-key" e "t-value" dos tipos STRING e INT:

Scala
import org.apache.spark.sql.avro.functions._

val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))

Para to_avro, o esquema default output Avro pode não corresponder ao esquema do assunto de destino no serviço Schema Registry pelos seguintes motivos:

  • O mapeamento do tipo Spark SQL para o esquema Avro não é um para um. Consulte Tipos compatíveis com a conversão Spark SQL -> Avro.
  • Se o esquema Avro de saída convertido for do tipo registro, o nome do registro será topLevelRecord e não haverá namespace em default.

Se o esquema de saída default de to_avro corresponder ao esquema do assunto de destino, o senhor poderá fazer o seguinte:

Scala
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Caso contrário, você deve fornecer o esquema do assunto de destino na função to_avro:

Scala
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()

Autentique-se em um Confluent Schema Registry externo

Em Databricks Runtime 12.2 LTS e acima, o senhor pode se autenticar em um Confluent Schema Registry externo. Os exemplos a seguir demonstram como configurar as opções de registro do esquema para incluir credenciais de autenticação e a chave API.

Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...

// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()

Usar arquivos truststore e keystore nos volumes do Unity Catalog

Em Databricks Runtime 14.3 LTS e acima, o senhor pode usar arquivos truststore e keystore em volumes Unity Catalog para autenticar em um Confluent Schema Registry. Atualize a configuração no exemplo anterior usando a seguinte sintaxe:

Scala
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")

Usar o modo de evolução do esquema com from_avro

Em Databricks Runtime 14.2 e acima, o senhor pode usar o modo de evolução do esquema com from_avro. A ativação do modo de evolução do esquema faz com que o Job lance um UnknownFieldException após detectar a evolução do esquema. Databricks recomenda a configuração do Job com o modo de evolução do esquema para reiniciar automaticamente em caso de falha da tarefa. Consulte Considerações sobre produção para transmissão estruturada.

evolução do esquema é útil se o senhor espera que o esquema de seus dados de origem evolua ao longo do tempo e ingira todos os campos de sua fonte de dados. Se suas consultas já especificarem explicitamente quais campos devem ser consultados em sua fonte de dados, os campos adicionados serão ignorados independentemente da evolução do esquema.

Use a opção avroSchemaEvolutionMode para ativar a evolução do esquema. A tabela a seguir descreve as opções do modo de evolução do esquema:

Opção

Comportamento

none

padrão . Ignora a evolução do esquema e o trabalho continua.

restart

Lança um UnknownFieldException ao detectar a evolução do esquema. É necessário reiniciar o trabalho.

nota

O senhor pode alterar essa configuração entre os trabalhos de transmissão e reutilizar o mesmo ponto de verificação. A desativação da evolução do esquema pode resultar em colunas descartadas.

Configurar o modo de análise

É possível configurar o modo de análise para determinar se deseja falhar ou emitir registros nulos quando o modo de evolução do esquema estiver desativado e o esquema evoluir de forma não compatível com versões anteriores. Com as configurações de default, from_avro falha quando observa alterações de esquema incompatíveis.

Use a opção mode para especificar o modo de análise. A tabela a seguir descreve a opção para o modo de análise:

Opção

Comportamento

FAILFAST

padrão . Um erro de análise gera um SparkException com errorClass de MALFORMED_AVRO_MESSAGE.

PERMISSIVE

Um erro de análise é ignorado e um registro nulo é emitido.

nota

Com a evolução do esquema ativada, o site FAILFAST só lança exceções se um registro estiver corrompido.

Exemplo usando a evolução do esquema e definindo o modo de análise

O exemplo a seguir demonstra como ativar a evolução do esquema e especificar o modo de análise FAILFAST com um Confluent Schema Registry:

Scala
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._

val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))