padrões estruturados transmitidos em Databricks

Contém Notebook e amostras de código para padrões comuns para trabalhar com transmissão estruturada em Databricks.

Começando a começar com transmissão estruturada

Se você é novo na transmissão estruturada, veja execução sua primeira carga de trabalho da transmissão estruturada.

Escreva para Cassandra como um coletor para transmissão estruturada em Python

O Apache Cassandra é um banco de dados OLTP distribuído, de baixa latência, escalável e altamente disponível.

transmissão estructurada funciona com Cassandra através do Spark Cassandra Connector. Esse conector oferece suporte a APIs RDD e DataFrame e possui suporte nativo para gravar dados transmitidos. *Importante* Você deve usar a versão correspondente do spark-Cassandra-connector-assembly.

O exemplo a seguir se conecta a um ou mais hosts em clusters de banco de dados Cassandra. Ele também especifica as configurações de conexão, como a localização do ponto de verificação e o espaço-chave específico e os nomes das tabelas:

spark.conf.set("spark.cassandra.connection.host", "host1,host2")

df.writeStream \
  .format("org.apache.spark.sql.cassandra") \
  .outputMode("append") \
  .option("checkpointLocation", "/path/to/checkpoint") \
  .option("keyspace", "keyspace_name") \
  .option("table", "table_name") \
  .start()

Gravar no Azure Synapse analítico usando foreachBatch() em Python

streamingDF.writeStream.foreachBatch() permite que você reutilize gravadores de dados de lotes existentes para gravar a saída de uma query de transmissão para o Azure Synapse analítico. Consulte a documentação do foreachBatch para obter detalhes.

Para executar este exemplo, você precisa do conector Azure Synapse Analytics. Para obter detalhes sobre o conector Azure Synapse Analytics, consulte dadosquery em Azure Synapse Analytics.

from pyspark.sql.functions import *
from pyspark.sql import *

def writeToSQLWarehouse(df, epochId):
  df.write \
    .format("com.databricks.spark.sqldw") \
    .mode('overwrite') \
    .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>") \
    .option("forward_spark_azure_storage_credentials", "true") \
    .option("dbtable", "my_table_in_dw_copy") \
    .option("tempdir", "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>") \
    .save()

spark.conf.set("spark.sql.shuffle.partitions", "1")

query = (
  spark.readStream.format("rate").load()
    .selectExpr("value % 10 as key")
    .groupBy("key")
    .count()
    .toDF("key", "count")
    .writeStream
    .foreachBatch(writeToSQLWarehouse)
    .outputMode("update")
    .start()
    )

Gravar no Amazon DynamoDB usando foreach() em Scala e Python

streamingDF.writeStream.foreach() permite gravar a saída de uma query de transmissão em locais arbitrários.

Usar Python

