structured-streaming-kinesis-sink(Scala)

import java.nio.ByteBuffer
 
import scala.collection.mutable.ArrayBuffer
 
import com.amazonaws.auth._
import com.amazonaws.services.kinesis._
import com.amazonaws.services.kinesis.model._
 
import org.apache.spark.sql.ForeachWriter
 
/**
 * A simple Sink that writes to the given Amazon Kinesis `stream` in the given `region`. For authentication, users may provide
 * `awsAccessKey` and `awsSecretKey`, or use IAM Roles when launching their cluster.
 *
 * This Sink takes a two-column Dataset, with the columns being the `partitionKey` and the `data`, respectively.
 * We will buffer data up to `maxBufferSize` before flushing to Kinesis in order to reduce cost.
 *
 * If there are failures from the Kinesis PutRecords response, we will retry failed records `maxRetries` times with exponential
 * backoff. We will throw an Exception if there are still failed records after `maxRetries` attempts.
 */
class KinesisSink(
    stream: String,
    region: String,
    awsAccessKey: Option[String] = None,
    awsSecretKey: Option[String] = None) extends ForeachWriter[(String, Array[Byte])] { 
 
  // Configurations
  private val maxBufferSize = 500 * 1024 // 500 KB
  // Max number of retries if a PutRecords request partially fails. Failed records are retried with exponential backoff.
  private val maxRetries = 3
  private val retryIntervalMs = 100
  
  private var client: AmazonKinesis = _
  private val buffer = new ArrayBuffer[PutRecordsRequestEntry]()
  private var bufferSize: Long = 0L
 
  override def open(partitionId: Long, version: Long): Boolean = {
    client = createClient
    true
  }
 
  override def process(value: (String, Array[Byte])): Unit = {
    val (partitionKey, data) = value
    // A single `putRecords` request can carry at most 500 records, so flush first if adding this record would exceed the record cap or the byte budget
    if ((data.length + bufferSize > maxBufferSize && buffer.nonEmpty) || buffer.length == 500) {
      flush()
    }
    buffer += new PutRecordsRequestEntry().withPartitionKey(partitionKey).withData(ByteBuffer.wrap(data))
    bufferSize += data.length
  }
 
  override def close(errorOrNull: Throwable): Unit = {
    try {
      if (buffer.nonEmpty) {
        flush()
      }
    } finally {
      // Shut down the client even if the final flush fails
      client.shutdown()
    }
  }
  
  /** Flush the buffer to Kinesis */
  private def flush(): Unit = {
    val recordRequest = new PutRecordsRequest()
      .withStreamName(stream)
      .withRecords(buffer: _*)
    
    var putRecordsResult = client.putRecords(recordRequest)
    
    var retry = 0
    var retryBuffer = buffer
    while (putRecordsResult.getFailedRecordCount > 0 && retry < maxRetries) {
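      // Exponential backoff: wait retryIntervalMs * 2^retry ms, i.e. 100 ms, 200 ms, 400 ms, ...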
      val retryWaitTimeMs = math.pow(2, retry).toLong * retryIntervalMs
      println(s"Waiting for $retryWaitTimeMs ms before retrying put records into Kinesis.")
      Thread.sleep(retryWaitTimeMs)
      
      println(s"Retry ${putRecordsResult.getFailedRecordCount} failed records. Num of retry = ${retry+1}")
      val failedRecordsBuffer = new ArrayBuffer[PutRecordsRequestEntry]()
      val putRecordsResultEntryList = putRecordsResult.getRecords
      for (i <- 0 until putRecordsResultEntryList.size()) {
        if (putRecordsResultEntryList.get(i).getErrorCode != null) {
          failedRecordsBuffer += retryBuffer(i)
        }
      }
      retryBuffer = failedRecordsBuffer
      val recordRetryRequest = new PutRecordsRequest()
        .withStreamName(stream)
        .withRecords(retryBuffer: _*)
      putRecordsResult = client.putRecords(recordRetryRequest)
      retry += 1
    }
    
    if (putRecordsResult.getFailedRecordCount > 0) {
      throw new Exception(s"Failed to put ${putRecordsResult.getFailedRecordCount} records into Kinesis stream ${stream} after $maxRetries retries.")
    }
    
    buffer.clear()
    bufferSize = 0
  }
  
  /** Create a Kinesis client. */
  private def createClient: AmazonKinesis = {
    val cli = if (awsAccessKey.isEmpty || awsSecretKey.isEmpty) {
      // Role-based access
      AmazonKinesisClientBuilder.standard()
        .withRegion(region)
        .build()
    } else {
      // Key-based access
      AmazonKinesisClientBuilder.standard()
        .withRegion(region)
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsAccessKey.get, awsSecretKey.get)))
        .build()
    }
    cli
  }
}
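
A minimal usage sketch follows. All names here are illustrative assumptions: the SparkSession `spark`, the Kafka broker `host:9092`, the topic `events`, and the stream/region passed to the sink. The only requirement is that the sink receives a Dataset[(String, Array[Byte])] of (partitionKey, data) pairs.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Placeholder stream and region; pass awsAccessKey/awsSecretKey for key-based access
val kinesisSink = new KinesisSink("my-stream", "us-east-1")

val query = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host:9092") // placeholder broker
  .option("subscribe", "events")                  // placeholder topic
  .load()
  .selectExpr("CAST(key AS STRING)", "value")     // assumes non-null keys; yields (partitionKey, data)
  .as[(String, Array[Byte])]
  .writeStream
  .foreach(kinesisSink)
  .start()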