import java.nio.ByteBuffer
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import com.amazonaws.auth._
import com.amazonaws.regions.Regions
import com.amazonaws.services.kinesis._
import com.amazonaws.services.kinesis.model._
import org.apache.spark.sql.ForeachWriter
/**
 * A simple Sink that writes to the given Amazon Kinesis `stream` in the given `region`. For authentication, users may
 * provide an `awsAccessKey` and `awsSecretKey`, or use IAM roles when launching their cluster.
 *
 * This Sink takes a two-column Dataset, where the columns are the `partitionKey` and the `data`, respectively.
 * We buffer data up to `maxBufferSize` bytes before flushing to Kinesis in order to reduce cost.
 *
 * If the Kinesis PutRecords response reports failures, we retry the failed records up to `maxRetries` times with
 * exponential backoff, and throw an Exception if there are still failed records after `maxRetries` attempts.
 *
 * A usage sketch follows the class definition.
 */
class KinesisSink(
    stream: String,
    region: String,
    awsAccessKey: Option[String] = None,
    awsSecretKey: Option[String] = None) extends ForeachWriter[(String, Array[Byte])] {
  // Configurations
  private val maxBufferSize = 500 * 1024 // 500 KB
  // Maximum number of retries if putting records into Kinesis partially fails.
  // Failed records are retried with exponential backoff.
  private val maxRetries = 3
  private val retryIntervalMs = 100
  private var client: AmazonKinesis = _
  private val buffer = new ArrayBuffer[PutRecordsRequestEntry]()
  private var bufferSize: Long = 0L
  override def open(partitionId: Long, version: Long): Boolean = {
    client = createClient
    true
  }
  override def process(value: (String, Array[Byte])): Unit = {
    val (partitionKey, data) = value
    // A maximum of 500 records can be sent in a single `putRecords` request, so flush when the
    // buffer is full by count, or when adding this record would exceed `maxBufferSize`.
    if ((data.length + bufferSize > maxBufferSize && buffer.nonEmpty) || buffer.length == 500) {
      flush()
    }
    buffer += new PutRecordsRequestEntry().withPartitionKey(partitionKey).withData(ByteBuffer.wrap(data))
    bufferSize += data.length
  }
  override def close(errorOrNull: Throwable): Unit = {
    if (buffer.nonEmpty) {
      flush()
    }
    client.shutdown()
  }
  /** Flush the buffer to Kinesis. */
  private def flush(): Unit = {
    val recordRequest = new PutRecordsRequest()
      .withStreamName(stream)
      .withRecords(buffer: _*)
    var putRecordsResult = client.putRecords(recordRequest)
    var retry = 0
    var retryBuffer = buffer
    while (putRecordsResult.getFailedRecordCount > 0 && retry < maxRetries) {
      // Exponential backoff: wait retryIntervalMs * 2^retry ms (100 ms, 200 ms, 400 ms with the defaults).
      val retryWaitTimeMs = math.pow(2, retry).toLong * retryIntervalMs
      println(s"Waiting for $retryWaitTimeMs ms before retrying put records into Kinesis.")
      Thread.sleep(retryWaitTimeMs)
      println(s"Retrying ${putRecordsResult.getFailedRecordCount} failed records. Attempt ${retry + 1} of $maxRetries.")
      // The i-th entry of the PutRecords result corresponds to the i-th record of the request,
      // so collect only the records whose result entries carry an error code and retry those.
      val failedRecordsBuffer = new ArrayBuffer[PutRecordsRequestEntry]()
      val putRecordsResultEntryList = putRecordsResult.getRecords
      for (i <- 0 until putRecordsResultEntryList.size()) {
        if (putRecordsResultEntryList.get(i).getErrorCode != null) {
          failedRecordsBuffer += retryBuffer(i)
        }
      }
      retryBuffer = failedRecordsBuffer
      val recordRetryRequest = new PutRecordsRequest()
        .withStreamName(stream)
        .withRecords(retryBuffer: _*)
      putRecordsResult = client.putRecords(recordRetryRequest)
      retry += 1
    }
    if (putRecordsResult.getFailedRecordCount > 0) {
      throw new Exception(s"Failed to put ${putRecordsResult.getFailedRecordCount} records into Kinesis stream $stream.")
    }
    buffer.clear()
    bufferSize = 0
  }
  /** Create a Kinesis client. */
  private def createClient: AmazonKinesis = {
    val cli = if (awsAccessKey.isEmpty || awsSecretKey.isEmpty) {
      // Role-based access
      AmazonKinesisClientBuilder.standard()
        .withRegion(region)
        .build()
    } else {
      // Key-based access
      AmazonKinesisClientBuilder.standard()
        .withRegion(region)
        .withCredentials(new AWSStaticCredentialsProvider(new BasicAWSCredentials(awsAccessKey.get, awsSecretKey.get)))
        .build()
    }
    cli
  }
}
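// A minimal usage sketch of how this sink might be wired into a Structured Streaming query. It assumes
// a SparkSession named `spark` and a streaming Dataset `inputStream` with string columns `partitionKey`
// and `data`; the stream name, region, and checkpoint path below are hypothetical placeholders.
import org.apache.spark.sql.streaming.Trigger
import spark.implicits._

val kinesisSink = new KinesisSink(
  stream = "my-kinesis-stream",  // hypothetical stream name
  region = "us-east-1")          // hypothetical region; omit the keys to use IAM role-based access

val query = inputStream
  .select($"partitionKey", $"data")
  .as[(String, String)]
  .map { case (key, value) => (key, value.getBytes("UTF-8")) }  // payload must be Array[Byte]
  .writeStream
  .foreach(kinesisSink)
  .outputMode("update")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .option("checkpointLocation", "/tmp/kinesis-sink-checkpoint")  // hypothetical checkpoint path
  .start()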