Pular para o conteúdo principal

transmissão estructurada patterns on Databricks

Contém Notebook e exemplos de código para padrões comuns de trabalho com transmissão estruturada em Databricks.

Como começar com a transmissão estruturada

Se o senhor for novato em transmissão estruturada, veja como executar sua primeira carga de trabalho de transmissão estruturada.

Escreva para Cassandra como um sumidouro para transmissão estruturada em Python

O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalável e altamente disponível.

A transmissão estruturada funciona com o site Cassandra por meio do conectorSpark Cassandra. Esse conector é compatível com RDD e DataFrame APIs, e tem suporte nativo para gravação de dados de transmissão. Importante O senhor deve usar a versão correspondente da montagem do conector de faíscaCassandra.

O exemplo a seguir se conecta a um ou mais hosts em um clustering de banco de dados Cassandra. Ele também especifica configurações de conexão, como a localização do ponto de verificação e os nomes específicos do espaço de teclas e da tabela:

Python
spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
.format("org.apache.spark.sql.cassandra") \
.outputMode("append") \
.option("checkpointLocation", "/path/to/checkpoint") \
.option("keyspace", "keyspace_name") \
.option("table", "table_name") \
.start()

Escreva para o Azure Synapse Analytics usando foreachBatch() em Python

streamingDF.writeStream.foreachBatch() permite que o senhor reutilize os gravadores de dados de lotes existentes para gravar a saída de uma consulta de transmissão em Azure Synapse Analytics. Consulte a documentação do ForEachBatch para obter detalhes.

Para executar este exemplo, o senhor precisa do conector Azure Synapse Analytics. Para obter detalhes sobre o conector do Azure Synapse Analytics, consulte Consultar dados no Azure Synapse Analytics.

Python
from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
df.write \
.format("com.databricks.spark.sqldw") \
.mode('overwrite') \
.option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
.option("forward_spark_azure_storage_credentials", "true") \
.option("dbtable", "my_table_in_dw_copy") \
.option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
.save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
spark.readStream.format("rate").load()
.selectExpr("value % 10 as key")
.groupBy("key")
.count()
.toDF("key", "count")
.writeStream
.foreachBatch(writeToSQLWarehouse)
.outputMode("update")
.start()
)

Escreva no Amazon DynamoDB usando foreach() em Scala e Python

streamingDF.writeStream.foreach() permite que o senhor grave a saída de uma consulta de transmissão em locais arbitrários.

Use Python

