Avro benchmark (Scala)
import java.sql.Date
import java.util

import scala.collection.JavaConverters._
import scala.math.BigDecimal.RoundingMode
import scala.util.Random

import com.google.common.io.Files

import org.apache.spark.sql._
import org.apache.spark.sql.types._

val tempDir = Files.createTempDir()
val avroDir = tempDir + "/avro"
val numberOfRows = 1000000
val defaultSize = 100 // Size of generated items such as strings, arrays, and maps

val testSchema = StructType(Seq(
  StructField("StringField", StringType, false),
  StructField("IntField", IntegerType, true),
  StructField("dateField", DateType, true),
  StructField("DoubleField", DoubleType, false),
  StructField("DecimalField", DecimalType(10, 10), true),
  StructField("ArrayField", ArrayType(BooleanType), false),
  StructField("MapField", MapType(StringType, IntegerType), true),
  StructField("StructField", StructType(Seq(StructField("id", IntegerType, true))), false)))

/**
 * This function generates a random map(string, int) of a given size.
 */
private def generateRandomMap(rand: Random, size: Int): java.util.Map[String, Int] = {
  val jMap = new util.HashMap[String, Int]()
  for (i <- 0 until size) {
    jMap.put(rand.nextString(5), i)
  }
  jMap
}

/**
 * This function generates a random array of booleans of a given size.
 */
private def generateRandomArray(rand: Random, size: Int): util.ArrayList[Boolean] = {
  val vec = new util.ArrayList[Boolean]()
  for (i <- 0 until size) {
    vec.add(rand.nextBoolean())
  }
  vec
}

/**
 * This function generates a single row of random values matching testSchema.
 */
private def generateRandomRow(): Row = {
  val rand = new Random()
  Row(rand.nextString(defaultSize), rand.nextInt(), new Date(rand.nextLong()), rand.nextDouble(),
    BigDecimal(rand.nextDouble()).setScale(10, RoundingMode.HALF_UP),
    generateRandomArray(rand, defaultSize).asScala,
    generateRandomMap(rand, defaultSize).asScala, Row(rand.nextInt()))
}
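
/**
 * Quick sanity check (optional, not part of the benchmark): generate a single
 * row locally to confirm the generators line up with testSchema.
 */
println(generateRandomRow())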

println(s"\n\n\nPreparing for a benchmark test - creating a RDD with $numberOfRows rows\n\n\n")

val testDataFrame = spark.createDataFrame(
  spark.sparkContext.parallelize(0 until numberOfRows).map(_ => generateRandomRow()),
  testSchema)
Preparing for a benchmark test - creating an RDD with 1000000 rows

tempDir: java.io.File = /local_disk0/tmp/1537341861893-0
avroDir: String = /local_disk0/tmp/1537341861893-0/avro
numberOfRows: Int = 1000000
defaultSize: Int = 100
testSchema: org.apache.spark.sql.types.StructType = StructType(StructField(StringField,StringType,false), StructField(IntField,IntegerType,true), StructField(dateField,DateType,true), StructField(DoubleField,DoubleType,false), StructField(DecimalField,DecimalType(10,10),true), StructField(ArrayField,ArrayType(BooleanType,true),false), StructField(MapField,MapType(StringType,IntegerType,true),true), StructField(StructField,StructType(StructField(id,IntegerType,true)),false))
testDataFrame: org.apache.spark.sql.DataFrame = [StringField: string, IntField: int ... 6 more fields]
/**
 * spark-avro write benchmark.
 */
spark.time {
  testDataFrame.write.format("avro").save(avroDir)
}
Time taken: 89219 ms
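/**
 * Caveat: testDataFrame is evaluated lazily, so the write timing above also
 * includes generating the one million random rows. A minimal sketch of
 * isolating the pure write cost by materializing the rows first and rerunning
 * the write (assumes the cluster has memory to cache the data; the "-cached"
 * output path is illustrative):
 */
testDataFrame.cache()
testDataFrame.count() // force evaluation so the cached data backs the rerun
spark.time {
  testDataFrame.write.format("avro").save(avroDir + "-cached")
}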
/**
 * spark-avro read benchmark. Iterating over the internal binary rows
 * (queryExecution.toRdd) forces a full scan without paying the cost of
 * converting each row to an external Row object.
 */
spark.time {
  spark.read.format("avro").load(avroDir).queryExecution.toRdd.foreach(_ => ())
}
Time taken: 28738 ms
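/**
 * Two variations worth trying (sketches, not part of the measured runs above).
 * First, write times depend on the Avro compression codec, which defaults to
 * snappy in Spark 2.4 and can be switched via a session config. Second, the
 * same pattern benchmarks any other built-in format, e.g. Parquet, for a
 * point of comparison. The "-deflate" and "/parquet" paths are illustrative.
 */
spark.conf.set("spark.sql.avro.compression.codec", "deflate")
spark.time {
  testDataFrame.write.format("avro").save(avroDir + "-deflate")
}

val parquetDir = tempDir + "/parquet"
spark.time {
  testDataFrame.write.format("parquet").save(parquetDir)
}
spark.time {
  spark.read.format("parquet").load(parquetDir).queryExecution.toRdd.foreach(_ => ())
}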
// Clean up output files
import org.apache.commons.io.FileUtils

FileUtils.deleteDirectory(tempDir)