Databricks での構造化ストリーミング パターン

これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。

構造化ストリーミング の使用を開始する

構造化ストリーミングを初めて使用する場合は、「 最初の構造化ストリーミングワークロードを実行する」を参照してください。

Python での構造化ストリーミングのシンクとして Cassandra に書き込む

Apache Cassandra は、分散、低待機時間、スケーラブル、高可用性の OLTP データベースです。

構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、RDD と DataFrame APIsの両方をサポートし、ストリーミング データの書き込みをネイティブにサポートしています。 *重要*対応するバージョンの スパークCassandraコネクタアセンブリを使用する必要があります。

次の例では、Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペース名、テーブル名などの接続構成も指定します。

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Azure Synapse アナリティクスへの書き込み using foreachBatch() in Python

streamingDF.writeStream.foreachBatch() を使用すると、既存のバッチ データ ライターを再利用して、ストリーミングの出力を Azure Synapse アナリティクスに書き込むことができます。 詳細については、 foreachBatch のドキュメント を参照してください。

この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics のクエリー データ」を参照してください。

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Scala および Python foreach() を使用して Amazon DynamoDB に書き込む

streamingDF.writeStream.foreach() ストリーミングクエリーの出力を任意の場所に書き込むことができます。

Python を使用する

この例では、Python で streamingDataFrame.writeStream.foreach() を使用して DynamoDB に書き込む方法を示します。 最初のステップでは、DynamoDB Boto リソースを取得します。 この例は、 access_keysecret_keyを使用するように記述されていますが、Databricks ではインスタンスプロファイルを使用することをお勧めします。 「 チュートリアル: インスタンスプロファイルを使用して S3 アクセスを設定する」を参照してください。

  1. 例を実行するための DynamoDB テーブルを作成するためのヘルパーメソッドをいくつか定義します。

    table_name = "PythonForeachTest"
    
    def get_dynamodb():
      import boto3
    
      access_key = "<access key>"
      secret_key = "<secret key>"
      region = "<region name>"
      return boto3.resource('dynamodb',
                     aws_access_key_id=access_key,
                     aws_secret_access_key=secret_key,
                     region_name=region)
    
    def createTableIfNotExists():
        '''
        Create a DynamoDB table if it does not exist.
        This must be run on the Spark driver, and not inside foreach.
        '''
        dynamodb = get_dynamodb()
    
        existing_tables = dynamodb.meta.client.list_tables()['TableNames']
        if table_name not in existing_tables:
          print("Creating table %s" % table_name)
          table = dynamodb.create_table(
              TableName=table_name,
              KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
              AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
              ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
          )
    
          print("Waiting for table to be ready")
    
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    
  2. DynamoDB に書き込むクラスとメソッドを定義し、 foreachから呼び出します。 foreachでカスタム ロジックを指定する方法は 2 つあります。

    • 関数を使用する:これは、一度に1行を書き込むために使用できる単純なアプローチです。 ただし、行を書き込むためのクライアント/接続の初期化は、すべての呼び出しで実行されます。

      def sendToDynamoDB_simple(row):
        '''
        Function to send a row to DynamoDB.
        When used with `foreach`, this method is going to be called in the executor
        with the generated output rows.
        '''
        # Create client object in the executor,
        # do not use client objects created in the driver
        dynamodb = get_dynamodb()
      
        dynamodb.Table(table_name).put_item(
            Item = { 'key': str(row['key']), 'count': row['count'] })
      
    • openprocess、および close メソッドを持つクラスを使用する: これにより、クライアント/接続が初期化され、複数の行を書き出すことができる、より効率的な実装が可能になります。

      class SendToDynamoDB_ForeachWriter:
        '''
        Class to send a set of rows to DynamoDB.
        When used with `foreach`, copies of this class is going to be used to write
        multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
        for more details.
        '''
      
        def open(self, partition_id, epoch_id):
          # This is called first when preparing to send multiple rows.
          # Put all the initialization code inside open() so that a fresh
          # copy of this class is initialized in the executor where open()
          # will be called.
          self.dynamodb = get_dynamodb()
          return True
      
        def process(self, row):
          # This is called for each row after open() has been called.
          # This implementation sends one row at a time.
          # For further enhancements, contact the Spark+DynamoDB connector
          # team: https://github.com/audienceproject/spark-dynamodb
          self.dynamodb.Table(table_name).put_item(
              Item = { 'key': str(row['key']), 'count': row['count'] })
      
        def close(self, err):
          # This is called after all the rows have been processed.
          if err:
            raise err
      
  3. 上記の関数またはオブジェクトを使用してストリーミングクエリーで foreach 呼び出します。

    from pyspark.sql.functions import *
    
    spark.conf.set("spark.sql.shuffle.partitions", "1")
    
    query = (
      spark.readStream.format("rate").load()
        .selectExpr("value % 10 as key")
        .groupBy("key")
        .count()
        .toDF("key", "count")
        .writeStream
        .foreach(SendToDynamoDB_ForeachWriter())
        #.foreach(sendToDynamoDB_simple)  // alternative, use one or the other
        .outputMode("update")
        .start()
    )
    

