
Schema Registry integration in Spark Structured Streaming

This notebook demonstrates how to use the from_avro/to_avro functions to read/write data from/to Kafka with Schema Registry support.

Run the following commands one by one while reading the instructions. Do not click Run All.

Step 1: Set up the environment.

%run ./setup
Warning: classes defined within packages cannot be redefined without a cluster restart. Compilation successful.
Executors = Set(10.0.236.27, 10.0.248.139, 10.0.250.171)
Workers = List(10.0.236.27, 10.0.248.139, 10.0.250.171)
ls: cannot access '/root/.ssh/id_rsa': No such file or directory
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.236.27
Warning: Permanently added '10.0.236.27' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.236.27
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.248.139
Warning: Permanently added '10.0.248.139' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.248.139
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.250.171
Warning: Permanently added '10.0.250.171' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.250.171
executing command - kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties on host: 10.0.236.27
SUCCESS: command - kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties on host: 10.0.236.27
executing command - bash /dbfs/home/streaming/benchmark/configure-kafka.sh 10.0.236.27 0 10.0.236.27 on host: 10.0.236.27
SUCCESS: command - bash /dbfs/home/streaming/benchmark/configure-kafka.sh 10.0.236.27 0 10.0.236.27 on host: 10.0.236.27
executing command - kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties on host: 10.0.236.27
SUCCESS: command - kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties on host: 10.0.236.27
executing command - schema-registry/bin/schema-registry-start -daemon schema-registry/etc/schema-registry/schema-registry.properties on host: 10.0.236.27
SUCCESS: command - schema-registry/bin/schema-registry-start -daemon schema-registry/etc/schema-registry/schema-registry.properties on host: 10.0.236.27
import sys.process._
kafkaCluster: com.databricks.spark.LocalKafka = com.databricks.spark.LocalKafka@2ec6a0aa
val schemaRegistryAddress = kafkaCluster.schemaRegistryAddress
val schemaRegistryClient = kafkaCluster.schemaRegistryClient
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
schemaRegistryAddress: String = http://10.0.236.27:8081
schemaRegistryClient: org.spark_project.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient = org.spark_project.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient@6c449640
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._

Step 2: Register subjects to Schema Registry.

Here we register a string-type subject "input1" and a struct-type (<a int, b float>) subject "input2".

import org.apache.avro.{Schema, SchemaBuilder}

schemaRegistryClient.register("input1", SchemaBuilder.builder().stringType())
schemaRegistryClient.register("input2", new Schema.Parser().parse(
  s"""
    |{
    |  "type": "record",
    |  "name": "topLevelRecord",
    |  "fields": [
    |    {"name": "a", "type": "int"},
    |    {"name": "b", "type": "float"}
    |  ]
    |}
  """.stripMargin
))
import org.apache.avro.{Schema, SchemaBuilder}
res3: Int = 2

Make sure these two subjects are successfully registered.

import scala.sys.process._
Seq("curl", "-X", "GET", s"$schemaRegistryAddress/subjects").!
["input2","input1"]
import scala.sys.process._
res4: Int = 0
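As an alternative to curl, you can also query the registry through the client object created during setup. This is a minimal sketch, assuming the shaded Confluent client exposes the usual getLatestSchemaMetadata API:

// Hedged check: print the latest registered version of each subject
// via the Schema Registry client instead of the REST endpoint.
Seq("input1", "input2").foreach { subject =>
  val metadata = schemaRegistryClient.getLatestSchemaMetadata(subject)
  println(s"subject=$subject id=${metadata.getId} version=${metadata.getVersion} schema=${metadata.getSchema}")
}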

Step 3: Read data from Kafka (currently empty), call from_avro, and display.

The schema is correctly detected thanks to Schema Registry.

val input1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input1")
  .load()

val input2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input2")
  .load()

val stringDataFrame = input1.select(from_avro($"value", "input1", schemaRegistryAddress))
val intFloatDataFrame = input2.select(from_avro($"value", "input2", schemaRegistryAddress))
input1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
input2: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
stringDataFrame: org.apache.spark.sql.DataFrame = [from_avro(value): string]
intFloatDataFrame: org.apache.spark.sql.DataFrame = [from_avro(value): struct<a: int, b: float>]
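The decoded column is named from_avro(value) by default. If you prefer a friendlier column name, or want to keep Kafka metadata such as the topic and timestamp next to the payload, a variant like the following works as well (the column alias "payload" is illustrative):

// Hedged variant of the same read: keep Kafka metadata columns and alias
// the decoded Avro payload, then confirm the schema detected from Schema Registry.
val namedDataFrame = input2.select(
  $"topic",
  $"timestamp",
  from_avro($"value", "input2", schemaRegistryAddress).as("payload"))
namedDataFrame.printSchema()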

Step 4: Write data to Kafka with to_avro and display.

Display these two DataFrames first to start the streaming queries.

display(stringDataFrame)
display_query_1 (id: 4d3a455f-bcbf-4a3e-86ab-d8dc21fa4465)

from_avro(value)
1
3
4
0
2

Showing all 5 rows.

display(intFloatDataFrame)
display_query_2 (id: f8707c10-f2c6-4b0f-830f-9a2888cc75e7)

from_avro(value)
{"a": 3, "b": 3}
{"a": 4, "b": 4}
{"a": 0, "b": 0}
{"a": 1, "b": 1}
{"a": 2, "b": 2}
{"a": -2147483647, "b": "NaN"}
{"a": -2147483646, "b": "NaN"}
{"a": -2147483645, "b": "NaN"}
{"a": 2147483647, "b": "NaN"}
{"a": -2147483648, "b": "NaN"}

Showing all 10 rows.

Write data to Kafka. After running the commands below, go back and check the display results above.

spark.range(5)
  .select('id.cast("STRING").as('str))
  .select(to_avro('str, lit("input1"), schemaRegistryAddress).as('value))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("topic", "input1")
  .save()

spark.range(5)
  .select(struct('id.cast("int").as('a), 'id.cast("float").as('b)).as('struct))
  .select(to_avro('struct, lit("input2"), schemaRegistryAddress).as('value))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("topic", "input2")
  .save()
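The two writes above are one-off batch writes. If you want a continuous producer instead, a sketch along these lines would keep feeding the input1 topic; the rate source and checkpoint path here are placeholders, not part of the original setup:

// Hedged streaming counterpart of the batch write above: serialize a rate-source
// stream with to_avro and keep publishing it to the same topic.
val producerQuery = spark.readStream
  .format("rate")
  .option("rowsPerSecond", 1)
  .load()
  .select('value.cast("STRING").as('str))
  .select(to_avro('str, lit("input1"), schemaRegistryAddress).as('value))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("topic", "input1")
  .option("checkpointLocation", "/tmp/checkpoints/input1") // placeholder path
  .start()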

Step 5: Change the subject schema in Schema Registry.

Change from <a int, b float> to <a long, b double>, which is backward compatible.
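If you want to verify that claim before registering, the registry client can be asked directly. This is a minimal sketch, assuming the shaded Confluent client exposes the usual testCompatibility API:

// Hedged pre-flight check: test the evolved schema against the latest
// registered version of "input2" before actually registering it.
import org.apache.avro.Schema

val evolvedSchema = new Schema.Parser().parse(
  """{"type": "record", "name": "topLevelRecord", "fields": [
       {"name": "a", "type": "long"},
       {"name": "b", "type": "double"}
     ]}""")
val isCompatible = schemaRegistryClient.testCompatibility("input2", evolvedSchema)
println(s"Backward compatible with the latest 'input2' schema: $isCompatible")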

import org.apache.avro.{Schema, SchemaBuilder}

schemaRegistryClient.register("input2", new Schema.Parser().parse(
  s"""
    |{
    |  "type": "record",
    |  "name": "topLevelRecord",
    |  "fields": [
    |    {"name": "a", "type": "long"},
    |    {"name": "b", "type": "double"}
    |  ]
    |}
  """.stripMargin
))
import org.apache.avro.{Schema, SchemaBuilder}
res9: Int = 3

Step 6: Create a new stream to display data from Kafka.

The schema is correctly detected as <a bigint, b double>.

val longDoubleDataFrame = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input2")
  .option("startingoffsets", "earliest")
  .load()
  .select(from_avro($"value", "input2", schemaRegistryAddress))
longDoubleDataFrame: org.apache.spark.sql.DataFrame = [from_avro(value): struct<a: bigint, b: double>]

The new stream can correctly display the old data that were written as <a int, b float>.

display(longDoubleDataFrame)
display_query_3 (id: 73391fc9-ab95-486e-85de-673e52792b4c)

from_avro(value)
{"a": 3, "b": 3}
{"a": 4, "b": 4}
{"a": 0, "b": 0}
{"a": 1, "b": 1}
{"a": 2, "b": 2}
{"a": 2147483649, "b": 11.9999999999999}
{"a": 2147483650, "b": 12.9999999999999}
{"a": 2147483651, "b": 13.9999999999999}
{"a": 2147483647, "b": 9.9999999999999}
{"a": 2147483648, "b": 10.9999999999999}

Showing all 10 rows.

Let's write some new data that are actual long and double values. Go back and check the display result.

spark.range(5)
  .select(struct(('id.cast("long") + Int.MaxValue).as('a), ('id.cast("double") + 9.9999999999999).as('b)).as('struct))
  .select(to_avro('struct, lit("input2"), schemaRegistryAddress).as('value))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("topic", "input2")
  .save()

from_avro/to_avro can be used in batch queries as well.

val staticDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input2")
  .option("startingoffsets", "earliest")
  .load()
  .select(from_avro($"value", "input2", schemaRegistryAddress))
display(staticDF)

Showing all 10 rows.
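For downstream processing it is often handier to alias the decoded column and flatten the struct fields into top-level columns. A small sketch of the same batch read with that adjustment (column names here are illustrative):

// Alias the decoded payload as "event" and flatten struct<a: bigint, b: double>
// into two top-level columns.
val flattenedDF = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input2")
  .option("startingOffsets", "earliest")
  .load()
  .select(from_avro($"value", "input2", schemaRegistryAddress).as("event"))
  .select($"event.a", $"event.b")
display(flattenedDF)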