Examples

Write to Cassandra using foreachBatch() in Scala

streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Cassandra. The following example uses the Spark Cassandra connector from Scala to write the key-value output of an aggregation query to Cassandra. See the foreachBatch documentation for details.

To run this example, you need to install the appropriate Cassandra Spark connector for your Spark version as a Maven library.
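For example, on a cluster running Scala 2.12, the Maven coordinate typically has the following form; the artifact suffix and version shown here are assumptions that you must match to your cluster's Scala and Spark versions.

com.datastax.spark:spark-cassandra-connector_2.12:<connector version>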

In this example, you start a Structured Streaming query against an existing Cassandra keyspace and table, and use foreachBatch() to write the streaming output with the batch DataFrame connector.

import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

import com.datastax.spark.connector.cql.CassandraConnectorConf
import com.datastax.spark.connector.rdd.ReadConf
import com.datastax.spark.connector._


val host = "<ip address>"
val clusterName = "<cluster name>"
val keyspace = "<keyspace>"
val tableName = "<tableName>"

spark.setCassandraConf(clusterName, CassandraConnectorConf.ConnectionHostParam.option(host))
spark.readStream.format("rate").load()
  .selectExpr("value % 10 as key")
  .groupBy("key")
  .count()
  .toDF("key", "value")
  .writeStream
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>

    batchDF.write       // Use the Cassandra batch data source to write the streaming output
      .cassandraFormat(tableName, keyspace)
      .option("cluster", clusterName)
      .mode("append")
      .save()
  }
  .outputMode("update")
  .start()
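
Once the stream has been running for a while, you can read the table back with the same connector to verify the writes. This is a usage sketch that reuses the clusterName, keyspace, and tableName variables defined above.

spark.read
  .cassandraFormat(tableName, keyspace)
  .option("cluster", clusterName)
  .load()
  .show()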

Write to Azure SQL Data Warehouse using foreachBatch() in Python

streamingDF.writeStream.foreachBatch() allows you to reuse existing batch data writers to write the output of a streaming query to Azure SQL Data Warehouse. See the foreachBatch documentation for details.

To run this example, you need the Azure SQL Data Warehouse connector. For details, see Azure SQL Data Warehouse.
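Because the example sets forward_spark_azure_storage_credentials to true, Spark must know the access key of the Blob storage account used as tempdir so that the connector can forward it. A minimal sketch, assuming the same placeholder storage account used below:

spark.conf.set(
  "fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
  "<your-storage-account-access-key>")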

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )
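
The handle returned by start() can be used to monitor and stop the stream; a brief usage sketch:

print(query.status)        # current status of the streaming query
print(query.lastProgress)  # metrics for the most recent micro-batch
# query.stop()             # stop the continuous query when you are done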

Write to Amazon DynamoDB using foreach() in Scala and Python

streamingDF.writeStream.foreach() allows you to write the output of a streaming query to arbitrary locations.

Using Python

This example shows how to use streamingDataFrame.writeStream.foreach() in Python to write to DynamoDB. The first step gets the DynamoDB boto resource. This example is written to use access_key and secret_key, but Databricks recommends that you use Secure Access to S3 Buckets Using IAM Roles.

  1. Define a few helper methods to create the DynamoDB table for running the example.

    table_name = "PythonForeachTest"
    
    def get_dynamodb():
      import boto3
    
      access_key = "<access key>"
      secret_key = "<secret key>"
      region = "<region name>"
      return boto3.resource('dynamodb',
                     aws_access_key_id=access_key,
                     aws_secret_access_key=secret_key,
                     region_name=region)
    
    def createTableIfNotExists():
        '''
        Create a DynamoDB table if it does not exist.
        This must be run on the Spark driver, and not inside foreach.
        '''
        dynamodb = get_dynamodb()
    
        existing_tables = dynamodb.meta.client.list_tables()['TableNames']
        if table_name not in existing_tables:
          print("Creating table %s" % table_name)
          table = dynamodb.create_table(
              TableName=table_name,
              KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
              AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
              ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
          )
    
          print("Waiting for table to be ready")
    
          table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
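
    # Usage sketch: run the helper once on the driver so that the table exists
    # before starting the streaming query in step 3.
    createTableIfNotExists()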
    
  2. Define the classes and methods that write to DynamoDB, and then call them from foreach. There are two ways to specify your custom logic in foreach.

    • Use a function: This is the simplest approach and writes one row at a time. However, the client/connection is initialized for every row that is written.

      def sendToDynamoDB_simple(row):
        '''
        Function to send a row to DynamoDB.
        When used with `foreach`, this function is called in the executor
        with the generated output rows.
        '''
        # Create client object in the executor,
        # do not use client objects created in the driver
        dynamodb = get_dynamodb()
      
        dynamodb.Table(table_name).put_item(
            Item = { 'key': str(row['key']), 'count': row['count'] })
      
    • Use a class with open, process, and close methods: This allows for a more efficient implementation, where the client/connection is initialized once and multiple rows can be written with it.

      class SendToDynamoDB_ForeachWriter:
        '''
        Class to send a set of rows to DynamoDB.
        When used with `foreach`, copies of this class are used to write
        multiple rows in the executor. See the Python docs for `DataStreamWriter.foreach`
        for more details.
        '''
      
        def open(self, partition_id, epoch_id):
          # This is called first when preparing to send multiple rows.
          # Put all the initialization code inside open() so that a fresh
          # copy of this class is initialized in the executor where open()
          # will be called.
          self.dynamodb = get_dynamodb()
          return True
      
        def process(self, row):
          # This is called for each row after open() has been called.
          # This implementation sends one row at a time.
          # A more efficient implementation can be to send batches of rows at a time.
          self.dynamodb.Table(table_name).put_item(
              Item = { 'key': str(row['key']), 'count': row['count'] })
      
        def close(self, err):
          # This is called after all the rows have been processed.
          if err:
            raise err
      
  3. Invoke foreach in your streaming query with the above function or object.

    from pyspark.sql.functions import *
    
    spark.conf.set("spark.sql.shuffle.partitions", "1")
    
    query = (
      spark.readStream.format("rate").load()
        .selectExpr("value % 10 as key")
        .groupBy("key")
        .count()
        .toDF("key", "count")
        .writeStream
        .foreach(SendToDynamoDB_ForeachWriter())
        # .foreach(sendToDynamoDB_simple)  # alternative: use one or the other
        .outputMode("update")
        .start()
    )
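
    # Usage sketch: after the query has been running for a while, you can read a few
    # items back from DynamoDB on the driver to verify the writes.
    # print(get_dynamodb().Table(table_name).scan(Limit=5)['Items'])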
    

Using Scala

This example shows how to use streamingDataFrame.writeStream.foreach() in Scala to write to DynamoDB.

To run this example, you must first create a DynamoDB table that has a single string key named “value”.
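
If you want to create that table programmatically, the following is a minimal sketch using the same AWS SDK; the credentials, region, table name, and throughput values are placeholder assumptions.

import com.amazonaws.auth.{AWSStaticCredentialsProvider, BasicAWSCredentials}
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeDefinition, CreateTableRequest, KeySchemaElement, KeyType, ProvisionedThroughput, ScalarAttributeType}

// Sketch: create a DynamoDB table with a single string hash key named "value".
val client = AmazonDynamoDBClientBuilder.standard()
  .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials("<aws access key>", "<aws secret key>")))
  .withRegion("<region>")
  .build()

client.createTable(new CreateTableRequest()
  .withTableName("<table name>")
  .withKeySchema(new KeySchemaElement("value", KeyType.HASH))
  .withAttributeDefinitions(new AttributeDefinition("value", ScalarAttributeType.S))
  .withProvisionedThroughput(new ProvisionedThroughput(5L, 5L)))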

  1. Define an implementation of the ForeachWriter interface that performs the write.

     import org.apache.spark.sql.{ForeachWriter, Row}
     import com.amazonaws.AmazonServiceException
     import com.amazonaws.auth._
     import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
     import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
     import com.amazonaws.services.dynamodbv2.model.AttributeValue
     import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
     import java.util.ArrayList

     import scala.collection.JavaConverters._

     class DynamoDbWriter extends ForeachWriter[Row] {
       private val tableName = "<table name>"
       private val accessKey = "<aws access key>"
       private val secretKey = "<aws secret key>"
       private val regionName = "<region>"

       // This will lazily be initialized only when open() is called
       lazy val ddb = AmazonDynamoDBClientBuilder.standard()
         .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
         .withRegion(regionName)
         .build()

       //
       // This is called first when preparing to send multiple rows.
       // Put all the initialization code inside open() so that a fresh
       // copy of this class is initialized in the executor where open()
       // will be called.
       //
       def open(partitionId: Long, epochId: Long) = {
         ddb  // force the initialization of the client
         true
       }

       //
       // This is called for each row after open() has been called.
       // This implementation sends one row at a time.
       // A more efficient implementation can be to send batches of rows at a time.
       //
       def process(row: Row) = {
         val rowAsMap = row.getValuesMap(row.schema.fieldNames)
         val dynamoItem = rowAsMap.mapValues {
           v: Any => new AttributeValue(v.toString)
         }.asJava

         ddb.putItem(tableName, dynamoItem)
       }

       //
       // This is called after all the rows have been processed.
       //
       def close(errorOrNull: Throwable) = {
         ddb.shutdown()
       }
     }
    
  2. Use the DynamoDbWriter to write a rate stream into DynamoDB.

    spark.readStream
      .format("rate")
      .load()
      .select("value")
      .writeStream
      .foreach(new DynamoDbWriter)
      .start()
    

Amazon CloudTrail ETL

The following notebooks show how to transform Amazon CloudTrail logs from JSON into Parquet for efficient ad hoc querying. See Real-time Streaming ETL with Structured Streaming for details.

ETL of Amazon CloudTrail Logs using Structured Streaming in Python

ETL of Amazon CloudTrail Logs using Structured Streaming in Scala
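
The core pattern in those notebooks looks roughly like the following minimal sketch. The S3 paths are placeholders, and the schema keeps only a few CloudTrail fields; the notebooks use a complete schema.

from pyspark.sql.types import StructType, StructField, StringType, ArrayType

# Placeholder paths; replace with your own buckets and prefixes.
cloudtrail_path = "s3://<cloudtrail-bucket>/AWSLogs/<account-id>/CloudTrail/*/*/*/*/*.json.gz"
parquet_path = "s3://<output-bucket>/cloudtrail/parquet"
checkpoint_path = "s3://<output-bucket>/cloudtrail/checkpoint"

# CloudTrail files wrap events in a top-level "Records" array; this partial schema
# keeps only a handful of fields for illustration.
schema = StructType([
  StructField("Records", ArrayType(StructType([
    StructField("eventTime", StringType()),
    StructField("eventName", StringType()),
    StructField("eventSource", StringType()),
    StructField("awsRegion", StringType()),
  ])))
])

(spark.readStream
  .schema(schema)
  .json(cloudtrail_path)
  .selectExpr("explode(Records) AS record")
  .select("record.*")
  .writeStream
  .format("parquet")
  .option("path", parquet_path)
  .option("checkpointLocation", checkpoint_path)
  .start())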

Stream-Stream Joins

These two notebooks show how to use stream-stream joins in Python and Scala.

Stream-Stream Joins in Python

Stream-Stream Joins in Scala
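
The pattern those notebooks demonstrate looks roughly like the following minimal sketch in Python; the column names, rates, watermarks, and join interval are illustrative assumptions.

from pyspark.sql.functions import expr

# Two synthetic rate streams standing in for impressions and clicks.
impressions = (spark.readStream.format("rate").option("rowsPerSecond", "5").load()
  .selectExpr("value AS adId", "timestamp AS impressionTime"))

clicks = (spark.readStream.format("rate").option("rowsPerSecond", "5").load()
  .where("value % 2 = 0")
  .selectExpr("value AS clickAdId", "timestamp AS clickTime"))

# Join on the key plus a time-range condition; the watermarks bound the state
# that Spark keeps for late-arriving data.
joined = (
  impressions.withWatermark("impressionTime", "10 seconds")
    .join(
      clicks.withWatermark("clickTime", "20 seconds"),
      expr("""
        clickAdId = adId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 minute
      """)))

query = (joined.writeStream
  .format("memory")
  .queryName("impression_clicks")
  .outputMode("append")
  .start())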