ETL Job (Scala)

Stage Orders Data Mart

  1. Ingest data from the JDBC database
    • Decrypt credentials from the config file (see the sketch after the connection loop below)
  2. Write data to the Data Mart (Parquet on S3)
val jobRunId = System.currentTimeMillis()

//Job Identification
dbutils.widgets.text("inputConfigJobName","Stage Orders Data Mart","Job Name")
val jobId = dbutils.widgets.get("inputConfigJobName").replaceAll(" ", "")

//Job Config File
dbutils.widgets.text("inputConfigFile","s3a://prod-etl-config/StageOrdersDataMart.json","Job Config File")
val jobConfigFileName = dbutils.widgets.get("inputConfigFile").trim

//Parse the Filename from the Path
val jobConfigFileNameNoPath = jobConfigFileName.split("/").last
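
The two widgets above double as job parameters, so a driver notebook can override the defaults at invocation time. A minimal sketch; the notebook path and timeout below are assumptions:

//Hypothetical driver call: the argument keys match the widget names; path and timeout are illustrative
val runResult = dbutils.notebook.run(
  "/Jobs/StageOrdersDataMart",
  3600,
  Map(
    "inputConfigJobName" -> "Stage Orders Data Mart",
    "inputConfigFile"    -> "s3a://prod-etl-config/StageOrdersDataMart.json"
  )
)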
//Load the notebook that defines the JobConfig and Connection helper classes
%run ./src/JobConfig
dbutils.fs.cp(jobConfigFileName, s"file:/tmp/$jobId-$jobRunId/$jobConfigFileNameNoPath")  //copy from S3 to local filesystem
import java.io.File
import com.typesafe.config._

val jobConfigFile = new File(s"/tmp/$jobId-$jobRunId/$jobConfigFileNameNoPath")
val jobConfigConfig = ConfigFactory.parseFile(jobConfigFile)

//Validate the Config against the keys this job requires; throw an exception if any are missing or mistyped
jobConfigConfig.checkValid(ConfigFactory.parseString("""Name = "", Connections = []"""))

//Check that the Config Job Name matches the Job Name Parameter
val jobName = jobConfigConfig.getString("Name")
if(jobName.replaceAll(" ", "") != jobId) {
  throw new Exception("Job Name in Config File Does Not Match Job Name Parameter")
}
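
For reference, the config file is expected to have roughly the shape below. The Name and the two connection names match the lookups later in this job; the property values are illustrative assumptions, not production settings:

//Illustrative shape of StageOrdersDataMart.json (HOCON is a superset of JSON); values are assumptions
val exampleJobConfig = ConfigFactory.parseString("""
  Name = "Stage Orders Data Mart"
  Connections = [
    { Name = "Orders Database", Type = "jdbc",
      Properties = { url = "jdbc:mysql://example-host:3306/orders", user = "etl_user", password = "<encrypted>" } },
    { Name = "Orders Datamart", Type = "parquet",
      Properties = { path = "s3a://prod-orders-datamart" } }
  ]
""")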
import java.util.HashMap
import scala.collection.JavaConverters._
import com.databricks.solutions.etl.config._

var jobConfig = new JobConfig(jobName)

//Register each connection defined in the config with the JobConfig
for (jobConnConfig <- jobConfigConfig.getObjectList("Connections").asScala) {
  val jobConnName = jobConnConfig.get("Name").unwrapped.toString
  val jobConnType = jobConnConfig.get("Type").unwrapped.toString
  val jobConnPropsMap = jobConnConfig.get("Properties").unwrapped.asInstanceOf[HashMap[String,String]].asScala.toMap
  val jobConn = new Connection(jobConnName, jobConnType, jobConnPropsMap)
  jobConfig = jobConfig.withConnection(jobConn)
}
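
The credential-decryption step from the outline is not shown in this listing. A minimal sketch, assuming passwords are stored AES-encrypted and Base64-encoded in the config, with the key sourced from an environment variable (the helper name and key source are hypothetical):

import java.util.Base64
import javax.crypto.Cipher
import javax.crypto.spec.SecretKeySpec

//Hypothetical helper: decrypt an AES-encrypted, Base64-encoded credential value
//Sourcing the key from an environment variable (must be 16, 24, or 32 bytes) is an assumption
def decryptCredential(cipherText: String): String = {
  val key = new SecretKeySpec(sys.env("ETL_CREDENTIAL_KEY").getBytes("UTF-8"), "AES")
  val cipher = Cipher.getInstance("AES")
  cipher.init(Cipher.DECRYPT_MODE, key)
  new String(cipher.doFinal(Base64.getDecoder.decode(cipherText)), "UTF-8")
}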
//Extract-and-load pattern: read a table via the source connection, write it as Parquet to the target path
def stageTable(srcTable: String, srcConnection: Connection, tgtConnection: Connection): Unit = {
  val source = srcConnection.withReader(spark.read)
  val df = source.option("dbtable", srcTable).load
  val tgtPath = tgtConnection.Properties("path") + "/" + srcTable
  tgtConnection.withWriter(df.write).mode("overwrite").option("path", tgtPath).save
  println(s"Table $srcTable staged to $tgtPath")
}
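
For larger tables the same pattern extends naturally to partitioned output. A sketch of a variant; the caller-supplied partition column is an assumption about the source schema:

//Hypothetical variant: stage a table with the Parquet output partitioned by a caller-supplied column
def stageTablePartitioned(srcTable: String, srcConnection: Connection,
                          tgtConnection: Connection, partitionCol: String): Unit = {
  val df = srcConnection.withReader(spark.read).option("dbtable", srcTable).load
  val tgtPath = tgtConnection.Properties("path") + "/" + srcTable
  tgtConnection.withWriter(df.write).mode("overwrite").partitionBy(partitionCol).option("path", tgtPath).save
  println(s"Table $srcTable staged to $tgtPath partitioned by $partitionCol")
}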

//Get named connections from JobConfig
val connOrdersDb = jobConfig.getConnection("Orders Database").get
val connOrdersMart = jobConfig.getConnection("Orders Datamart").get

//Stage Tables from Source to Target
stageTable("orders",connOrdersDb,connOrdersMart)
stageTable("customer",connOrdersDb,connOrdersMart)
stageTable("supplier",connOrdersDb,connOrdersMart)