transmissão estructurada patterns on Databricks
Contém Notebook e exemplos de código para padrões comuns de trabalho com transmissão estruturada em Databricks.
Como começar com a transmissão estruturada
Se o senhor for novato em transmissão estruturada, veja como executar sua primeira carga de trabalho de transmissão estruturada.
Escreva para Cassandra como um sumidouro para transmissão estruturada em Python
O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalável e altamente disponível.
A transmissão estruturada funciona com o site Cassandra por meio do conectorSpark Cassandra. Esse conector é compatível com RDD e DataFrame APIs, e tem suporte nativo para gravação de dados de transmissão. Importante O senhor deve usar a versão correspondente da montagem do conector de faíscaCassandra.
O exemplo a seguir se conecta a um ou mais hosts em um clustering de banco de dados Cassandra. Ele também especifica configurações de conexão, como a localização do ponto de verificação e os nomes específicos do espaço de teclas e da tabela:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Escreva para o Azure Synapse Analytics usando foreachBatch()
em Python
streamingDF.writeStream.foreachBatch()
permite que o senhor reutilize os gravadores de dados de lotes existentes para gravar a saída de uma consulta de transmissão em Azure Synapse Analytics. Consulte a documentação do ForEachBatch para obter detalhes.
Para executar este exemplo, o senhor precisa do conector Azure Synapse Analytics. Para obter detalhes sobre o conector do Azure Synapse Analytics, consulte Consultar dados no Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Escreva no Amazon DynamoDB usando foreach()
em Scala e Python
streamingDF.writeStream.foreach()
permite que o senhor grave a saída de uma consulta de transmissão em locais arbitrários.
Use Python
Este exemplo mostra como usar streamingDataFrame.writeStream.foreach()
em Python para gravar no DynamoDB. A primeira etapa obtém o recurso do DynamoDB Boto. Este exemplo foi escrito para usar access_key
e secret_key
, mas o site Databricks recomenda que o senhor use o perfil de instância. Veja o tutorial: Configurar o acesso S3 com um instance profile.
-
Defina alguns métodos auxiliares para criar uma tabela do DynamoDB para executar o exemplo.
Pythontable_name = "PythonForeachTest"
def get_dynamodb():
import boto3
access_key = "<access key>"
secret_key = "<secret key>"
region = "<region name>"
return boto3.resource('dynamodb',
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region)
def createTableIfNotExists():
'''
Create a DynamoDB table if it does not exist.
This must be run on the Spark driver, and not inside foreach.
'''
dynamodb = get_dynamodb()
existing_tables = dynamodb.meta.client.list_tables()['TableNames']
if table_name not in existing_tables:
print("Creating table %s" % table_name)
table = dynamodb.create_table(
TableName=table_name,
KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
)
print("Waiting for table to be ready")
table.meta.client.get_waiter('table_exists').wait(TableName=table_name) -
Defina as classes e os métodos que gravam no DynamoDB e, em seguida, chame-os a partir de
foreach
. Há duas maneiras de especificar sua lógica personalizada emforeach
.-
Use uma função: Essa é a abordagem simples que pode ser usada para escrever 1 linha por vez. No entanto, a inicialização do cliente/conexão para escrever uma linha será feita em cada chamada.
Pythondef sendToDynamoDB_simple(row):
'''
Function to send a row to DynamoDB.
When used with `foreach`, this method is going to be called in the executor
with the generated output rows.
'''
# Create client object in the executor,
# do not use client objects created in the driver
dynamodb = get_dynamodb()
dynamodb.Table(table_name).put_item(
Item = { 'key': str(row['key']), 'count': row['count'] }) -
Use uma classe com os métodos
open
,process
eclose
: isso permite uma implementação mais eficiente em que um cliente/conexão é inicializado e várias linhas podem ser gravadas.Pythonclass SendToDynamoDB_ForeachWriter:
'''
Class to send a set of rows to DynamoDB.
When used with `foreach`, copies of this class is going to be used to write
multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
for more details.
'''
def open(self, partition_id, epoch_id):
# This is called first when preparing to send multiple rows.
# Put all the initialization code inside open() so that a fresh
# copy of this class is initialized in the executor where open()
# will be called.
self.dynamodb = get_dynamodb()
return True
def process(self, row):
# This is called for each row after open() has been called.
# This implementation sends one row at a time.
# For further enhancements, contact the Spark+DynamoDB connector
# team: https://github.com/audienceproject/spark-dynamodb
self.dynamodb.Table(table_name).put_item(
Item = { 'key': str(row['key']), 'count': row['count'] })
def close(self, err):
# This is called after all the rows have been processed.
if err:
raise err
-
-
Invoque o site
foreach
em sua consulta de transmissão com a função ou objeto acima.Pythonfrom pyspark.sql.functions import *
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreach(SendToDynamoDB_ForeachWriter())
#.foreach(sendToDynamoDB_simple) // alternative, use one or the other
.outputMode("update")
.start()
)
Use Scala
Este exemplo mostra como usar streamingDataFrame.writeStream.foreach()
em Scala para gravar no DynamoDB.
Para executar isso, o senhor terá de criar uma tabela DynamoDB que tenha uma única cadeia de caracteres key chamada "value".
-
Defina uma implementação da interface
ForeachWriter
que executa a gravação.Scalaimport org.apache.spark.sql.{ForeachWriter, Row}
import com.amazonaws.AmazonServiceException
import com.amazonaws.auth._
import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.AttributeValue
import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
import java.util.ArrayList
import scala.collection.JavaConverters._
class DynamoDbWriter extends ForeachWriter[Row] {
private val tableName = "<table name>"
private val accessKey = "<aws access key>"
private val secretKey = "<aws secret key>"
private val regionName = "<region>"
// This will lazily be initialized only when open() is called
lazy val ddb = AmazonDynamoDBClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
.withRegion(regionName)
.build()
//
// This is called first when preparing to send multiple rows.
// Put all the initialization code inside open() so that a fresh
// copy of this class is initialized in the executor where open()
// will be called.
//
def open(partitionId: Long, epochId: Long) = {
ddb // force the initialization of the client
true
}
//
// This is called for each row after open() has been called.
// This implementation sends one row at a time.
// A more efficient implementation can be to send batches of rows at a time.
//
def process(row: Row) = {
val rowAsMap = row.getValuesMap(row.schema.fieldNames)
val dynamoItem = rowAsMap.mapValues {
v: Any => new AttributeValue(v.toString)
}.asJava
ddb.putItem(tableName, dynamoItem)
}
//
// This is called after all the rows have been processed.
//
def close(errorOrNull: Throwable) = {
ddb.shutdown()
}
} -
Use o endereço
DynamoDbWriter
para gravar uma transmissão de taxa no DynamoDB.Scalaspark.readStream
.format("rate")
.load()
.select("value")
.writeStream
.foreach(new DynamoDbWriter)
.start()
transmissão-transmissão join
Estes dois Notebooks mostram como usar a união transmissão-transmissão em Python e Scala.