padrões estruturados transmitidos em Databricks
Contém Notebook e amostras de código para padrões comuns para trabalhar com transmissão estruturada em Databricks.
Começando a começar com transmissão estruturada
Se você é novo na transmissão estruturada, veja execução sua primeira carga de trabalho da transmissão estruturada.
Escreva para Cassandra como um coletor para transmissão estruturada em Python
O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalável e altamente disponível.
transmissão estructurada funciona com Cassandra através do Spark Cassandra Connector. Esse conector oferece suporte a APIs RDD e DataFrame e possui suporte nativo para gravar dados transmitidos. *Importante* Você deve usar a versão correspondente do spark-Cassandra-connector-assembly.
O exemplo a seguir se conecta a um ou mais hosts em clusters de banco de dados Cassandra. Ele também especifica as configurações de conexão, como a localização do ponto de verificação e o espaço-chave específico e os nomes das tabelas:
spark.conf.set("spark.cassandra.connection.host", "host1,host2")
df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()
Gravar no Azure Synapse analítico usando foreachBatch()
em Python
streamingDF.writeStream.foreachBatch()
permite que você reutilize gravadores de dados de lotes existentes para gravar a saída de uma query de transmissão para o Azure Synapse analítico. Consulte a documentação do foreachBatch para obter detalhes.
Para executar este exemplo, você precisa do conector Azure Synapse Analytics. Para obter detalhes sobre o conector Azure Synapse Analytics, consulte dadosquery em Azure Synapse Analytics.
from pyspark.sql.functions import *
from pyspark.sql import *
def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()
spark.conf.set("spark.sql.shuffle.partitions", "1")
query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)
Gravar no Amazon DynamoDB usando foreach()
em Scala e Python
streamingDF.writeStream.foreach()
permite gravar a saída de uma query de transmissão em locais arbitrários.
Usar Python
Este exemplo mostra como usar streamingDataFrame.writeStream.foreach()
em Python para gravar no DynamoDB. O primeiro passo obtém o recurso DynamoDB Boto . Este exemplo foi escrito para usar access_key
e secret_key
, mas a Databricks recomenda que você use instance profile. Consulte Tutorial: Configurar o acesso ao S3 com um instance profile.
Defina alguns métodos auxiliares para criar a tabela do DynamoDB para executar o exemplo.
table_name = "PythonForeachTest" def get_dynamodb(): import boto3 access_key = "<access key>" secret_key = "<secret key>" region = "<region name>" return boto3.resource('dynamodb', aws_access_key_id=access_key, aws_secret_access_key=secret_key, region_name=region) def createTableIfNotExists(): ''' Create a DynamoDB table if it does not exist. This must be run on the Spark driver, and not inside foreach. ''' dynamodb = get_dynamodb() existing_tables = dynamodb.meta.client.list_tables()['TableNames'] if table_name not in existing_tables: print("Creating table %s" % table_name) table = dynamodb.create_table( TableName=table_name, KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ], AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ], ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 } ) print("Waiting for table to be ready") table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
Defina as classes e métodos que gravam no DynamoDB e chame-os de
foreach
. Há duas maneiras de especificar sua lógica personalizada emforeach
.Use uma função: esta é a abordagem simples que pode ser usada para escrever uma linha por vez. No entanto, a inicialização do cliente/conexão para escrever uma linha será feita em todas as chamadas.
def sendToDynamoDB_simple(row): ''' Function to send a row to DynamoDB. When used with `foreach`, this method is going to be called in the executor with the generated output rows. ''' # Create client object in the executor, # do not use client objects created in the driver dynamodb = get_dynamodb() dynamodb.Table(table_name).put_item( Item = { 'key': str(row['key']), 'count': row['count'] })
Use uma classe com os métodos
open
,process
eclose
: isso permite uma implementação mais eficiente em que um cliente/conexão é inicializado e várias linhas podem ser gravadas.class SendToDynamoDB_ForeachWriter: ''' Class to send a set of rows to DynamoDB. When used with `foreach`, copies of this class is going to be used to write multiple rows in the executor. See the python docs for `DataStreamWriter.foreach` for more details. ''' def open(self, partition_id, epoch_id): # This is called first when preparing to send multiple rows. # Put all the initialization code inside open() so that a fresh # copy of this class is initialized in the executor where open() # will be called. self.dynamodb = get_dynamodb() return True def process(self, row): # This is called for each row after open() has been called. # This implementation sends one row at a time. # For further enhancements, contact the Spark+DynamoDB connector # team: https://github.com/audienceproject/spark-dynamodb self.dynamodb.Table(table_name).put_item( Item = { 'key': str(row['key']), 'count': row['count'] }) def close(self, err): # This is called after all the rows have been processed. if err: raise err
Invoque
foreach
em sua query de transmissão com a função ou objeto acima.from pyspark.sql.functions import * spark.conf.set("spark.sql.shuffle.partitions", "1") query = ( spark.readStream.format("rate").load() .selectExpr("value % 10 as key") .groupBy("key") .count() .toDF("key", "count") .writeStream .foreach(SendToDynamoDB_ForeachWriter()) #.foreach(sendToDynamoDB_simple) // alternative, use one or the other .outputMode("update") .start() )
Usar Scala
Este exemplo mostra como usar streamingDataFrame.writeStream.foreach()
no Scala para gravar no DynamoDB.
Para executar isso, você terá que criar uma tabela do DynamoDB que tenha uma única key strings chamada “value”.
Defina uma implementação da interface
ForeachWriter
que executa a gravação.import org.apache.spark.sql.{ForeachWriter, Row} import com.amazonaws.AmazonServiceException import com.amazonaws.auth._ import com.amazonaws.services.dynamodbv2.AmazonDynamoDB import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder import com.amazonaws.services.dynamodbv2.model.AttributeValue import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException import java.util.ArrayList import scala.collection.JavaConverters._ class DynamoDbWriter extends ForeachWriter[Row] { private val tableName = "<table name>" private val accessKey = "<aws access key>" private val secretKey = "<aws secret key>" private val regionName = "<region>" // This will lazily be initialized only when open() is called lazy val ddb = AmazonDynamoDBClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey))) .withRegion(regionName) .build() // // This is called first when preparing to send multiple rows. // Put all the initialization code inside open() so that a fresh // copy of this class is initialized in the executor where open() // will be called. // def open(partitionId: Long, epochId: Long) = { ddb // force the initialization of the client true } // // This is called for each row after open() has been called. // This implementation sends one row at a time. // A more efficient implementation can be to send batches of rows at a time. // def process(row: Row) = { val rowAsMap = row.getValuesMap(row.schema.fieldNames) val dynamoItem = rowAsMap.mapValues { v: Any => new AttributeValue(v.toString) }.asJava ddb.putItem(tableName, dynamoItem) } // // This is called after all the rows have been processed. // def close(errorOrNull: Throwable) = { ddb.shutdown() } }
Use o
DynamoDbWriter
para gravar uma taxa transmitida no DynamoDB.spark.readStream .format("rate") .load() .select("value") .writeStream .foreach(new DynamoDbWriter) .start()