package com.databricks.spark

import java.io.File
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.nio.file.attribute.PosixFilePermissions
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import scala.sys.process._

import org.apache.spark.SparkContext

/** Utility functions for executing shell commands on executors over ssh from the driver. */
trait SSHUtils {
  
  val sc: SparkContext
  
  private def writeFile(path: String, contents: String, append: Boolean = false): Unit = {
    val fw = new java.io.FileWriter(path, append)
    try fw.write(contents) finally fw.close()
  }
  
  lazy val publicKey = "cat /root/.ssh/id_rsa.pub".!!

  /**
   * Append the given public key to this machine's `authorized_keys` so that the driver can ssh into it.
   */
  def addAuthorizedPublicKey(key: String): Unit = {
    writeFile("/home/ubuntu/.ssh/authorized_keys", "\n" + key, true)
  }

  /**
   * Ssh into the given `host` and execute `command`.
   */
  def ssh(host: String, command: String, logStdout: Boolean = true): String = {
    println("executing command - " + command + " on host: " + host)
    val outBuffer = new collection.mutable.ArrayBuffer[String]()
    val logger = ProcessLogger(line => outBuffer += line, println(_))

    val exitCode =
      Seq("ssh", "-o", "StrictHostKeyChecking=no", "-p", "22", "-i", "/root/.ssh/id_rsa",
        s"ubuntu@$host", command) ! logger
    if (logStdout) {
      outBuffer.foreach(println)
    }
    if (exitCode != 0) {
      println(s"FAILED: command - $command on host: $host")
      sys.error("Command failed")
    }
    println(s"SUCCESS: command - $command on host: $host")
    outBuffer.mkString("\n")
  }
  
  /** Distribute the driver's public key to each executor's `authorized_keys`. */
  protected def setupSSH(numExecutors: Int): Unit = {
    val key = publicKey
    sc.parallelize(0 until numExecutors, numExecutors).foreach { _ =>
      addAuthorizedPublicKey(key)
    }
  }
  
  /** Generate new ssh keys if required. */
  protected def generateSshKeys(): Unit = {
    // `ls` exits non-zero when the key file is missing; only generate a new key pair in that case.
    if ("ls /root/.ssh/id_rsa".! > 0) {
      Seq("ssh-keygen", "-t", "rsa", "-N", "", "-f", "/root/.ssh/id_rsa").!!
    }
    addAuthorizedPublicKey(publicKey)
  }
}
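
/**
 * Minimal usage sketch, not part of the original notebook: mix SSHUtils into a serializable
 * class that exposes the driver's SparkContext, the same pattern LocalKafka uses below.
 * `host` is expected to be an executor's private IP; the class and method names here are
 * illustrative only.
 */
class SSHUtilsExample(@transient val sc: SparkContext) extends Serializable with SSHUtils {
  def run(host: String, numExecutors: Int): String = {
    generateSshKeys()      // create /root/.ssh/id_rsa on the driver if it is missing
    setupSSH(numExecutors) // append the driver's public key to authorized_keys on each executor
    ssh(host, "uname -a")  // run a shell command on the executor over ssh and return its output
  }
}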
package com.databricks.spark

import java.io.File
import java.nio.file.{Files, Path, Paths, StandardCopyOption}
import java.nio.file.attribute.PosixFilePermissions
import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.spark_project.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import kafkashaded.org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import org.apache.spark.sql._

import scala.sys.process._

/**
 * Class for managing Apache Kafka on a Databricks cluster. Currently supports one broker per EC2 instance.
 * Will install Zookeeper and Schema Registry on a single node, colocated with the first broker.
 * Please do not use this code in production.
 *
 * @param spark SparkSession that will allow us to run Spark jobs to distribute ssh public keys to executors
 * @param numKafkaNodes Number of Kafka brokers to install. Must be smaller than or equal to the number of executors
 * @param stopSparkOnKafkaNodes Whether to stop the Spark executors on the instances where Kafka is installed, for better isolation
 * @param kafkaVersion The version of Kafka to install
 */
class LocalKafka(
    spark: SparkSession,
    numKafkaNodes: Int = 1,
    stopSparkOnKafkaNodes: Boolean = false,
    kafkaVersion: String = "2.0.0") extends Serializable with SSHUtils {
  import spark.implicits._
  @transient val sc = spark.sparkContext
  
  import LocalKafka._
  
  private val workers: List[String] = {
    // Collect the distinct executor hosts; convert EC2 internal hostnames such as
    // "ip-10-0-0-1.ec2.internal" into plain IPs ("10.0.0.1").
    val executors = sc.getExecutorMemoryStatus.keys.map(_.split(":").head).map { ip =>
      if (ip.startsWith("ip")) ip.stripPrefix("ip-").split('.').head.replace("-", ".")
      else ip
    }.toSet
    println("Executors = " + executors)
    executors.toList
  }
  println("Workers = " + workers)
  
  private val numExecutors = workers.length
  
  require(numExecutors >= numKafkaNodes,
    s"""You don't have enough executors to maintain $numKafkaNodes Kafka brokers. Please increase your cluster size,
       |or decrease the number of Kafka brokers. Available executors: $numExecutors.
     """.stripMargin)
  lazy val kafkaNodes = workers.take(numKafkaNodes)
  private lazy val zookeeper = workers(0)
  private lazy val schemaRegistry = workers(0)
  lazy val schemaRegistryAddress = "http://" + schemaRegistry + ":8081"
  lazy val schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryAddress, 1000)
  lazy val kafkaNodesString = kafkaNodes.map(_ + ":9092").mkString(",")
  lazy val zookeeperAddress = zookeeper + ":2181"
  private val dbfsDir = "home/streaming/benchmark"
  
  def init(): Unit = {
    generateSshKeys()
    writeInstallFile(dbfsDir, kafkaVersion)
  }
  
  init()

  /**
   * Set up Kafka on the cluster: install the binaries on all workers, then start Zookeeper,
   * the brokers, and the Schema Registry.
   */
  def setup(): Unit = {
    setupSSH(numExecutors)

    workers.foreach { ip =>
      ssh(ip, s"bash /dbfs/$dbfsDir/install-kafka.sh")
    }

    ssh(zookeeper, s"kafka/bin/zookeeper-server-start.sh -daemon kafka/config/zookeeper.properties")

    kafkaNodes.zipWithIndex.foreach { case (host, id) =>
      ssh(host, s"bash /dbfs/$dbfsDir/configure-kafka.sh $zookeeper $id $host")
      ssh(host, s"kafka/bin/kafka-server-start.sh -daemon kafka/config/server.properties")
    }

    Thread.sleep(30 * 1000) // wait for Kafka to come up
    ssh(schemaRegistry, "schema-registry/bin/schema-registry-start -daemon schema-registry/etc/schema-registry/schema-registry.properties")

    if (stopSparkOnKafkaNodes) {
      kafkaNodes.foreach { ip =>
        ssh(ip, s"sudo monit stop spark-slave")
      }
    }
  }

  /** Create a `topic` on Kafka with the given number of partitions and replication factor. */
  def createTopic(topic: String, partitions: Int = 8, replFactor: Int = 1): Unit = {
    ssh(kafkaNodes(0), s"kafka/bin/kafka-topics.sh --create --topic $topic --partitions $partitions " +
      s"--replication-factor $replFactor --zookeeper $zookeeper:2181 --config message.timestamp.type=LogAppendTime")
  }
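
  /**
   * Sketch, not part of the original notebook: produce a few string records with the shaded
   * Kafka client imported above. The serializer class name is assumed to follow the same
   * `kafkashaded` prefix as the producer import; adjust it to your runtime's shading.
   */
  def produceTestRecords(topic: String, values: Seq[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", kafkaNodesString)
    props.put("key.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer")   // assumed shaded path
    props.put("value.serializer", "kafkashaded.org.apache.kafka.common.serialization.StringSerializer") // assumed shaded path
    val producer = new KafkaProducer[String, String](props)
    values.foreach(v => producer.send(new ProducerRecord[String, String](topic, v)))
    producer.close() // flush pending sends before returning
  }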
  
  /** Delete the given topic if it exists. */
  def deleteTopicIfExists(topic: String): Unit = {
    try {
      ssh(kafkaNodes(0), s"kafka/bin/kafka-topics.sh --delete --topic $topic --zookeeper $zookeeper:2181")
    } catch {
      case _: RuntimeException => // the delete command fails when the topic doesn't exist; ignore it
    }
  }
}

object LocalKafka extends Serializable {
  var cluster: LocalKafka = null
  def setup(spark: SparkSession, numKafkaNodes: Int = 1, stopSparkOnKafkaNodes: Boolean = false): LocalKafka = {
    if (cluster == null) {
      cluster = new LocalKafka(spark, numKafkaNodes, stopSparkOnKafkaNodes)
      cluster.setup()
    }
    cluster
  }
  
  private def writeFile(path: String, contents: String, append: Boolean = false): Unit = {
    val fw = new java.io.FileWriter(path, append)
    try fw.write(contents) finally fw.close()
  }
  
  private def writeInstallFile(dbfsDir: String, kafkaVersion: String): Unit = {
    Seq("mkdir", "-p", s"/dbfs/$dbfsDir").!!
    writeFile(s"/dbfs/$dbfsDir/install-kafka.sh", getInstallKafkaScript(dbfsDir, kafkaVersion))
    writeFile(s"/dbfs/$dbfsDir/configure-kafka.sh", configureKafkaScript)
  }

  /** Script run on the workers to download and install Kafka and the Confluent Schema Registry. */
  private def getInstallKafkaScript(dbfsDir: String, kafkaVersion: String) = {
    s"""#!/bin/bash
       |set -e
       |
       |sudo chown ubuntu /home/ubuntu
       |
       |if [ ! -r "/dbfs/$dbfsDir/kafka-${kafkaVersion}.tgz" ]; then
       | wget -O /dbfs/$dbfsDir/kafka-${kafkaVersion}.tgz "https://archive.apache.org/dist/kafka/${kafkaVersion}/kafka_2.11-${kafkaVersion}.tgz"
       |fi
       |
       |if [ ! -r "/dbfs/$dbfsDir/schema-registry.tar.gz" ]; then
       | wget -O /dbfs/$dbfsDir/schema-registry.tar.gz "http://packages.confluent.io/archive/4.0/confluent-oss-4.0.0-2.11.tar.gz"
       |fi
       |
       |cd /home/ubuntu && mkdir -p kafka && cd kafka
       |tar -xvzf /dbfs/$dbfsDir/kafka-${kafkaVersion}.tgz --strip 1 1> /dev/null 2>&1
       |
       |cd /home/ubuntu && mkdir -p schema-registry && cd schema-registry
       |tar -xvzf /dbfs/$dbfsDir/schema-registry.tar.gz --strip 1 1> /dev/null 2>&1
     """.stripMargin
  }
  
  /**
   * Script that writes the default Kafka broker configuration (config/server.properties).
   * Arguments: $1 = Zookeeper host, $2 = broker id, $3 = broker host/IP.
   */
  private val configureKafkaScript = 
    """#!/bin/bash
set -e

cd kafka
cat > config/server.properties <<EOL
############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=$2

# Disable replication so that we don't need many Kafka brokers
offsets.topic.num.partitions=1

offsets.topic.replication.factor=1

group.initial.rebalance.delay.ms=10

transaction.state.log.replication.factor=1

transaction.state.log.min.isr=1

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=$1:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

auto.create.topics.enable=false

############################# Socket Server Settings #############################

# The port the socket server listens on
port=9092

# Hostname the broker will bind to. If not set, the server will bind to all interfaces
host.name=$3

# Hostname the broker will advertise to producers and consumers. If not set, it uses the
# value for "host.name" if configured.  Otherwise, it will use the value returned from
# java.net.InetAddress.getCanonicalHostName().
advertised.host.name=$3

# The port to publish to ZooKeeper for clients to use. If this is not set,
# it will publish the same port that the broker binds to.
# advertised.port=9092

# The number of threads handling network requests
num.network.threads=3
 
# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600

# Use kafka broker receive time in message timestamps, instead of creation time
log.message.timestamp.type=LogAppendTime

############################# Log Basics #############################

# A comma separated list of directories under which to store log files
log.dirs=/tmp/kafka-logs

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=8

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1


############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion
log.retention.hours=168

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according 
# to the retention policies
log.retention.check.interval.ms=300000

# By default the log cleaner is disabled and the log retention policy will default to just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.
log.cleaner.enable=false

EOL
    """
}
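
/**
 * Usage sketch, not part of the original notebook: from a notebook cell one would typically
 * call LocalKafka.setup once, create a topic, and point a Structured Streaming query at the
 * brokers. The topic name and cluster size below are illustrative.
 */
object LocalKafkaExample {
  def run(spark: SparkSession): DataFrame = {
    val kafka = LocalKafka.setup(spark, numKafkaNodes = 2) // install and start Zookeeper, brokers, Schema Registry
    kafka.createTopic("events", partitions = 8)            // topic with LogAppendTime timestamps
    spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafka.kafkaNodesString) // e.g. "10.0.0.1:9092,10.0.0.2:9092"
      .option("subscribe", "events")
      .load()
  }
}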