Avro Data and Schema Registry (Scala)

Schema Registry integration in Spark Structured Streaming

This notebook demonstrates how to use the from_avro and to_avro functions to read Avro data from Kafka and write Avro data to Kafka with Schema Registry support.
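For reference, the open-source Apache Spark variants of these functions take the Avro schema as a JSON string instead of a Schema Registry subject. A minimal self-contained sketch (the sample data below is illustrative only):

import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import spark.implicits._

// Illustrative Avro schema matching a struct<a: int, b: float> payload.
val jsonSchema = """
  {"type": "record", "name": "topLevelRecord", "fields": [
    {"name": "a", "type": "int"},
    {"name": "b", "type": "float"}
  ]}
"""

// Round trip: to_avro serializes a struct column to Avro binary,
// and from_avro decodes it back using the schema string.
val roundTrip = Seq((1, 1.0f), (2, 2.0f)).toDF("a", "b")
  .select(to_avro(struct($"a", $"b")).as("value"))
  .select(from_avro($"value", jsonSchema).as("record"))

The registry-based overloads used in the rest of this notebook replace the schema string with a subject name and the Schema Registry address.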

Run the following commands one by one while reading the instructions. Do not click Run All.

Step 1: Set up the environment.

%run ./setup
Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful.
Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful.
Executors = Set(10.0.236.27, 10.0.248.139, 10.0.250.171) Workers = List(10.0.236.27, 10.0.248.139, 10.0.250.171)
ls: cannot access '/root/.ssh/id_rsa': No such file or directory
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.236.27
Warning: Permanently added '10.0.236.27' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.236.27
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.248.139
Warning: Permanently added '10.0.248.139' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.248.139
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.250.171
Warning: Permanently added '10.0.250.171' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.250.171
executing command - kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties on host: 10.0.236.27
SUCCESS: command - kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties on host: 10.0.236.27
executing command - bash /dbfs/home/streaming/benchmark/configure-kafka.sh 10.0.236.27 0 10.0.236.27 on host: 10.0.236.27
SUCCESS: command - bash /dbfs/home/streaming/benchmark/configure-kafka.sh 10.0.236.27 0 10.0.236.27 on host: 10.0.236.27
executing command - kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties on host: 10.0.236.27
SUCCESS: command - kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties on host: 10.0.236.27
executing command - schema-registry/bin/schema-registry-start -daemon schema-registry/etc/schema-registry/schema-registry.properties on host: 10.0.236.27
SUCCESS: command - schema-registry/bin/schema-registry-start -daemon schema-registry/etc/schema-registry/schema-registry.properties on host: 10.0.236.27
import sys.process._
kafkaCluster: com.databricks.spark.LocalKafka = com.databricks.spark.LocalKafka@2ec6a0aa
val schemaRegistryAddress = kafkaCluster.schemaRegistryAddress
val schemaRegistryClient = kafkaCluster.schemaRegistryClient
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
schemaRegistryAddress: String = http://10.0.236.27:8081
schemaRegistryClient: org.spark_project.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient = org.spark_project.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient@6c449640
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._

Step 2: Register subjects with the Schema Registry.

Here we register a string-type subject "input1" and a struct-type (<a: int, b: float>) subject "input2".

import org.apache.avro.{Schema, SchemaBuilder}

schemaRegistryClient.register("input1", SchemaBuilder.builder().stringType())
schemaRegistryClient.register("input2", new Schema.Parser().parse(
  s"""
    |{
    |  "type": "record",
    |  "name": "topLevelRecord",
    |  "fields": [
    |    {"name": "a", "type": "int"},
    |    {"name": "b", "type": "float"}
    |  ]
    |}
  """.stripMargin
))
import org.apache.avro.{Schema, SchemaBuilder}
res3: Int = 2

Make sure these 2 subjects are successfully registered.

import scala.sys.process._
Seq("curl", "-X", "GET", s"$schemaRegistryAddress/subjects").!
["input2","input1"]
import scala.sys.process._
res4: Int = 0
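The registry can also be queried through the client created by the setup notebook instead of curl. A minimal sketch, assuming the shaded Confluent client exposes the usual getLatestSchemaMetadata method:

// Illustrative check: fetch the latest registered schema for each subject.
val input1Schema = schemaRegistryClient.getLatestSchemaMetadata("input1").getSchema
val input2Schema = schemaRegistryClient.getLatestSchemaMetadata("input2").getSchema
println(s"input1 -> $input1Schema")
println(s"input2 -> $input2Schema")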

Step 3: Read data from Kafka (empty for now), call from_avro, and display.

The schema is correctly detected thanks to Schema Registry.

val input1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input1")
  .load()

val input2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input2")
  .load()

val stringDataFrame = input1.select(from_avro($"value", "input1", schemaRegistryAddress))
val intFloatDataFrame = input2.select(from_avro($"value", "input2", schemaRegistryAddress))
input1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
input2: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
stringDataFrame: org.apache.spark.sql.DataFrame = [from_avro(value): string]
intFloatDataFrame: org.apache.spark.sql.DataFrame = [from_avro(value): struct<a: int, b: float>]
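Since from_avro yields a single column (a string for input1, a struct<a: int, b: float> for input2), the struct column can be aliased and flattened into top-level columns. A minimal sketch using the same bindings as above:

// Alias the decoded struct and expand its fields into top-level columns.
val flattened = input2
  .select(from_avro($"value", "input2", schemaRegistryAddress).as("record"))
  .select($"record.a", $"record.b")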

Step 4: Write data to Kafka with to_avro and display.

Display these two DataFrames first to start the streaming queries.

display(stringDataFrame)
display_query_1(id: 4d3a455f-bcbf-4a3e-86ab-d8dc21fa4465)
     from_avro(value)
1    1
2    3
3    4
4    0
5    2

Showing all 5 rows.

display(intFloatDataFrame)
display_query_2(id: f8707c10-f2c6-4b0f-830f-9a2888cc75e7)
     from_avro(value)
1    {"a": 3, "b": 3}
2    {"a": 4, "b": 4}
3    {"a": 0, "b": 0}
4    {"a": 1, "b": 1}
5    {"a": 2, "b": 2}
6    {"a": -2147483647, "b": "NaN"}
7    {"a": -2147483646, "b": "NaN"}
8    {"a": -2147483645, "b": "NaN"}
9    {"a": 2147483647, "b": "NaN"}
10   {"a": -2147483648, "b": "NaN"}

Showing all 10 rows.
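The write side of this step (producing Avro records with to_avro so the readers above have data to decode) is sketched below. This assumes a to_avro overload that takes the payload column, the subject as a literal column, and the Schema Registry address; the exact overload, the rate source, and the checkpoint path are illustrative and may differ by runtime version.

import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._

// Illustrative producer: build struct<a: int, b: float> rows and serialize them
// with the "input2" subject so the from_avro reader above can decode them.
val producerQuery = spark
  .readStream
  .format("rate")                                   // synthetic (timestamp, value) rows
  .option("rowsPerSecond", 5)
  .load()
  .select(struct($"value".cast("int").as("a"),
                 $"value".cast("float").as("b")).as("data"))
  .select(to_avro($"data", lit("input2"), schemaRegistryAddress).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("topic", "input2")
  .option("checkpointLocation", "/tmp/avro-demo-checkpoint")  // illustrative path
  .start()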