Databricks の構造化ストリーミングパターン
これには、Databricks で構造化ストリーミングを操作するための一般的なパターンのノートブックとコード サンプルが含まれています。
構造化ストリーミングの使用を開始する
構造化ストリーミングを初めて使用する場合は、「 初めての構造化ストリーミングワークロードを実行する」を参照してください。
Python での構造化ストリーミングのシンクとして Cassandra に書き込む
Apache Cassandra は、分散型、低遅延、スケーラブル、高可用性の OLTP データベースです。
構造化ストリーミングは、 Spark Cassandra コネクタを介して Cassandra と連携します。 このコネクタは、 RDD と データフレーム APIsの両方をサポートしており、ストリーミング データの書き込みをネイティブにサポートしています。 大事な 対応するバージョンの spark-Cassandra-connector-assembly を使用する必要があります。
次の例では、 Cassandra データベース クラスター内の 1 つ以上のホストに接続します。 また、チェックポイントの場所や特定のキースペースおよびテーブル名などの接続構成も指定します。
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Python で foreachBatch()
を使用して Azure Synapse Analytics に書き込む
streamingDF.writeStream.foreachBatch()
既存のバッチデータライターを再利用して、
ストリーミング クエリの出力を Azure Synapse Analytics に出力します。 詳細については、 foreachBatch のドキュメント を参照してください。
この例を実行するには、Azure Synapse Analytics コネクタが必要です。 Azure Synapse Analytics コネクタの詳細については、「 Azure Synapse Analytics でデータのクエリを実行する」を参照してください。
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Scala と Python の foreach()
を使用して Amazon DynamoDB に書き込む
streamingDF.writeStream.foreach()
ストリーミング クエリの出力を任意の場所に書き込むことができます。
Pythonを使用する
この例では、Python で streamingDataFrame.writeStream.foreach()
を使用して DynamoDB に書き込む方法を示します。 最初のステップでは、DynamoDB Boto リソースを取得します。 この例は access_key
と secret_key
を使用するように書かれていますが、インスタンスプロファイルを使用すること Databricks 推奨します。 「 チュートリアル: インスタンスプロファイルを使用した S3 アクセスの設定」を参照してください。
-
例を実行するための DynamoDB テーブルを作成するためのいくつかのヘルパーメソッドを定義します。
Pythontable_name = "PythonForeachTest"
def get_dynamodb():
import boto3
access_key = "<access key>"
secret_key = "<secret key>"
region = "<region name>"
return boto3.resource('dynamodb',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region)
def createTableIfNotExists():
'''
Create a DynamoDB table if it does not exist.
This must be run on the Spark driver, and not inside foreach.
'''
dynamodb = get_dynamodb()
existing_tables = dynamodb.meta.client.list_tables()['TableNames']
if table_name not in existing_tables:
print("Creating table %s" % table_name)
table = dynamodb.create_table(
TableName=table_name,
KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
)
print("Waiting for table to be ready")
table.meta.client.get_waiter('table_exists').wait(TableName=table_name) -
DynamoDB に書き込むクラスとメソッドを定義し、それらを
foreach
から呼び出します。foreach
でカスタムロジックを指定するには、2 つの方法があります。-
関数を使用する: これは、一度に 1 行を書き込むために使用できる簡単なアプローチです。 ただし、ローを書き込むためのクライアント/接続の初期化は、すべての呼び出しで行われます。
Pythondef sendToDynamoDB_simple(row):
'''
Function to send a row to DynamoDB.
When used with `foreach`, this method is going to be called in the executor
with the generated output rows.
'''
# Create client object in the executor,
# do not use client objects created in the driver
dynamodb = get_dynamodb()
dynamodb.Table(table_name).put_item(
Item = { 'key': str(row['key']), 'count': row['count'] }) -
open
、process
、およびclose
メソッドを持つクラスを使用する: これにより、クライアント/接続が初期化され、複数の行を書き出すことができる、より効率的な実装が可能になります。Pythonclass SendToDynamoDB_ForeachWriter:
'''
Class to send a set of rows to DynamoDB.
When used with `foreach`, copies of this class is going to be used to write
multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
for more details.
'''
def open(self, partition_id, epoch_id):
# This is called first when preparing to send multiple rows.
# Put all the initialization code inside open() so that a fresh
# copy of this class is initialized in the executor where open()
# will be called.
self.dynamodb = get_dynamodb()
return True
def process(self, row):
# This is called for each row after open() has been called.
# This implementation sends one row at a time.
# For further enhancements, contact the Spark+DynamoDB connector
# team: https://github.com/audienceproject/spark-dynamodb
self.dynamodb.Table(table_name).put_item(
Item = { 'key': str(row['key']), 'count': row['count'] })
def close(self, err):
# This is called after all the rows have been processed.
if err:
raise err
-
-
ストリーミング クエリで
foreach
を呼び出すには、上記の関数またはオブジェクトを使用します。Pythonfrom pyspark.sql.functions import *
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreach(SendToDynamoDB_ForeachWriter())
#.foreach(sendToDynamoDB_simple) // alternative, use one or the other
.outputMode("update")
.start()
)
Scala を使用する
この例では、Scala で streamingDataFrame.writeStream.foreach()
を使用して DynamoDB に書き込む方法を示します。
これを実行するには、「value」という名前の単一の文字列キーを持つDynamoDBテーブルを作成する必要があります。
-
書き込みを行う
ForeachWriter
インターフェースの実装を定義してください。Scalaimport org.apache.spark.sql.{ForeachWriter, Row}
import com.amazonaws.AmazonServiceException
import com.amazonaws.auth._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
import java.util.ArrayList
import scala.collection.JavaConverters._
class DynamoDbWriter extends ForeachWriter[Row] {
private val tableName = "<table name>"
private val accessKey = "<aws access key>"
private val secretKey = "<aws secret key>"
private val regionName = "<region>"
// This will lazily be initialized only when open() is called
lazy val ddb = AmazonDynamoDBClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
.withRegion(regionName)
.build()
//
// This is called first when preparing to send multiple rows.
// Put all the initialization code inside open() so that a fresh
// copy of this class is initialized in the executor where open()
// will be called.
//
def open(partitionId: Long, epochId: Long) = {
ddb // force the initialization of the client
true
}
//
// This is called for each row after open() has been called.
// This implementation sends one row at a time.
// A more efficient implementation can be to send batches of rows at a time.
//
def process(row: Row) = {
val rowAsMap = row.getValuesMap(row.schema.fieldNames)
val dynamoItem = rowAsMap.mapValues {
v: Any => new AttributeValue(v.toString)
}.asJava
ddb.putItem(tableName, dynamoItem)
}
//
// This is called after all the rows have been processed.
//
def close(errorOrNull: Throwable) = {
ddb.shutdown()
}
} -
DynamoDbWriter
を使用して、レートストリームを DynamoDB に書き込みます。Scalaspark.readStream
.format("rate")
.load()
.select("value")
.writeStream
.foreach(new DynamoDbWriter)
.start()
ストリームとストリームの結合
これらの 2 つのノートブックは、 Python と Scalaでストリーム-ストリーム結合を使用する方法を示しています。