Ler e gravar buffers de protocolo

Databricks fornece suporte nativo para serialização e desserialização entre structs Apache Spark e buffers de protocolo (protobuf). O suporte ao Protobuf é implementado como um transformador Apache Spark DataFrame e pode ser usado com transmissão estruturada ou para operações de lotes.

Como desserializar e serializar buffers de protocolo

No Databricks Runtime 12.1e acima, você pode usar as funções from_protobuf e to_protobuf para serializar e desserializar dados. A serialização Protobuf é comumente usada em cargas de trabalho de transmissão.

A sintaxe básica das funções protobuf é semelhante às funções de leitura e gravação. Você deve importar essas funções antes de usar.

from_protobuf converte uma coluna binária em um struct e to_protobuf converte uma coluna struct em binário. Você deve fornecer um registro de esquema especificado com o argumento options ou um arquivo descritor identificado pelo argumento descFilePath .

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

Os exemplos a seguir ilustram o processamento de registros protobuf binários com from_protobuf() e a conversão de struct do Spark SQL em protobuf binário com to_protobuf().

Use protobuf com Confluent Schema Registry

O Databricks dá suporte ao uso do Confluent Schema Registry para definir Protobuf.

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

Autenticar em um Registro de Esquema Confluente externo

Para autenticar em um Confluent Schema Registry externo, atualize suas opções de registro de esquema para incluir credenciais de autenticação e key de API .

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }
val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Usar arquivos truststore e keystore nos volumes do Unity Catalog

No Databricks Runtime 14.3 LTS e acima, o senhor pode usar arquivos truststore e keystore nos volumes do Unity Catalog para autenticar em um Confluent Schema Registry. Atualize as opções de registro do esquema de acordo com o exemplo a seguir:

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }
val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

Use Protobuf com um arquivo descritor

Você também pode fazer referência a um arquivo descritor protobuf que está disponível para seus clusters compute. Certifique-se de ter as permissões adequadas para ler o arquivo, dependendo de sua localização.

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)
import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Opções suportadas em funções Protobuf

As opções a seguir são suportadas em funções Protobuf.

  • mode: determina como os erros durante a desserialização dos registros Protobuf são tratados. Os erros podem ser causados por vários tipos de registros malformados, incluindo uma incompatibilidade entre o esquema real do registro e o esquema esperado fornecido em from_protobuf().

    • Valores:

      • FAILFASTdefault): um erro é gerado quando um registro malformado é encontrado e a tarefa falha.

      • PERMISSIVE: Um NULL é retornado para registros malformados. Use esta opção com cuidado, pois ela pode resultar na eliminação de muitos registros. Isto é útil quando uma pequena fração dos registros na fonte está incorreta.

  • recursivo.fields.max.profundidade: Adiciona suporte para campos recursivos. Os esquemas Spark SQL não suportam campos recursivos. Quando esta opção não é especificada, campos recursivos não são permitidos. Para suportar campos recursivos em Protobufs, eles precisam ser expandidos até uma profundidade especificada.

    • Valores:

      • -1 (default): campos recursivos não são permitidos.

      • 0: Os campos recursivos são eliminados.

      • 1: Permite um único nível de recursão.

      • [2-10]: Especifique um limite para recursão múltipla, até 10 níveis.

        Definir um valor maior que 0 permite campos recursivos expandindo os campos aninhados até a profundidade configurada. Valores maiores que 10 não são permitidos para evitar a criação inadvertida de esquemas muito grandes. Se uma mensagem Protobuf tiver profundidade além do limite configurado, a estrutura Spark retornada será truncada após o limite de recursão.

    • Exemplo: Considere um Protobuf com o seguinte campo recursivo:

      message Person { string name = 1; Person friend = 2; }
      

      A seguir está uma lista do esquema final com valores diferentes para esta configuração:

      • Opção definida como 1: STRUCT<name: STRING>

      • Opção definida como 2: STRUCT<name STRING, friend: STRUCT<name: STRING>>

      • Opção definida como 3: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>

  • converter.quaisquer.campos.em.JSON: Esta opção permite converter campos Protobuf Any em JSON. Este recurso deve ser habilitado com cuidado. A conversão e o processamento de JSON são ineficientes. Além disso, o campo strings JSON perde a segurança do esquema Protobuf, tornando o processamento downstream sujeito a erros.

    • Valores:

      • Falso (default): Em tempo de execução, esses campos curinga podem conter mensagens Protobuf arbitrárias como dados binários. Por default esses campos são tratados como uma mensagem normal do Protobuf. Possui dois campos com esquema (STRUCT<type_url: STRING, value: BINARY>). Por default, o campo binário value não é interpretado de forma alguma. Mas os dados binários podem não ser convenientes na prática para funcionar em algumas aplicações.

      • True: definir esse valor como True permite a conversão de campos Any em strings JSON em tempo de execução. Com esta opção, o binário é analisado e a mensagem Protobuf é desserializada em strings JSON.

    • Exemplo: Considere dois tipos de Protobuf definidos da seguinte forma:

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      Com esta opção habilitada, o esquema para from_protobuf("col", messageName ="ProtoWithAny") seria: STRUCT<event_name: STRING, details: STRING>.

      No tempo de execução, se o campo details contiver a mensagem Protobuf Person , o valor retornado será semelhante a este: ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}').

    • Requisitos:

      • As definições para todos os tipos de Protobuf possíveis usados nos campos Any devem estar disponíveis no arquivo descritor de Protobuf passado para from_protobuf().

      • Se Any Protobuf não for encontrado, isso resultará em um erro para esse registro.

      • Atualmente, esse recurso não é compatível com registro de esquema.

  • emitir.default.values: Habilita a renderização de campos com valores zero ao desserializar Protobuf para uma estrutura Spark. Esta opção deve ser usada com moderação. Geralmente não é aconselhável depender de diferenças tão sutis na semântica.

    • Valores

      • Falso (default): Quando um campo está vazio no Protobuf serializado, o campo resultante na estrutura Spark é nulo por default . É mais simples não ativar esta opção e tratar null como valor default .

      • Verdadeiro: Quando esta opção está habilitada, tais campos são preenchidos com os valores default correspondentes.

    • Exemplo: Considere o seguinte Protobuf com o Protobuf construído como Person(age=0, middle_name=""):

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • Com esta opção definida como False, a estrutura do Spark após chamar from_protobuf() seria toda nula: {"name": null, "age": null, "middle_name": "", "salary": null}. Embora dois campos (age e middle_name) tenham valores definidos, o Protobuf não os inclui no formato wire, pois são valores default .

      • Com esta opção definida como True, a estrutura do Spark após chamar from_protobuf() seria: {"name": "", "age": 0, "middle_name": "", "salary": null}. O campo salary permanece nulo porque foi explicitamente declarado optional e não está definido no registro de entrada.

  • enums.as.ints: Quando habilitados, os campos enum no Protobuf são renderizados como campos inteiros no Spark.

    • Valores

      • Falso (default)

      • Verdadeiro: quando habilitado, os campos enum no Protobuf são renderizados como campos inteiros no Spark.

    • Exemplo: Considere o seguinte Protobuf:

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      Dada uma mensagem Protobuf como Person(job = ENGINEER):

      • Com esta opção desabilitada, a estrutura Spark correspondente seria {"job": "ENGINEER"}.

      • Com esta opção habilitada, a estrutura Spark correspondente seria {"job": 1}.

      Observe que o esquema para esses campos é diferente em cada caso (inteiro em vez de default strings). Tal alteração pode afetar o esquema das tabelas downstream.