Este exemplo mostra como usar streamingDataFrame.writeStream.foreach() em Python para gravar no DynamoDB. A primeira etapa obtém o recurso do DynamoDB Boto. Este exemplo foi escrito para usar access_key e secret_key, mas o site Databricks recomenda que o senhor use o perfil de instância. Veja o tutorial: Configurar o acesso S3 com um instance profile.

  1. Defina alguns métodos auxiliares para criar uma tabela do DynamoDB para executar o exemplo.

    Python
    table_name = "PythonForeachTest"

    def get_dynamodb():
    import boto3

    access_key = "<access key>"
    secret_key = "<secret key>"
    region = "<region name>"
    return boto3.resource('dynamodb',
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
    region_name=region)

    def createTableIfNotExists():
    '''
    Create a DynamoDB table if it does not exist.
    This must be run on the Spark driver, and not inside foreach.
    '''
    dynamodb = get_dynamodb()

    existing_tables = dynamodb.meta.client.list_tables()['TableNames']
    if table_name not in existing_tables:
    print("Creating table %s" % table_name)
    table = dynamodb.create_table(
    TableName=table_name,
    KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
    AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
    ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
    )

    print("Waiting for table to be ready")

    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
  2. Defina as classes e os métodos que gravam no DynamoDB e, em seguida, chame-os a partir de foreach. Há duas maneiras de especificar sua lógica personalizada em foreach.

    • Use uma função: Essa é a abordagem simples que pode ser usada para escrever 1 linha por vez. No entanto, a inicialização do cliente/conexão para escrever uma linha será feita em cada chamada.

      Python
      def sendToDynamoDB_simple(row):
      '''
      Function to send a row to DynamoDB.
      When used with `foreach`, this method is going to be called in the executor
      with the generated output rows.
      '''
      # Create client object in the executor,
      # do not use client objects created in the driver
      dynamodb = get_dynamodb()

      dynamodb.Table(table_name).put_item(
      Item = { 'key': str(row['key']), 'count': row['count'] })
    • Use uma classe com os métodos open, process e close: isso permite uma implementação mais eficiente em que um cliente/conexão é inicializado e várias linhas podem ser gravadas.

      Python
      class SendToDynamoDB_ForeachWriter:
      '''
      Class to send a set of rows to DynamoDB.
      When used with `foreach`, copies of this class is going to be used to write
      multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
      for more details.
      '''

      def open(self, partition_id, epoch_id):
      # This is called first when preparing to send multiple rows.
      # Put all the initialization code inside open() so that a fresh
      # copy of this class is initialized in the executor where open()
      # will be called.
      self.dynamodb = get_dynamodb()
      return True

      def process(self, row):
      # This is called for each row after open() has been called.
      # This implementation sends one row at a time.
      # For further enhancements, contact the Spark+DynamoDB connector
      # team: https://github.com/audienceproject/spark-dynamodb
      self.dynamodb.Table(table_name).put_item(
      Item = { 'key': str(row['key']), 'count': row['count'] })

      def close(self, err):
      # This is called after all the rows have been processed.
      if err:
      raise err
  3. Invoque o site foreach em sua consulta de transmissão com a função ou objeto acima.

    Python
    from pyspark.sql.functions import *

    spark.conf.set("spark.sql.shuffle.partitions", "1")

    query = (
    spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreach(SendToDynamoDB_ForeachWriter())
    #.foreach(sendToDynamoDB_simple) // alternative, use one or the other
    .outputMode("update")
    .start()
    )

Use Scala

Este exemplo mostra como usar streamingDataFrame.writeStream.foreach() em Scala para gravar no DynamoDB.

Para executar isso, o senhor terá de criar uma tabela DynamoDB que tenha uma única cadeia de caracteres key chamada "value".

  1. Defina uma implementação da interface ForeachWriter que executa a gravação.

    Scala
    import org.apache.spark.sql.{ForeachWriter, Row}
    import com.amazonaws.AmazonServiceException
    import com.amazonaws.auth._
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
    import com.amazonaws.services.dynamodbv2.model.AttributeValue
    import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
    import java.util.ArrayList

    import scala.collection.JavaConverters._

    class DynamoDbWriter extends ForeachWriter[Row] {
    private val tableName = "<table name>"
    private val accessKey = "<aws access key>"
    private val secretKey = "<aws secret key>"
    private val regionName = "<region>"

    // This will lazily be initialized only when open() is called
    lazy val ddb = AmazonDynamoDBClientBuilder.standard()
    .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
    .withRegion(regionName)
    .build()

    //
    // This is called first when preparing to send multiple rows.
    // Put all the initialization code inside open() so that a fresh
    // copy of this class is initialized in the executor where open()
    // will be called.
    //
    def open(partitionId: Long, epochId: Long) = {
    ddb // force the initialization of the client
    true
    }

    //
    // This is called for each row after open() has been called.
    // This implementation sends one row at a time.
    // A more efficient implementation can be to send batches of rows at a time.
    //
    def process(row: Row) = {
    val rowAsMap = row.getValuesMap(row.schema.fieldNames)
    val dynamoItem = rowAsMap.mapValues {
    v: Any => new AttributeValue(v.toString)
    }.asJava

    ddb.putItem(tableName, dynamoItem)
    }

    //
    // This is called after all the rows have been processed.
    //
    def close(errorOrNull: Throwable) = {
    ddb.shutdown()
    }
    }
  2. Use o endereço DynamoDbWriter para gravar uma transmissão de taxa no DynamoDB.

    Scala
    spark.readStream
    .format("rate")
    .load()
    .select("value")
    .writeStream
    .foreach(new DynamoDbWriter)
    .start()

transmissão-transmissão join

Estes dois Notebooks mostram como usar a união transmissão-transmissão em Python e Scala.

transmissão-transmissão join Python Notebook

Open notebook in new tab

transmissão-transmissão join Scala Notebook

Open notebook in new tab