Databricks での構造化ストリーミング パターン

これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。

構造化ストリーミングの使用を開始する

構造化ストリーミングを初めて使用する場合は、「 最初の構造化ストリーミングワークロードを実行する」を参照してください。

Pythonでの構造化ストリーミングのシンクとして Cassandra に書き込む

Apache Cassandra は、分散、低待機時間、スケーラブル、高可用性の OLTP データベースです。

構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、RDD と DataFrame APIsの両方をサポートし、ストリーミング データの書き込みをネイティブにサポートしています。 *重要*対応するバージョンの スパークCassandraコネクタアセンブリを使用する必要があります。

次の例では、Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペース名、テーブル名などの接続構成も指定します。

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

foreachBatch()をPythonで使用したAzure Synapse Analyticsへの書き込み

streamingDF.writeStream.foreachBatch() を使用すると、既存のバッチ データ ライターを再利用して、ストリーミングの出力を Azure Synapse アナリティクスに書き込むことができます。 詳細については、 foreachBatch のドキュメント を参照してください。

この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics のクエリー データ」を参照してください。

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Scala および Python のforeach() を使用して Amazon DynamoDB に書き込む

streamingDF.writeStream.foreach() ストリーミングクエリーの出力を任意の場所に書き込むことができます。

Pythonを使用する

この例では、Python で streamingDataFrame.writeStream.foreach() を使用して DynamoDB に書き込む方法を示します。 最初のステップでは、DynamoDB Boto リソースを取得します。 この例は、 access_keysecret_keyを使用するように記述されていますが、Databricks ではインスタンスプロファイルを使用することをお勧めします。 「 チュートリアル: インスタンスプロファイルを使用して S3 アクセスを設定する」を参照してください。

  1. 例を実行するための DynamoDB テーブルを作成するためのヘルパーメソッドをいくつか定義します。

    table_name = "PythonForeachTest"
    
    def get_dynamodb():
      import boto3
    
      access_key = "<access key>"
      secret_key = "<secret key>"
      region = "<region name>"
      return boto3.resource('dynamodb',
                     aws_access_key_id=access_key,
                     aws_secret_access_key=secret_key,
                     region_name=region)
    
    def createTableIfNotExists():
        '''
        Create a DynamoDB table if it does not exist.
        This must be run on the Spark driver, and not inside foreach.
        '''
        dynamodb = get_dynamodb()
    
        existing_tables = dynamodb.meta.client.list_tables()['TableNames']
        if table_name not in existing_tables:
          print("Creating table %s" % table_name)
          table = dynamodb.create_table(
              TableName=table_name,
              KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
              AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
              ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
          )
    
          print("Waiting for table to be ready")
    
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    
  2. DynamoDB に書き込むクラスとメソッドを定義し、 foreachから呼び出します。 foreachでカスタム ロジックを指定する方法は 2 つあります。

    • 関数を使用する:これは、一度に1行を書き込むために使用できる単純なアプローチです。 ただし、行を書き込むためのクライアント/接続の初期化は、すべての呼び出しで実行されます。

      def sendToDynamoDB_simple(row):
        '''
        Function to send a row to DynamoDB.
        When used with `foreach`, this method is going to be called in the executor
        with the generated output rows.
        '''
        # Create client object in the executor,
        # do not use client objects created in the driver
        dynamodb = get_dynamodb()
      
        dynamodb.Table(table_name).put_item(
            Item = { 'key': str(row['key']), 'count': row['count'] })
      
    • openprocess、および close メソッドを持つクラスを使用する: これにより、クライアント/接続が初期化され、複数の行を書き出すことができる、より効率的な実装が可能になります。

      class SendToDynamoDB_ForeachWriter:
        '''
        Class to send a set of rows to DynamoDB.
        When used with `foreach`, copies of this class is going to be used to write
        multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
        for more details.
        '''
      
        def open(self, partition_id, epoch_id):
          # This is called first when preparing to send multiple rows.
          # Put all the initialization code inside open() so that a fresh
          # copy of this class is initialized in the executor where open()
          # will be called.
          self.dynamodb = get_dynamodb()
          return True
      
        def process(self, row):
          # This is called for each row after open() has been called.
          # This implementation sends one row at a time.
          # For further enhancements, contact the Spark+DynamoDB connector
          # team: https://github.com/audienceproject/spark-dynamodb
          self.dynamodb.Table(table_name).put_item(
              Item = { 'key': str(row['key']), 'count': row['count'] })
      
        def close(self, err):
          # This is called after all the rows have been processed.
          if err:
            raise err
      
  3. 上記の関数またはオブジェクトを使用してストリーミングクエリーで foreach 呼び出します。

    from pyspark.sql.functions import *
    
    spark.conf.set("spark.sql.shuffle.partitions", "1")
    
    query = (
      spark.readStream.format("rate").load()
        .selectExpr("value % 10 as key")
        .groupBy("key")
        .count()
        .toDF("key", "count")
        .writeStream
        .foreach(SendToDynamoDB_ForeachWriter())
        #.foreach(sendToDynamoDB_simple)  // alternative, use one or the other
        .outputMode("update")
        .start()
    )
    

Scalaを使用する

この例では、Scala で streamingDataFrame.writeStream.foreach() を使用して DynamoDB に書き込む方法を示します。

これを実行するには、「value」という名前の単一の文字列キーを持つ DynamoDB テーブルを作成する必要があります。

  1. 書き込みを実行する ForeachWriter インターフェイスの実装を定義します。

    import org.apache.spark.sql.{ForeachWriter, Row}
    import com.amazonaws.AmazonServiceException
    import com.amazonaws.auth._
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
    import com.amazonaws.services.dynamodbv2.model.AttributeValue
    import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
    import java.util.ArrayList
    
    import scala.collection.JavaConverters._
    
    class DynamoDbWriter extends ForeachWriter[Row] {
      private val tableName = "<table name>"
      private val accessKey = "<aws access key>"
      private val secretKey = "<aws secret key>"
      private val regionName = "<region>"
    
      // This will lazily be initialized only when open() is called
      lazy val ddb = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
        .withRegion(regionName)
        .build()
    
      //
      // This is called first when preparing to send multiple rows.
      // Put all the initialization code inside open() so that a fresh
      // copy of this class is initialized in the executor where open()
      // will be called.
      //
      def open(partitionId: Long, epochId: Long) = {
        ddb  // force the initialization of the client
        true
      }
    
      //
      // This is called for each row after open() has been called.
      // This implementation sends one row at a time.
      // A more efficient implementation can be to send batches of rows at a time.
      //
      def process(row: Row) = {
        val rowAsMap = row.getValuesMap(row.schema.fieldNames)
        val dynamoItem = rowAsMap.mapValues {
          v: Any => new AttributeValue(v.toString)
        }.asJava
    
        ddb.putItem(tableName, dynamoItem)
      }
    
      //
      // This is called after all the rows have been processed.
      //
      def close(errorOrNull: Throwable) = {
        ddb.shutdown()
      }
    }
    
  2. DynamoDbWriter を使用してレートストリームを DynamoDB に書き込みます。

    spark.readStream
      .format("rate")
      .load()
      .select("value")
      .writeStream
      .foreach(new DynamoDbWriter)
      .start()
    

ストリーム-ストリーム結合

これら 2 つのノートブックは、Python と Scala でストリームとストリームの結合を使用する方法を示しています。

Python ノートブックでのStream-Strem joins

ノートブックを新しいタブで開く

ストリーム-ストリームjoinのScalaノートブック

ノートブックを新しいタブで開く