Set Up Embedded Metastore (Scala)

1. Check whether a cluster-scoped init script with the same name already exists

dbutils.fs.head("dbfs:/databricks/init_scripts/set_spark_embedded_metastore.sh")
java.io.FileNotFoundException: dbfs:/databricks/init_scripts/set_spark_embedded_metastore.sh
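
If you prefer not to rely on the raw FileNotFoundException, a minimal sketch (assuming dbutils is available in the notebook session) wraps the call in a Try so a missing script does not abort the cell:

import scala.util.{Failure, Success, Try}

val scriptPath = "dbfs:/databricks/init_scripts/set_spark_embedded_metastore.sh"

Try(dbutils.fs.head(scriptPath)) match {
  case Success(contents) => println(s"Init script already exists:\n$contents")
  case Failure(_)        => println(s"No init script at $scriptPath; create it in the next step")
}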

2. If it does not, set up the cluster-scoped init script

// Hive metastore settings for an embedded, in-memory Derby database;
// the init script below writes them into the driver's Spark conf.
val configStrs = """
"spark.sql.catalogImplementation" = "hive",
"spark.hadoop.javax.jdo.option.ConnectionDriverName" = "org.apache.derby.jdbc.EmbeddedDriver",
"spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:derby:memory:myInMemDB;create=true",
"spark.hadoop.javax.jdo.option.ConnectionUserName" = "hiveuser",
"spark.hadoop.javax.jdo.option.ConnectionPassword" = "hivepass",
"spark.hadoop.datanucleus.autoCreateSchema" = "true",
"spark.hadoop.datanucleus.autoCreateTables" = "true",
"spark.hadoop.datanucleus.fixedDatastore" = "false",
"""

// The init script writes two files on the driver: the custom Spark conf above
// and a conf file that disables the Databricks metastore monitor and health check.
val initScriptContent = s"""
  |#!/bin/bash
  |
  |cat << 'EOF' > /databricks/driver/conf/00-custom-spark-driver-embedded-metastore.conf
  |[driver] {
  |$configStrs
  |}
  |EOF
  |cat > /databricks/common/conf/disable-metastore-monitor.conf << EOL
  |{
  |  databricks.daemon.driver.enableMetastoreMonitor = false,
  |  databricks.daemon.driver.enableMetastoreHealthCheck = false,
  |}
  |EOL
""".stripMargin


dbutils.fs.put("dbfs:/databricks/init_scripts/set_spark_embedded_metastore.sh", initScriptContent, true)
Wrote 838 bytes.
res0: Boolean = true

3. Verify that the init script is set up

dbutils.fs.head("dbfs:/databricks/init_scripts/set_spark_embedded_metastore.sh")
res2: String =
"
#!/bin/bash

cat << 'EOF' > /databricks/driver/conf/00-custom-spark-driver-embedded-metastore.conf
[driver] {
"spark.sql.catalogImplementation" = "hive",
"spark.hadoop.javax.jdo.option.ConnectionDriverName" = "org.apache.derby.jdbc.EmbeddedDriver",
"spark.hadoop.javax.jdo.option.ConnectionURL" = "jdbc:derby:memory:myInMemDB;create=true",
"spark.hadoop.javax.jdo.option.ConnectionUserName" = "hiveuser",
"spark.hadoop.javax.jdo.option.ConnectionPassword" = "hivepass",
"spark.hadoop.datanucleus.autoCreateSchema" = "true",
"spark.hadoop.datanucleus.autoCreateTables" = "true",
"spark.hadoop.datanucleus.fixedDatastore" = "false",
}
EOF
"

4. In the Clusters UI, attach the init script to the cluster

In the Clusters UI, select the cluster you want to use and click Edit. Go to Configuration > Advanced Options > Init Scripts. In Init Script Path, enter the DBFS path and file name of the script, then click Add. Start the cluster. After the cluster starts up, attach this notebook to the cluster.

5. Verify if the configuration is picked up by the cluster

sc.getConf.getAll.toSeq
  .filter { case (key, _) =>
    key.contains("spark.sql.catalogImplementation") ||
    key.contains("spark.hadoop.javax") ||
    key.contains("datanucleus")
  }
  .foreach(println)


/**

In theory, it should print configs that we specified:

(spark.hadoop.datanucleus.fixedDatastore,false)
(spark.hadoop.javax.jdo.option.ConnectionDriverName,org.apache.derby.jdbc.EmbeddedDriver)
(spark.hadoop.javax.jdo.option.ConnectionPassword,hivepass)
(spark.hadoop.datanucleus.autoCreateTables,true)
(spark.hadoop.javax.jdo.option.ConnectionURL,jdbc:derby:memory:myInMemDB;create=true)
(spark.sql.catalogImplementation,hive)
(spark.hadoop.datanucleus.autoCreateSchema,true)
(spark.hadoop.javax.jdo.option.ConnectionUserName,hiveuser)

*/
(spark.hadoop.datanucleus.fixedDatastore,false)
(spark.hadoop.javax.jdo.option.ConnectionDriverName,org.apache.derby.jdbc.EmbeddedDriver)
(spark.hadoop.javax.jdo.option.ConnectionPassword,hivepass)
(spark.hadoop.datanucleus.autoCreateTables,true)
(spark.hadoop.javax.jdo.option.ConnectionURL,jdbc:derby:memory:myInMemDB;create=true)
(spark.sql.catalogImplementation,hive)
(spark.hadoop.datanucleus.autoCreateSchema,true)
(spark.hadoop.javax.jdo.option.ConnectionUserName,hiveuser)
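
As a quick spot check, you can also read a single key back through the SparkSession conf. This is a minimal sketch and assumes the cluster was restarted with the init script attached:

// Should print "hive" once the init script has been applied
println(spark.conf.get("spark.sql.catalogImplementation"))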

6. Run a SQL statement to see if it works

The Derby metastore is embedded in this single cluster and is not shared, so on a freshly created cluster the SQL statement should show that there are no tables.

%sql

SHOW TABLES;
OK
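
The same check can also be run from Scala. This is a small sketch using spark.sql and the catalog API; on a freshly created cluster both calls should return an empty result:

spark.sql("SHOW TABLES").show()

// Equivalent check through the catalog API
spark.catalog.listTables("default").show()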

7. Create a table and try again

%sql 

CREATE TABLE x (a INT, b TINYINT NOT NULL);
OK
%sql

SHOW TABLES;
database  tableName  isTemporary
default   x          false
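
The same round trip can be done from Scala. This is only a sketch; the table name y is a placeholder, and the final statement drops it again if it was created purely to test the metastore:

// Create a throwaway table, confirm it shows up, then remove it
spark.sql("CREATE TABLE IF NOT EXISTS y (a INT, b INT)")
spark.sql("SHOW TABLES").show()
spark.sql("DROP TABLE IF EXISTS y")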

Troubleshooting

How can I verify that the cluster is connecting to the embedded metastore, and not to the global metastore (provided by Databricks)?

Run the following command. Port 3306 is used by the Databricks-provided global metastore, so when the cluster is using the embedded metastore the grep command should return no matches.

%sh

netstat -ant | grep 3306

How can I identify the global metastore port?

import com.databricks.backend.common.util.Project
import com.databricks.backend.daemon.driver.DriverConf
import com.databricks.conf.trusted.ProjectConf

// Get the port of the metastore RDS connection from Databricks conf:
val dbConf = new DriverConf(ProjectConf.loadLocalConfig(Project.Driver))
val port = dbConf.internalMetastorePort

dbConf: com.databricks.backend.daemon.driver.DriverConf = com.databricks.backend.daemon.driver.DriverConf(driver)
port: Int = 3306
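
As a follow-up sketch, the port value from the cell above can be fed back into the netstat check so the troubleshooting step does not hard-code 3306 (this assumes the port variable defined above):

import scala.sys.process._

// List connections to the global metastore port; with the embedded Derby
// metastore in use, this should print nothing.
val metastoreConnections = "netstat -ant".!!
  .split("\n")
  .filter(_.contains(s":$port"))

metastoreConnections.foreach(println)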