Transmissão de leitura e gravação Avro data
Apache Avro é um sistema de serialização de dados comumente usado no mundo da transmissão. Uma solução típica é colocar os dados no formato Avro em Apache Kafka, metadados no Confluent Schema Registry e, em seguida, executar consultas com uma estrutura de transmissão que se conecta a Kafka e ao Schema Registry.
Databricks suporta as funções from_avro
e to_avro
para criar pipeline de transmissão com dados Avro em Kafka e metadados no Schema Registry. A função to_avro
codifica uma coluna como binária no formato Avro e from_avro
decodifica dados binários Avro em uma coluna. Ambas as funções transformam uma coluna em outra coluna, e o tipo de dados SQL de entrada/saída pode ser um tipo complexo ou um tipo primitivo.
As funções from_avro
e to_avro
:
- Estão disponíveis em Python, Scala e Java.
- Pode ser passado para as funções SQL em consultas de lotes e transmissão.
Consulte também Avro file fonte de dados.
Exemplo de esquema especificado manualmente
Semelhante a from_json e to_json, você pode usar from_avro
e to_avro
com qualquer coluna binária. O senhor pode especificar o esquema Avro manualmente, como no exemplo a seguir:
import org.apache.spark.sql.avro.functions._
import org.apache.avro.SchemaBuilder
// When reading the key and value of a Kafka topic, decode the
// binary (Avro) data into structured data.
// The schema of the resulting DataFrame is: <key: string, value: int>
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro($"key", SchemaBuilder.builder().stringType()).as("key"),
from_avro($"value", SchemaBuilder.builder().intType()).as("value"))
// Convert structured data to binary from string (key column) and
// int (value column) and save to a Kafka topic.
dataDF
.select(
to_avro($"key").as("key"),
to_avro($"value").as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Exemplo de JSONFormatSchema
O senhor também pode especificar um esquema como uma cadeia de caracteres JSON. Por exemplo, se /tmp/user.avsc
for:
{
"namespace": "example.avro",
"type": "record",
"name": "User",
"fields": [
{ "name": "name", "type": "string" },
{ "name": "favorite_color", "type": ["string", "null"] }
]
}
O senhor pode criar um JSON strings:
from pyspark.sql.avro.functions import from_avro, to_avro
jsonFormatSchema = open("/tmp/user.avsc", "r").read()
Em seguida, use o esquema em from_avro
:
# 1. Decode the Avro data into a struct.
# 2. Filter by column "favorite_color".
# 3. Encode the column "name" in Avro format.
output = df\
.select(from_avro("value", jsonFormatSchema).alias("user"))\
.where('user.favorite_color == "red"')\
.select(to_avro("user.name").alias("value"))
Exemplo com Schema Registry
Se o seu clustering tiver um serviço Schema Registry, from_avro
poderá trabalhar com ele para que o senhor não precise especificar o esquema Avro manualmente.
O exemplo a seguir demonstra a leitura de um tópico "t" do site Kafka, supondo que o key e o valor já estejam registrados no Schema Registry como assuntos "t-key" e "t-value" dos tipos STRING
e INT
:
import org.apache.spark.sql.avro.functions._
val schemaRegistryAddr = "https://myhost:8081"
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr).as("value"))
Para to_avro
, o esquema default output Avro pode não corresponder ao esquema do assunto de destino no serviço Schema Registry pelos seguintes motivos:
- O mapeamento do tipo Spark SQL para o esquema Avro não é um para um. Consulte Tipos compatíveis com a conversão Spark SQL -> Avro.
- Se o esquema Avro de saída convertido for do tipo registro, o nome do registro será
topLevelRecord
e não haverá namespace em default.
Se o esquema de saída default de to_avro
corresponder ao esquema do assunto de destino, o senhor poderá fazer o seguinte:
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Caso contrário, você deve fornecer o esquema do assunto de destino na função to_avro
:
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.start()
Autentique-se em um Confluent Schema Registry externo
Em Databricks Runtime 12.2 LTS e acima, o senhor pode se autenticar em um Confluent Schema Registry externo. Os exemplos a seguir demonstram como configurar as opções de registro do esquema para incluir credenciais de autenticação e a chave API.
- Scala
- Python
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(data = $"key", subject = "t-key", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("key"),
from_avro(data = $"value", subject = "t-value", schemaRegistryAddress = schemaRegistryAddr, options = schemaRegistryOptions.asJava).as("value"))
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
// The Avro schema of subject "t-value" in JSON string format.
val avroSchema = ...
// The converted data is saved to Kafka as a Kafka topic "t".
dataDF
.select(
to_avro($"key", lit("t-key"), schemaRegistryAddr, schemaRegistryOptions.asJava).as("key"),
to_avro($"value", lit("t-value"), schemaRegistryAddr, schemaRegistryOptions.asJava, avroSchema).as("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}"
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key"),
from_avro(
data = col("value"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-value",
schemaRegistryAddress = schema_registry_address
).alias("value")
)
)
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("value")
)
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
# The Avro schema of subject "t-value" in JSON string format.
avro_schema = ...
# The converted data is saved to Kafka as a Kafka topic "t".
data_df
.select(
to_avro(
data = col("key"),
subject = lit("t-key"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options
).alias("key"),
to_avro(
data = col("value"),
subject = lit("t-value"),
schemaRegistryAddress = schema_registry_address,
options = schema_registry_options,
jsonFormatSchema = avro_schema).alias("value"))
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("topic", "t")
.save()
Usar arquivos truststore e keystore nos volumes do Unity Catalog
Em Databricks Runtime 14.3 LTS e acima, o senhor pode usar arquivos truststore e keystore em volumes Unity Catalog para autenticar em um Confluent Schema Registry. Atualize a configuração no exemplo anterior usando a seguinte sintaxe:
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "truststorePassword",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/keystore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "keystorePassword",
"confluent.schema.registry.ssl.key.password" -> "keyPassword")
Usar o modo de evolução do esquema com from_avro
Em Databricks Runtime 14.2 e acima, o senhor pode usar o modo de evolução do esquema com from_avro
. A ativação do modo de evolução do esquema faz com que o Job lance um UnknownFieldException
após detectar a evolução do esquema. Databricks recomenda a configuração do Job com o modo de evolução do esquema para reiniciar automaticamente em caso de falha da tarefa. Consulte Considerações sobre produção para transmissão estruturada.
evolução do esquema é útil se o senhor espera que o esquema de seus dados de origem evolua ao longo do tempo e ingira todos os campos de sua fonte de dados. Se suas consultas já especificarem explicitamente quais campos devem ser consultados em sua fonte de dados, os campos adicionados serão ignorados independentemente da evolução do esquema.
Use a opção avroSchemaEvolutionMode
para ativar a evolução do esquema. A tabela a seguir descreve as opções do modo de evolução do esquema:
Opção | Comportamento |
---|---|
| padrão . Ignora a evolução do esquema e o trabalho continua. |
| Lança um |
O senhor pode alterar essa configuração entre os trabalhos de transmissão e reutilizar o mesmo ponto de verificação. A desativação da evolução do esquema pode resultar em colunas descartadas.
Configurar o modo de análise
É possível configurar o modo de análise para determinar se deseja falhar ou emitir registros nulos quando o modo de evolução do esquema estiver desativado e o esquema evoluir de forma não compatível com versões anteriores. Com as configurações de default, from_avro
falha quando observa alterações de esquema incompatíveis.
Use a opção mode
para especificar o modo de análise. A tabela a seguir descreve a opção para o modo de análise:
Opção | Comportamento |
---|---|
| padrão . Um erro de análise gera um |
| Um erro de análise é ignorado e um registro nulo é emitido. |
Com a evolução do esquema ativada, o site FAILFAST
só lança exceções se um registro estiver corrompido.
Exemplo usando a evolução do esquema e definindo o modo de análise
O exemplo a seguir demonstra como ativar a evolução do esquema e especificar o modo de análise FAILFAST
com um Confluent Schema Registry:
- Scala
- Python
import org.apache.spark.sql.avro.functions._
import scala.collection.JavaConverters._
val schemaRegistryAddr = "https://confluent-schema-registry-endpoint"
val schemaRegistryOptions = Map(
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret",
"avroSchemaEvolutionMode" -> "restart",
"mode" -> "FAILFAST")
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
// We read the "key" binary column from the subject "t-key" in the schema
// registry at schemaRegistryAddr. We provide schemaRegistryOptions,
// which has avroSchemaEvolutionMode -> "restart". This instructs from_avro
// to fail the query if the schema for the subject t-key evolves.
from_avro(
data = $"key",
subject = "t-key",
schemaRegistryAddress = schemaRegistryAddr,
options = schemaRegistryOptions.asJava).as("key"))
from pyspark.sql.functions import col, lit
from pyspark.sql.avro.functions import from_avro, to_avro
schema_registry_address = "https://confluent-schema-registry-endpoint"
schema_registry_options = {
"confluent.schema.registry.basic.auth.credentials.source": 'USER_INFO',
"confluent.schema.registry.basic.auth.user.info": f"{key}:{secret}",
"avroSchemaEvolutionMode": "restart",
"mode": "FAILFAST",
}
df = (spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", servers)
.option("subscribe", "t")
.load()
.select(
from_avro(
data = col("key"),
jsonFormatSchema = None,
options = schema_registry_options,
subject = "t-key",
schemaRegistryAddress = schema_registry_address
).alias("key")
)
)