val jobRunId = System.currentTimeMillis()

//Job Identification
dbutils.widgets.text("inputConfigJobName", "Stage Orders Data Mart", "Job Name")
val jobId = dbutils.widgets.get("inputConfigJobName").replaceAll(" ", "")

//Job Config File
dbutils.widgets.text("inputConfigFile", "s3a://prod-etl-config/StageOrdersDataMart.json", "Job Config File")
val jobConfigFileName = dbutils.widgets.get("inputConfigFile").trim

//Parse the Filename from the Path
val jobConfigFileNameSplit = jobConfigFileName.split("/")
val jobConfigFileNameNoPath = jobConfigFileNameSplit(jobConfigFileNameSplit.size - 1)
//Copy the config from S3 to the driver's local filesystem so it can be read as a java.io.File
dbutils.fs.cp(jobConfigFileName, s"file:/tmp/$jobId-$jobRunId/$jobConfigFileNameNoPath")
import java.io.File
import com.typesafe.config._

val jobConfigFile = new File(s"/tmp/$jobId-$jobRunId/$jobConfigFileNameNoPath")
val jobConfigConfig = ConfigFactory.parseFile(jobConfigFile)

//Check if Config is Valid; throw exception if not
jobConfigConfig.checkValid(jobConfigConfig)

//Check if the Config Job Name matches the Job Parameter Job Name
val jobName = jobConfigConfig.getString("Name")
if (jobName.replaceAll(" ", "") != jobId) {
  throw new Exception("Job Name in Config File Does Not Match Job Name Parameter")
}
import java.util.HashMap
import scala.collection.JavaConversions._
import com.databricks.solutions.etl.config._

//Build the JobConfig, registering each connection defined in the config file
var jobConfig = new JobConfig(jobName)
val jobConnectionsIter = jobConfigConfig.getObjectList("Connections").iterator
while (jobConnectionsIter.hasNext) {
  val jobConnConfig = jobConnectionsIter.next
  val jobConnName = jobConnConfig.get("Name").unwrapped.toString
  val jobConnType = jobConnConfig.get("Type").unwrapped.toString
  val jobConnPropsMap = jobConnConfig.get("Properties").unwrapped.asInstanceOf[HashMap[String, String]].toMap
  val jobConn = new Connection(jobConnName, jobConnType, jobConnPropsMap)
  jobConfig = jobConfig.withConnection(jobConn)
}
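The config file itself is not shown in this notebook. Based on the keys the code reads (a top-level Name plus a Connections list whose entries carry Name, Type, and Properties), StageOrdersDataMart.json might look roughly like the following sketch; the connection types, JDBC properties, and S3 path are illustrative placeholders, not values from the source.

{
  "Name": "Stage Orders Data Mart",
  "Connections": [
    {
      "Name": "Orders Database",
      "Type": "jdbc",
      "Properties": {
        "url": "jdbc:postgresql://orders-db.example.com:5432/orders",
        "user": "etl_user",
        "password": "REDACTED"
      }
    },
    {
      "Name": "Orders Datamart",
      "Type": "delta",
      "Properties": {
        "path": "s3a://prod-orders-datamart/stage"
      }
    }
  ]
}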
//Define pattern to extract and load using connections
def stageTable(srcTable: String, srcConnection: Connection, tgtConnection: Connection) = {
  val source = srcConnection.withReader(spark.read)
  val df = source.option("dbtable", srcTable).load
  val tgtPath = tgtConnection.Properties.get("path").get + "/" + srcTable
  tgtConnection.withWriter(df.write).mode("overwrite").option("path", tgtPath).save
  println(s"Table $srcTable staged to $tgtPath")
}

//Get named connections from JobConfig
val connOrdersDb = jobConfig.getConnection("Orders Database").get
val connOrdersMart = jobConfig.getConnection("Orders Datamart").get

//Stage Tables from Source to Target
stageTable("orders", connOrdersDb, connOrdersMart)
stageTable("customer", connOrdersDb, connOrdersMart)
stageTable("supplier", connOrdersDb, connOrdersMart)
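The com.databricks.solutions.etl.config classes are not defined in this notebook. Below is a minimal sketch of what Connection and JobConfig could look like, inferred purely from the call sites above (withReader/withWriter applying a connection's format and properties, withConnection/getConnection managing named connections); the actual library may differ.

//Hypothetical sketch, not the actual com.databricks.solutions.etl.config implementation
import org.apache.spark.sql.{DataFrameReader, DataFrameWriter}

case class Connection(Name: String, Type: String, Properties: Map[String, String]) {
  //Apply this connection's format and properties to a DataFrameReader
  def withReader(reader: DataFrameReader): DataFrameReader =
    reader.format(Type).options(Properties)

  //Apply this connection's format and properties to a DataFrameWriter
  def withWriter[T](writer: DataFrameWriter[T]): DataFrameWriter[T] =
    writer.format(Type).options(Properties)
}

case class JobConfig(Name: String, Connections: Seq[Connection] = Seq.empty) {
  //Return a copy of the JobConfig with an additional named connection
  def withConnection(conn: Connection): JobConfig =
    copy(Connections = Connections :+ conn)

  //Look up a connection by its configured name
  def getConnection(name: String): Option[Connection] =
    Connections.find(_.Name == name)
}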