Este exemplo mostra como usar streamingDataFrame.writeStream.foreach() em Python para gravar no DynamoDB. O primeiro passo obtém o recurso DynamoDB Boto . Este exemplo foi escrito para usar access_key e secret_key, mas a Databricks recomenda que você use instance profile. Consulte Tutorial: Configurar o acesso ao S3 com um instance profile.

  1. Defina alguns métodos auxiliares para criar a tabela do DynamoDB para executar o exemplo.

    table_name = "PythonForeachTest"
    
    def get_dynamodb():
      import boto3
    
      access_key = "<access key>"
      secret_key = "<secret key>"
      region = "<region name>"
      return boto3.resource('dynamodb',
                     aws_access_key_id=access_key,
                     aws_secret_access_key=secret_key,
                     region_name=region)
    
    def createTableIfNotExists():
        '''
        Create a DynamoDB table if it does not exist.
        This must be run on the Spark driver, and not inside foreach.
        '''
        dynamodb = get_dynamodb()
    
        existing_tables = dynamodb.meta.client.list_tables()['TableNames']
        if table_name not in existing_tables:
          print("Creating table %s" % table_name)
          table = dynamodb.create_table(
              TableName=table_name,
              KeySchema=[ { 'AttributeName': 'key', 'KeyType': 'HASH' } ],
              AttributeDefinitions=[ { 'AttributeName': 'key', 'AttributeType': 'S' } ],
              ProvisionedThroughput = { 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 }
          )
    
          print("Waiting for table to be ready")
    
    table.meta.client.get_waiter('table_exists').wait(TableName=table_name)
    
  2. Defina as classes e métodos que gravam no DynamoDB e chame-os de foreach. Há duas maneiras de especificar sua lógica personalizada em foreach.

    • Use uma função: esta é a abordagem simples que pode ser usada para escrever uma linha por vez. No entanto, a inicialização do cliente/conexão para escrever uma linha será feita em todas as chamadas.

      def sendToDynamoDB_simple(row):
        '''
        Function to send a row to DynamoDB.
        When used with `foreach`, this method is going to be called in the executor
        with the generated output rows.
        '''
        # Create client object in the executor,
        # do not use client objects created in the driver
        dynamodb = get_dynamodb()
      
        dynamodb.Table(table_name).put_item(
            Item = { 'key': str(row['key']), 'count': row['count'] })
      
    • Use uma classe com os métodos open, process e close : isso permite uma implementação mais eficiente em que um cliente/conexão é inicializado e várias linhas podem ser gravadas.

      class SendToDynamoDB_ForeachWriter:
        '''
        Class to send a set of rows to DynamoDB.
        When used with `foreach`, copies of this class is going to be used to write
        multiple rows in the executor. See the python docs for `DataStreamWriter.foreach`
        for more details.
        '''
      
        def open(self, partition_id, epoch_id):
          # This is called first when preparing to send multiple rows.
          # Put all the initialization code inside open() so that a fresh
          # copy of this class is initialized in the executor where open()
          # will be called.
          self.dynamodb = get_dynamodb()
          return True
      
        def process(self, row):
          # This is called for each row after open() has been called.
          # This implementation sends one row at a time.
          # For further enhancements, contact the Spark+DynamoDB connector
          # team: https://github.com/audienceproject/spark-dynamodb
          self.dynamodb.Table(table_name).put_item(
              Item = { 'key': str(row['key']), 'count': row['count'] })
      
        def close(self, err):
          # This is called after all the rows have been processed.
          if err:
            raise err
      
  3. Invoque foreach em sua query de transmissão com a função ou objeto acima.

    from pyspark.sql.functions import *
    
    spark.conf.set("spark.sql.shuffle.partitions", "1")
    
    query = (
      spark.readStream.format("rate").load()
        .selectExpr("value % 10 as key")
        .groupBy("key")
        .count()
        .toDF("key", "count")
        .writeStream
        .foreach(SendToDynamoDB_ForeachWriter())
        #.foreach(sendToDynamoDB_simple)  // alternative, use one or the other
        .outputMode("update")
        .start()
    )
    

Usar Scala

Este exemplo mostra como usar streamingDataFrame.writeStream.foreach() no Scala para gravar no DynamoDB.

Para executar isso, você terá que criar uma tabela do DynamoDB que tenha uma única key strings chamada “value”.

  1. Defina uma implementação da interface ForeachWriter que executa a gravação.

    import org.apache.spark.sql.{ForeachWriter, Row}
    import com.amazonaws.AmazonServiceException
    import com.amazonaws.auth._
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDB
    import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
    import com.amazonaws.services.dynamodbv2.model.AttributeValue
    import com.amazonaws.services.dynamodbv2.model.ResourceNotFoundException
    import java.util.ArrayList
    
    import scala.collection.JavaConverters._
    
    class DynamoDbWriter extends ForeachWriter[Row] {
      private val tableName = "<table name>"
      private val accessKey = "<aws access key>"
      private val secretKey = "<aws secret key>"
      private val regionName = "<region>"
    
      // This will lazily be initialized only when open() is called
      lazy val ddb = AmazonDynamoDBClientBuilder.standard()
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(accessKey, secretKey)))
        .withRegion(regionName)
        .build()
    
      //
      // This is called first when preparing to send multiple rows.
      // Put all the initialization code inside open() so that a fresh
      // copy of this class is initialized in the executor where open()
      // will be called.
      //
      def open(partitionId: Long, epochId: Long) = {
        ddb  // force the initialization of the client
        true
      }
    
      //
      // This is called for each row after open() has been called.
      // This implementation sends one row at a time.
      // A more efficient implementation can be to send batches of rows at a time.
      //
      def process(row: Row) = {
        val rowAsMap = row.getValuesMap(row.schema.fieldNames)
        val dynamoItem = rowAsMap.mapValues {
          v: Any => new AttributeValue(v.toString)
        }.asJava
    
        ddb.putItem(tableName, dynamoItem)
      }
    
      //
      // This is called after all the rows have been processed.
      //
      def close(errorOrNull: Throwable) = {
        ddb.shutdown()
      }
    }
    
  2. Use o DynamoDbWriter para gravar uma taxa transmitida no DynamoDB.

    spark.readStream
      .format("rate")
      .load()
      .select("value")
      .writeStream
      .foreach(new DynamoDbWriter)
      .start()
    

transmissão-transmissão junta

Esses dois Notebook mostram como usar join de transmissão-transmissão em Python e Scala.

transmissão-transmissão junta-se ao Python Notebook

Abra o bloco de anotações em outra guia

transmissão-transmissão junta-se ao NotebookScala

Abra o bloco de anotações em outra guia