メインコンテンツまでスキップ

Databricks の構造化ストリーミングパターン

これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。

構造化ストリーミングの使用を開始する

構造化ストリーミングを初めて使用する場合は、「 初めての構造化ストリーミングワークロードを実行する」を参照してください。

Python での構造化ストリーミングのシンクとして Cassandra に書き込む

Apache Cassandra は、分散型、低遅延、スケーラブル、高可用性の OLTP データベースです。

構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、 RDD と データフレーム APIsの両方をサポートしており、ストリーミング データの書き込みをネイティブにサポートしています。 大事な 対応するバージョンの spark-Cassandra-connector-assembly を使用する必要があります。

次の例では、 Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペースおよびテーブル名などの接続構成も指定します。

Python
spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()

Python で foreachBatch() を使用して Azure Synapse Analytics に書き込む

streamingDF.writeStream.foreachBatch() 既存のバッチデータライターを再利用して、 ストリーミング クエリの出力を Azure Synapse Analytics に出力します。 詳細については、 foreachBatch のドキュメント を参照してください。

この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics でデータのクエリを実行する」を参照してください。

Python
from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)

Scala と Python の foreach() を使用して Amazon DynamoDB に書き込む

streamingDF.writeStream.foreach() ストリーミング クエリの出力を任意の場所に書き込むことができます。

Pythonを使用する

この例では、Python で streamingDataFrame.writeStream.foreach() を使用して DynamoDB に書き込む方法を示します。 最初のステップでは、DynamoDB Boto リソースを取得します。 この例は access_keysecret_key を使用するように書かれていますが、インスタンスプロファイルを使用すること Databricks 推奨します。 「 チュートリアル: インスタンスプロファイルを使用した S3 アクセスの設定」を参照してください。

  1. 例を実行するための DynamoDB テーブルを作成するためのいくつかのヘルパーメソッドを定義します。

    Python
    table_name = "PythonForeachTest"

    def get_dynamodb():
    import boto3

    access_key = "<access key>"
    secret_key = "<secret key>"
    region = "<region name>"
    return boto3.resource('dynamodb',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    region_name=region)

    def createTableIfNotExists():
    '''
    Create a DynamoDB table if it does not exist.
    This must be run on the Spark driver, and not inside foreach.
    '''
    dynamodb = get_dynamodb()

    existing_tables = dynamodb.meta.client.list_tables()['TableNames']
    if table_name not in existing_tables:
    print("Creating table %s" % table_name)
    table = dynamodb.create_table(
    TableName=table_name,
    KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
    AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
    ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
    )

    print("Waiting for table to be ready")

    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
  2. DynamoDB に書き込むクラスとメソッドを定義し、それらを foreachから呼び出します。 foreachでカスタムロジックを指定するには、2 つの方法があります。

    • 関数を使用する: これは、一度に 1 行を書き込むために使用できる簡単なアプローチです。 ただし、ローを書き込むためのクライアント/接続の初期化は、すべての呼び出しで行われます。

      Python
      def sendToDynamoDB_simple(row):
      '''
      Function to send a row to DynamoDB.
      When used with `foreach`, this method is going to be called in the executor
      with the generated output rows.
      '''
      # Create client object in the executor,
      # do not use client objects created in the driver
      dynamodb = get_dynamodb()

      dynamodb.Table(table_name).put_item(
      Item = { 'key': str(row['key']), 'count': row['count'] })
    • openprocess、および close メソッドを持つクラスを使用する: これにより、クライアント/接続が初期化され、複数の行を書き出すことができる、より効率的な実装が可能になります。

      Python
      class SendToDynamoDB_ForeachWriter:
      '''
      Class to send a set of rows to DynamoDB.
      When used with `foreach`, copies of this class is going to be used to write
      multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
      for more details.
      '''

      def open(self, partition_id, epoch_id):
      # This is called first when preparing to send multiple rows.
      # Put all the initialization code inside open() so that a fresh
      # copy of this class is initialized in the executor where open()
      # will be called.
      self.dynamodb = get_dynamodb()
      return True

      def process(self, row):
      # This is called for each row after open() has been called.
      # This implementation sends one row at a time.
      # For further enhancements, contact the Spark+DynamoDB connector
      # team: https://github.com/audienceproject/spark-dynamodb
      self.dynamodb.Table(table_name).put_item(
      Item = { 'key': str(row['key']), 'count': row['count'] })

      def close(self, err):
      # This is called after all the rows have been processed.
      if err:
      raise err
  3. ストリーミング クエリで foreach を呼び出すには、上記の関数またはオブジェクトを使用します。

    Python
    from pyspark.sql.functions import *

    spark.conf.set("spark.sql.shuffle.partitions", "1")

    query = (
    spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreach(SendToDynamoDB_ForeachWriter())
    #.foreach(sendToDynamoDB_simple) // alternative, use one or the other
    .outputMode("update")
    .start()
    )

Scala を使用する

この例では、Scala で streamingDataFrame.writeStream.foreach() を使用して DynamoDB に書き込む方法を示します。

これを実行するには、「value」という名前の単一の文字列キーを持つDynamoDBテーブルを作成する必要があります。

  1. 書き込みを行う ForeachWriter インターフェースの実装を定義してください。

    Scala
    import org.apache.spark.sql.{ForeachWriter, Row}
    import com.amazonaws.AmazonServiceException
    import com.amazonaws.auth._
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
    import com.amazonaws.services.dynamodbv2.model.AttributeValue
    import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
    import java.util.ArrayList

    import scala.collection.JavaConverters._

    class DynamoDbWriter extends ForeachWriter[Row] {
    private val tableName = "<table name>"
    private val accessKey = "<aws access key>"
    private val secretKey = "<aws secret key>"
    private val regionName = "<region>"

    // This will lazily be initialized only when open() is called
    lazy val ddb = AmazonDynamoDBClientBuilder.standard()
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
    .withRegion(regionName)
    .build()

    //
    // This is called first when preparing to send multiple rows.
    // Put all the initialization code inside open() so that a fresh
    // copy of this class is initialized in the executor where open()
    // will be called.
    //
    def open(partitionId: Long, epochId: Long) = {
    ddb // force the initialization of the client
    true
    }

    //
    // This is called for each row after open() has been called.
    // This implementation sends one row at a time.
    // A more efficient implementation can be to send batches of rows at a time.
    //
    def process(row: Row) = {
    val rowAsMap = row.getValuesMap(row.schema.fieldNames)
    val dynamoItem = rowAsMap.mapValues {
    v: Any => new AttributeValue(v.toString)
    }.asJava

    ddb.putItem(tableName, dynamoItem)
    }

    //
    // This is called after all the rows have been processed.
    //
    def close(errorOrNull: Throwable) = {
    ddb.shutdown()
    }
    }
  2. DynamoDbWriterを使用して、レートストリームを DynamoDB に書き込みます。

    Scala
    spark.readStream
    .format("rate")
    .load()
    .select("value")
    .writeStream
    .foreach(new DynamoDbWriter)
    .start()

ストリームとストリームの結合

これらの 2 つのノートブックは、 Python と Scalaでストリーム-ストリーム結合を使用する方法を示しています。

ストリーム-ストリーム join Python ノートブック

Open notebook in new tab

ストリーム-ストリーム join Scala ノートブック

Open notebook in new tab