Schema Registry integration in Spark Structured Streaming

This notebook demonstrates how to use the from_avro/to_avro functions to read/write data from/to Kafka with Schema Registry support. Run the following commands one by one while reading the instructions. Do not click Run All.

%run ./setup
Warning: classes defined within packages cannot be redefined without a cluster restart.
Compilation successful.
Executors = Set(10.0.236.27, 10.0.248.139, 10.0.250.171)
Workers = List(10.0.236.27, 10.0.248.139, 10.0.250.171)
ls: cannot access '/root/.ssh/id_rsa': No such file or directory
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.236.27
Warning: Permanently added '10.0.236.27' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.236.27
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.248.139
Warning: Permanently added '10.0.248.139' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.248.139
executing command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.250.171
Warning: Permanently added '10.0.250.171' (ECDSA) to the list of known hosts.
SUCCESS: command - bash /dbfs/home/streaming/benchmark/install-kafka.sh on host: 10.0.250.171
executing command - kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties on host: 10.0.236.27
SUCCESS: command - kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties on host: 10.0.236.27
executing command - bash /dbfs/home/streaming/benchmark/configure-kafka.sh 10.0.236.27 0 10.0.236.27 on host: 10.0.236.27
SUCCESS: command - bash /dbfs/home/streaming/benchmark/configure-kafka.sh 10.0.236.27 0 10.0.236.27 on host: 10.0.236.27
executing command - kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties on host: 10.0.236.27
SUCCESS: command - kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties on host: 10.0.236.27
executing command - schema-registry/bin/schema-registry-start -daemon schema-registry/etc/schema-registry/schema-registry.properties on host: 10.0.236.27
SUCCESS: command - schema-registry/bin/schema-registry-start -daemon schema-registry/etc/schema-registry/schema-registry.properties on host: 10.0.236.27
import sys.process._
kafkaCluster: com.databricks.spark.LocalKafka = com.databricks.spark.LocalKafka@2ec6a0aa
val schemaRegistryAddress = kafkaCluster.schemaRegistryAddress
val schemaRegistryClient = kafkaCluster.schemaRegistryClient
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
schemaRegistryAddress: String = http://10.0.236.27:8081
schemaRegistryClient: org.spark_project.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient = org.spark_project.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient@6c449640
import org.apache.spark.sql.functions._
import org.apache.spark.sql.avro.functions._
import org.apache.avro.{Schema, SchemaBuilder}

schemaRegistryClient.register("input1", SchemaBuilder.builder().stringType())
schemaRegistryClient.register("input2", new Schema.Parser().parse(
  s"""
     |{
     |  "type": "record",
     |  "name": "topLevelRecord",
     |  "fields": [
     |    {"name": "a", "type": "int"},
     |    {"name": "b", "type": "float"}
     |  ]
     |}
   """.stripMargin
))
import org.apache.avro.{Schema, SchemaBuilder}
res3: Int = 2
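With both subjects registered, a quick way to give the streaming readers below something to consume is to batch-write a few Avro-encoded rows into each topic. This is a minimal sketch, assuming the Databricks to_avro variant that takes a subject column and registry address (the mirror image of the from_avro calls used later); the sample rows are illustrative.

// Sketch: produce a few Avro-encoded records to each topic so the streams
// defined below have data. Assumes the to_avro(data, subject, registryAddress)
// overload; the sample values are made up for illustration.
import spark.implicits._

Seq("hello", "world").toDF("str")
  .select(to_avro($"str", lit("input1"), schemaRegistryAddress).as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("topic", "input1")
  .save()

Seq((1, 1.0f), (2, 2.5f)).toDF("a", "b")
  .select(to_avro(struct($"a", $"b"), lit("input2"), schemaRegistryAddress).as("value"))
  .write
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("topic", "input2")
  .save()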
import scala.sys.process._

Seq("curl", "-X", "GET", s"$schemaRegistryAddress/subjects").!
["input2","input1"]
import scala.sys.process._
res4: Int = 0
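Beyond listing subjects, the Schema Registry's standard REST API can return a registered schema itself, for example the latest version under the input2 subject (the -s flag just suppresses curl's progress meter):

// Fetch the latest registered schema for the "input2" subject.
Seq("curl", "-s", "-X", "GET", s"$schemaRegistryAddress/subjects/input2/versions/latest").!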
val input1 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input1")
  .load()

val input2 = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", kafkaCluster.kafkaNodesString)
  .option("subscribe", "input2")
  .load()

val stringDataFrame = input1.select(from_avro($"value", "input1", schemaRegistryAddress))
val intFloatDataFrame = input2.select(from_avro($"value", "input2", schemaRegistryAddress))
input1: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
input2: org.apache.spark.sql.DataFrame = [key: binary, value: binary ... 5 more fields]
stringDataFrame: org.apache.spark.sql.DataFrame = [from_avro(value): string]
intFloatDataFrame: org.apache.spark.sql.DataFrame = [from_avro(value): struct<a: int, b: float>]
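Note that neither stream is running yet; from_avro only defines the decoding. A minimal sketch to materialize one of them uses the console sink (in a Databricks notebook, display(intFloatDataFrame) works too; the sink and output mode here are illustrative, not part of the setup above):

// Sketch: start the decoded stream against the console sink so a few
// micro-batches print, then stop it when done inspecting.
val query = intFloatDataFrame
  .writeStream
  .format("console")
  .outputMode("append")
  .start()

// ...after inspecting the output:
// query.stop()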