Opções de registro de esquema

As opções de registro de esquema a seguir são relevantes ao usar o registro de esquema com funções Protobuf.

  • esquema.registry.subject

    • Obrigatório

    • Especifica o assunto do esquema no Schema Registry, como “evento do cliente”

  • esquema.registry.address

    • Obrigatório

    • URL para registro de esquema, como https://schema-registry.example.com:8081

  • esquema.registry.protobuf.name

    • Opcional

    • default: <NONE>.

    • Uma entrada de registro de esquema para um assunto pode conter diversas definições de Protobuf, assim como um único arquivo proto . Quando esta opção não é especificada, o primeiro Protobuf é usado para o esquema. Especifique o nome da mensagem Protobuf quando não for a primeira da entrada. Por exemplo, considere uma entrada com duas definições do Protobuf: “Pessoa” e “Local”, nesta ordem. Se a transmissão corresponder a “Local” em vez de “Pessoa”, defina esta opção como “Local” (ou seu nome completo incluindo o pacote “com.example.protos.Location”).

  • esquema.registry.evolução do esquema.mode

    • default: “reiniciar”.

    • Modos suportados:

      • "reiniciar"

      • "nenhum"

    • Esta opção define o modo de evolução do esquema para from_protobuf(). No início de uma query, o Spark registra o ID do esquema mais recente para o assunto fornecido. Isso determina o esquema para from_protobuf(). Um novo esquema pode ser publicado no registro de esquema após o início da query . Quando um ID de esquema mais recente é notado em um registro recebido, isso indica uma alteração no esquema. Esta opção determina como tal alteração no esquema é tratada:

      • restart (default): aciona um UnknownFieldException quando um ID de esquema mais recente é detectado. Isso encerra a query. A Databricks recomenda configurar o fluxo de trabalho para reiniciar em caso de falha query para captar alterações de esquema.

      • none: as alterações no ID do esquema são ignoradas. Os registros com schema-id mais recente são analisados com o mesmo esquema que foi observado no início da query. Espera-se que as definições mais recentes do Protobuf sejam compatíveis com versões anteriores e os novos campos sejam ignorados.

  • confluent.schema.registry.<schema-registy-client-option>

    • Opcional

    • O registro de esquema se conecta ao registro de esquema Confluent usando o cliente Confluent Schema Registry. Quaisquer opções de configuração suportadas pelo cliente podem ser especificadas com o prefixo “confluent.schema.registry”. Por exemplo, as duas configurações a seguir fornecem credenciais de autenticação “USER_INFO”:

      • “confluent.schema.registry.basic.auth.credentials.source”: 'INFORMAÇÃO DE USUÁRIO'

      • “confluent.schema.registry.basic.auth.user.info”: “<KEY> : <SECRET>