Databricks での構造化ストリーミング パターン
これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。
構造化ストリーミングの使用を開始する
構造化ストリーミングを初めて使用する場合は、「 最初の構造化ストリーミングワークロードを実行する」を参照してください。
Pythonでの構造化ストリーミングのシンクとして Cassandra に書き込む
Apache Cassandra は、分散、低待機時間、スケーラブル、高可用性の OLTP データベースです。
構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、RDD と DataFrame APIsの両方をサポートし、ストリーミング データの書き込みをネイティブにサポートしています。 *重要*対応するバージョンの スパークCassandraコネクタアセンブリを使用する必要があります。
次の例では、Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペース名、テーブル名などの接続構成も指定します。
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
foreachBatch()
をPythonで使用したAzure Synapse Analyticsへの書き込み
streamingDF.writeStream.foreachBatch()
を使用すると、既存のバッチ データ ライターを再利用して、ストリーミングの出力を Azure Synapse アナリティクスに書き込むことができます。 詳細については、 foreachBatch のドキュメント を参照してください。
この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics のクエリー データ」を参照してください。
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Scala および Python のforeach()
を使用して Amazon DynamoDB に書き込む
streamingDF.writeStream.foreach()
ストリーミングクエリーの出力を任意の場所に書き込むことができます。
Pythonを使用する
この例では、Python で streamingDataFrame.writeStream.foreach()
を使用して DynamoDB に書き込む方法を示します。 最初のステップでは、DynamoDB Boto リソースを取得します。 この例は、 access_key
と secret_key
を使用するように記述されていますが、Databricks ではインスタンスプロファイルを使用することをお勧めします。 「 チュートリアル: インスタンスプロファイルを使用して S3 アクセスを設定する」を参照してください。
例を実行するための DynamoDB テーブルを作成するためのヘルパーメソッドをいくつか定義します。
table_name = "PythonForeachTest" def get_dynamodb(): import boto3 access_key = "<access key>" secret_key = "<secret key>" region = "<region name>" return boto3.resource('dynamodb', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) def createTableIfNotExists(): ''' Create a DynamoDB table if it does not exist. This must be run on the Spark driver, and not inside foreach. ''' dynamodb = get_dynamodb() existing_tables = dynamodb.meta.client.list_tables()['TableNames'] if table_name not in existing_tables: print("Creating table %s" % table_name) table = dynamodb.create_table( TableName=table_name, KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ], AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ], ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 } ) print("Waiting for table to be ready") table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
DynamoDB に書き込むクラスとメソッドを定義し、
foreach
から呼び出します。foreach
でカスタム ロジックを指定する方法は 2 つあります。関数を使用する:これは、一度に1行を書き込むために使用できる単純なアプローチです。 ただし、行を書き込むためのクライアント/接続の初期化は、すべての呼び出しで実行されます。
def sendToDynamoDB_simple(row): ''' Function to send a row to DynamoDB. When used with `foreach`, this method is going to be called in the executor with the generated output rows. ''' # Create client object in the executor, # do not use client objects created in the driver dynamodb = get_dynamodb() dynamodb.Table(table_name).put_item( Item = { 'key': str(row['key']), 'count': row['count'] })
open
、process
、およびclose
メソッドを持つクラスを使用する: これにより、クライアント/接続が初期化され、複数の行を書き出すことができる、より効率的な実装が可能になります。class SendToDynamoDB_ForeachWriter: ''' Class to send a set of rows to DynamoDB. When used with `foreach`, copies of this class is going to be used to write multiple rows in the executor. See the python docs for `DataStreamWriter.foreach` for more details. ''' def open(self, partition_id, epoch_id): # This is called first when preparing to send multiple rows. # Put all the initialization code inside open() so that a fresh # copy of this class is initialized in the executor where open() # will be called. self.dynamodb = get_dynamodb() return True def process(self, row): # This is called for each row after open() has been called. # This implementation sends one row at a time. # For further enhancements, contact the Spark+DynamoDB connector # team: https://github.com/audienceproject/spark-dynamodb self.dynamodb.Table(table_name).put_item( Item = { 'key': str(row['key']), 'count': row['count'] }) def close(self, err): # This is called after all the rows have been processed. if err: raise err
上記の関数またはオブジェクトを使用してストリーミングクエリーで
foreach
呼び出します。from pyspark.sql.functions import * spark.conf.set("spark.sql.shuffle.partitions", "1") query = ( spark.readStream.format("rate").load() .selectExpr("value % 10 as key") .groupBy("key") .count() .toDF("key", "count") .writeStream .foreach(SendToDynamoDB_ForeachWriter()) #.foreach(sendToDynamoDB_simple) // alternative, use one or the other .outputMode("update") .start() )
Scalaを使用する
この例では、Scala で streamingDataFrame.writeStream.foreach()
を使用して DynamoDB に書き込む方法を示します。
これを実行するには、「value」という名前の単一の文字列キーを持つ DynamoDB テーブルを作成する必要があります。
書き込みを実行する
ForeachWriter
インターフェイスの実装を定義します。import org.apache.spark.sql.{ForeachWriter, Row} import com.amazonaws.AmazonServiceException import com.amazonaws.auth._ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder import com.amazonaws.services.dynamodbv2.model.AttributeValue import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException import java.util.ArrayList import scala.collection.JavaConverters._ class DynamoDbWriter extends ForeachWriter[Row] { private val tableName = "<table name>" private val accessKey = "<aws access key>" private val secretKey = "<aws secret key>" private val regionName = "<region>" // This will lazily be initialized only when open() is called lazy val ddb = AmazonDynamoDBClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey))) .withRegion(regionName) .build() // // This is called first when preparing to send multiple rows. // Put all the initialization code inside open() so that a fresh // copy of this class is initialized in the executor where open() // will be called. // def open(partitionId: Long, epochId: Long) = { ddb // force the initialization of the client true } // // This is called for each row after open() has been called. // This implementation sends one row at a time. // A more efficient implementation can be to send batches of rows at a time. // def process(row: Row) = { val rowAsMap = row.getValuesMap(row.schema.fieldNames) val dynamoItem = rowAsMap.mapValues { v: Any => new AttributeValue(v.toString) }.asJava ddb.putItem(tableName, dynamoItem) } // // This is called after all the rows have been processed. // def close(errorOrNull: Throwable) = { ddb.shutdown() } }
DynamoDbWriter
を使用してレートストリームを DynamoDB に書き込みます。spark.readStream .format("rate") .load() .select("value") .writeStream .foreach(new DynamoDbWriter) .start()