Scala を使用する

この例では、Scala で streamingDataFrame.writeStream.foreach() を使用して DynamoDB に書き込む方法を示します。

これを実行するには、「value」という名前の単一の文字列キーを持つ DynamoDB テーブルを作成する必要があります。

  1. 書き込みを実行する ForeachWriter インターフェイスの実装を定義します。

    import org.apache.spark.sql.{ForeachWriter, Row}
    import com.amazonaws.AmazonServiceException
    import com.amazonaws.auth._
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
    import com.amazonaws.services.dynamodbv2.model.AttributeValue
    import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
    import java.util.ArrayList
    
    import scala.collection.JavaConverters._
    
    class DynamoDbWriter extends ForeachWriter[Row] {
      private val tableName = "<table name>"
      private val accessKey = "<aws access key>"
      private val secretKey = "<aws secret key>"
      private val regionName = "<region>"
    
      // This will lazily be initialized only when open() is called
      lazy val ddb = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
        .withRegion(regionName)
        .build()
    
      //
      // This is called first when preparing to send multiple rows.
      // Put all the initialization code inside open() so that a fresh
      // copy of this class is initialized in the executor where open()
      // will be called.
      //
      def open(partitionId: Long, epochId: Long) = {
        ddb  // force the initialization of the client
        true
      }
    
      //
      // This is called for each row after open() has been called.
      // This implementation sends one row at a time.
      // A more efficient implementation can be to send batches of rows at a time.
      //
      def process(row: Row) = {
        val rowAsMap = row.getValuesMap(row.schema.fieldNames)
        val dynamoItem = rowAsMap.mapValues {
          v: Any => new AttributeValue(v.toString)
        }.asJava
    
        ddb.putItem(tableName, dynamoItem)
      }
    
      //
      // This is called after all the rows have been processed.
      //
      def close(errorOrNull: Throwable) = {
        ddb.shutdown()
      }
    }
    
  2. DynamoDbWriter を使用してレートストリームを DynamoDB に書き込みます。

    spark.readStream
      .format("rate")
      .load()
      .select("value")
      .writeStream
      .foreach(new DynamoDbWriter)
      .start()
    

Amazon CloudTrail ETL

次のノートブックは、Amazon CloudTrail ログを JSON から Parquet に簡単に変換して、効率的なアドホッククエリを行う方法を示しています。 詳細については、「 構造化ストリーミングを使用したリアルタイム ストリーミング ETL 」を参照してください。

ETL of Amazon CloudTrail logs using 構造化ストリーミング Python notebook

ノートブックを新しいタブで開く

ETL of Amazon CloudTrail logs using 構造化ストリーミング Scala notebook

ノートブックを新しいタブで開く

ストリーム-ストリーム結合

これら 2 つのノートブックは、Python と Scala でストリームとストリームの結合を使用する方法を示しています。

ストリーム-ストリームは Python ノートブック に参加します

ノートブックを新しいタブで開く

ストリームストリームはScalaノートブック に参加します

ノートブックを新しいタブで開く