Structured Streaming patterns on Databricks
This article contains notebooks and code examples for common patterns for working with Structured Streaming on Databricks.
Getting started with Structured Streaming
If you are brand new to Structured Streaming, see Run your first Structured Streaming workload.
Write to Cassandra as a sink for Structured Streaming in Python
Apache Cassandra is a distributed, low-latency, scalable, highly available OLTP database.
Structured Streaming works with Cassandra through the Spark Cassandra Connector. This connector supports both RDD and DataFrame APIs, and it has native support for writing streaming data.
Important: You must use the version of spark-cassandra-connector-assembly that matches the Spark and Scala versions of your cluster.
The following example connects to one or more hosts in a Cassandra database cluster. It also specifies connection configurations such as the checkpoint location and the specific keyspace and table names:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()
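The df in the example above is assumed to be an existing streaming DataFrame whose columns match the target Cassandra table. As a minimal, hypothetical sketch, such a DataFrame could come from the built-in rate test source, with placeholder column names that you would replace to match your own table schema:

# Hypothetical source for the example above; replace with your real stream.
# The rate source generates (timestamp, value) rows for testing.
df = (
  spark.readStream.format("rate").load()
    .selectExpr("CAST(value AS STRING) AS id", "timestamp AS event_time")  # placeholder columns
)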
Write to Azure Synapse Analytics using foreachBatch() in Python
streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure Synapse Analytics. See the foreachBatch documentation for details.
To run this example, you need the Azure Synapse Analytics connector. For details, see Query data in Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
)
Write to Amazon DynamoDB using foreach() in Scala and Python
streamingDF.writeStream.foreach() allows you to write the output of a streaming query to arbitrary locations.
Use Python
This example shows how to use streamingDataFrame.writeStream.foreach() in Python to write to DynamoDB. The first step gets the DynamoDB boto resource. This example is written to use access_key and secret_key, but Databricks recommends that you use instance profiles. See Tutorial: Configure S3 access with an instance profile.
Define a few helper methods to create a DynamoDB table for running the example.
table_name = "PythonForeachTest"

def get_dynamodb():
  import boto3

  access_key = "<access key>"
  secret_key = "<secret key>"
  region = "<region name>"
  return boto3.resource('dynamodb',
                        aws_access_key_id=access_key,
                        aws_secret_access_key=secret_key,
                        region_name=region)

def createTableIfNotExists():
  '''
  Create a DynamoDB table if it does not exist.
  This must be run on the Spark driver, and not inside foreach.
  '''
  dynamodb = get_dynamodb()

  existing_tables = dynamodb.meta.client.list_tables()['TableNames']
  if table_name not in existing_tables:
    print("Creating table %s" % table_name)
    table = dynamodb.create_table(
        TableName=table_name,
        KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
        AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
        ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
    )

    print("Waiting for table to be ready")
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
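The streaming example below assumes the table already exists. If it does not, you can call the helper once on the driver before starting the query:

# Run once on the driver (not inside foreach) before starting the stream
createTableIfNotExists()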
Define the classes and methods that write to DynamoDB and then call them from foreach. There are two ways to specify your custom logic in foreach:
Use a function: This is the simple approach that can be used to write one row at a time. However, client/connection initialization to write a row is done in every call.
def sendToDynamoDB_simple(row):
  '''
  Function to send a row to DynamoDB.
  When used with `foreach`, this method is going to be called in the executor
  with the generated output rows.
  '''
  # Create client object in the executor,
  # do not use client objects created in the driver
  dynamodb = get_dynamodb()

  dynamodb.Table(table_name).put_item(
      Item = { 'key': str(row['key']), 'count': row['count'] })
Use a class with open, process, and close methods: This allows for a more efficient implementation where a client/connection is initialized once and multiple rows can be written out with it.

class SendToDynamoDB_ForeachWriter:
  '''
  Class to send a set of rows to DynamoDB.
  When used with `foreach`, copies of this class are used to write
  multiple rows in the executor. See the Python docs for `DataStreamWriter.foreach`
  for more details.
  '''

  def open(self, partition_id, epoch_id):
    # This is called first when preparing to send multiple rows.
    # Put all the initialization code inside open() so that a fresh
    # copy of this class is initialized in the executor where open()
    # will be called.
    self.dynamodb = get_dynamodb()
    return True

  def process(self, row):
    # This is called for each row after open() has been called.
    # This implementation sends one row at a time.
    # For further enhancements, contact the Spark+DynamoDB connector
    # team: https://github.com/audienceproject/spark-dynamodb
    self.dynamodb.Table(table_name).put_item(
        Item = { 'key': str(row['key']), 'count': row['count'] })

  def close(self, err):
    # This is called after all the rows have been processed.
    if err:
      raise err
Invoke foreach in your streaming query with the above function or object:

from pyspark.sql.functions import *

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreach(SendToDynamoDB_ForeachWriter())
    #.foreach(sendToDynamoDB_simple)  # alternative: use one or the other
    .outputMode("update")
    .start()
)
Use Scala
This example shows how to use streamingDataFrame.writeStream.foreach() in Scala to write to DynamoDB.
To run this example, you must first create a DynamoDB table that has a single string key named "value".
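If you do not already have such a table, the following is a minimal sketch of one way to create it from a Python notebook cell with boto3; the region and table name are placeholders, credentials are assumed to come from your environment (for example, an instance profile), and the capacity values are illustrative only:

# Hypothetical setup step: create a table with a single string hash key named "value".
import boto3

dynamodb = boto3.resource('dynamodb', region_name='<region>')  # credentials from the default chain
dynamodb.create_table(
    TableName='<table name>',
    KeySchema=[ { 'AttributeName': 'value', 'KeyType': 'HASH' } ],
    AttributeDefinitions=[ { 'AttributeName': 'value', 'AttributeType': 'S' } ],
    ProvisionedThroughput={ 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
)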
Define an implementation of the ForeachWriter interface that performs the write.

import org.apache.spark.sql.{ForeachWriter, Row}
import com.amazonaws.AmazonServiceException
import com.amazonaws.auth._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
import java.util.ArrayList

import scala.collection.JavaConverters._

class DynamoDbWriter extends ForeachWriter[Row] {
  private val tableName = "<table name>"
  private val accessKey = "<aws access key>"
  private val secretKey = "<aws secret key>"
  private val regionName = "<region>"

  // This will lazily be initialized only when open() is called
  lazy val ddb = AmazonDynamoDBClientBuilder.standard()
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
    .withRegion(regionName)
    .build()

  //
  // This is called first when preparing to send multiple rows.
  // Put all the initialization code inside open() so that a fresh
  // copy of this class is initialized in the executor where open()
  // will be called.
  //
  def open(partitionId: Long, epochId: Long) = {
    ddb  // force the initialization of the client
    true
  }

  //
  // This is called for each row after open() has been called.
  // This implementation sends one row at a time.
  // A more efficient implementation can be to send batches of rows at a time.
  //
  def process(row: Row) = {
    val rowAsMap = row.getValuesMap(row.schema.fieldNames)
    val dynamoItem = rowAsMap.mapValues {
      v: Any => new AttributeValue(v.toString)
    }.asJava

    ddb.putItem(tableName, dynamoItem)
  }

  //
  // This is called after all the rows have been processed.
  //
  def close(errorOrNull: Throwable) = {
    ddb.shutdown()
  }
}
Use the DynamoDbWriter to write a rate stream into DynamoDB.

spark.readStream
  .format("rate")
  .load()
  .select("value")
  .writeStream
  .foreach(new DynamoDbWriter)
  .start()