Connect Azure Databricks to Cassandra (Scala)

Establish network paths

Options

  1. Peer the private VNet as per VNet Peering
    1. In the Azure portal, under the Databricks workspace asset, choose the Peering blade
    2. Peer the VNet where your Cassandra VMs are deployed (you don't need transit routing and the like; a vanilla IP-space peering suffices)
    3. In the VNet where your Cassandra VMs are deployed, peer the locked VNet where Databricks is running
  2. Use VNet injection as per VNet Injection
    1. Follow the steps for VNet injection
    2. Use the provided JSON template to deploy the workspace into your configured VNet
  3. Make Cassandra available over public IP paths
    • You will likely need to do ACL and firewall work with heavy auditing

Verify network connectivity to the Cassandra node

  • Working with a single-node Cassandra cluster with IP 172.16.1.4 for this example
%sh
echo "Success will show 5 packets sent with 0 packet loss"
ping -c 5 172.16.1.4
echo "Look for 0 packet loss"
echo "You should see: (UNKNOWN) [IP] PORT (?) open"
nc -nzv 172.16.1.4 9042
echo ""
Success will show 5 packets sent with 0 packet loss
PING 172.16.1.4 (172.16.1.4): 56 data bytes
64 bytes from 172.16.1.4: icmp_seq=0 ttl=63 time=2.287 ms
64 bytes from 172.16.1.4: icmp_seq=1 ttl=63 time=0.927 ms
64 bytes from 172.16.1.4: icmp_seq=2 ttl=63 time=1.078 ms
64 bytes from 172.16.1.4: icmp_seq=3 ttl=63 time=1.110 ms
64 bytes from 172.16.1.4: icmp_seq=4 ttl=63 time=1.128 ms
--- 172.16.1.4 ping statistics ---
5 packets transmitted, 5 packets received, 0% packet loss
round-trip min/avg/max/stddev = 0.927/1.306/2.287/0.496 ms
Look for 0 packet loss
You should see: (UNKNOWN) [IP] PORT (?) open
(UNKNOWN) [172.16.1.4] 9042 (?) open
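The same reachability check can be done from Scala on the driver if you prefer to stay in one notebook language. This is a minimal sketch using java.net.Socket; the host and port are the ones from this example.

import java.net.{InetSocketAddress, Socket}

// Probe the Cassandra native-protocol port from the driver node
val socket = new Socket()
try {
  socket.connect(new InetSocketAddress("172.16.1.4", 9042), 5000) // 5-second timeout
  println("TCP connection to 172.16.1.4:9042 succeeded")
} catch {
  case e: Exception => println(s"Connection failed: ${e.getMessage}")
} finally {
  socket.close()
}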

Load the connector library

  • This only needs to happen once per workspace, unless different clusters need different connector versions
  1. In the workspace, choose Create->Library (e.g., go to Workspace->Shared->CassandraWork, hit the options caret, then Create->Library)
  2. On the page that comes up
    1. Choose source: Maven Coordinate
    2. Hit the button to Search Maven Central...
    3. Search for "spark-cassandra-connector"
    4. Choose your version and hit "select" on the right side
    5. Click the "Create Library" button
  3. On the following screen, choose to attach to all clusters or only specific clusters
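Once the library is attached and the cluster has restarted, a quick sanity check is that the connector's packages resolve. These imports come from the spark-cassandra-connector artifact; if either fails, the library is not attached to the cluster you are running on.

// Should resolve without error when the connector library is attached
import com.datastax.spark.connector._
import org.apache.spark.sql.cassandra._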

Create the init script

Do this once, or whenever the Cassandra cluster IP(s) change.

val cassandraHostIP = "172.16.1.4"

// Create script folder
dbutils.fs.mkdirs("/databricks/scripts")
// Add init script that adds the Cassandra hostname to all worker nodes
dbutils.fs.put(s"/databricks/scripts/cassandra.sh",
  s"""
     #!/usr/bin/bash
     echo '[driver]."spark.cassandra.connection.host" = "$cassandraHostIP"' >> /home/ubuntu/databricks/common/conf/cassandra.conf
   """.trim, true)
Wrote 139 bytes.
cassandraHostIP: String = 172.16.1.4
res2: Boolean = true
// Check to verify the init script file was created
dbutils.fs.head("/databricks/scripts/cassandra.sh")

res1: String =
#!/usr/bin/bash
echo '[driver]."spark.cassandra.connection.host" = "172.16.1.4"' >> /home/ubuntu/databricks/common/conf/cassandra.conf

Run the init script

  1. Configure the script for a cluster by referencing it in the cluster's Init Scripts tab.
  2. Restart the cluster to run the cluster init script.
// Redefining since this runs after a cluster restart
val cassandraHostIP = "172.16.1.4"
// Verify the Spark conf is set properly; this is what the connector will use
spark.conf.get("spark.cassandra.connection.host")
res16: String = 172.16.1.4
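For ad-hoc work you can also set the host on the current session instead of using an init script. A sketch; note this is a session-level setting that is lost on cluster restart, which is why the init-script approach above is preferred for anything permanent.

// Session-only alternative to the init script (not persisted across restarts)
spark.conf.set("spark.cassandra.connection.host", cassandraHostIP)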

Use Cassandra

For the next sections, create some tables to read from and write into.

  • Create the tables in Cassandra in test_keyspace (assumed to already exist) by running the following cqlsh commands; if you prefer to stay in the notebook, see the CassandraConnector sketch after this list
  1. For the read example

    CREATE TABLE words_new (user TEXT, word TEXT, count INT, PRIMARY KEY (user, word));

    INSERT INTO words_new (user, word, count ) VALUES ( 'Russ', 'dino', 10 );

    INSERT INTO words_new (user, word, count ) VALUES ( 'Russ', 'fad', 5 );

    INSERT INTO words_new (user, word, count ) VALUES ( 'Sam', 'alpha', 3 );

    INSERT INTO words_new (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );

  2. For the write example

    CREATE TABLE employee_new (id TEXT, dep_id TEXT, age INT, PRIMARY KEY (id));
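If you would rather not run cqlsh on a node, the same DDL can be executed from the notebook through the connector's CassandraConnector helper. A sketch, assuming the connector library is attached and spark.cassandra.connection.host is set; the keyspace replication settings are illustrative only.

import com.datastax.spark.connector.cql.CassandraConnector

CassandraConnector(sc.getConf).withSessionDo { session =>
  // Illustrative replication settings; choose what fits your cluster
  session.execute("CREATE KEYSPACE IF NOT EXISTS test_keyspace WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
  session.execute("CREATE TABLE IF NOT EXISTS test_keyspace.words_new (user TEXT, word TEXT, count INT, PRIMARY KEY (user, word))")
  session.execute("CREATE TABLE IF NOT EXISTS test_keyspace.employee_new (id TEXT, dep_id TEXT, age INT, PRIMARY KEY (id))")
}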

Read Cassandra

DataFrame read of the Cassandra table roles

// Start with a system table 
val df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "roles", "keyspace" -> "system_auth"))
  .load
df.explain
== Physical Plan ==
*(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@1737cc8f [role#30,can_login#31,is_superuser#32,member_of#33,salted_hash#34] PushedFilters: [], ReadSchema: struct<role:string,can_login:boolean,is_superuser:boolean,member_of:array<string>,salted_hash:str...
df: org.apache.spark.sql.DataFrame = [role: string, can_login: boolean ... 3 more fields]
display(df)
role: cassandra, can_login: true, is_superuser: true, member_of: [], salted_hash: $2a$10$TObpqygwTmW99FPzUbgMDuQlBvPP/2EuvOczsw05pcbvoCLjiORmy
// Using the table you created above
val df = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "words_new", "keyspace" -> "test_keyspace"))
  .load
df.explain
display(df)
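The PushedFilters list in the plan shown earlier was empty because the read had no predicate. Filtering on the partition key shows the pushdown at work; this sketch uses the words_new table from this example.

// A partition-key predicate is pushed down to Cassandra rather than filtered in Spark
val russWords = df.filter($"user" === "Russ")
russWords.explain // "user" should now appear under PushedFilters
display(russWords)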

Write Cassandra

// Create some sample data
import org.apache.spark.sql.functions._
val employee1 = spark.range(0, 3).select($"id".as("id"), (rand() * 3).cast("int").as("dep_id"), (rand() * 40 + 20).cast("int").as("age"))
// Insert into your Cassandra table
import org.apache.spark.sql.cassandra._

employee1.write
  .format("org.apache.spark.sql.cassandra")
  .mode("overwrite")
  // Overwrite truncates the target table; the connector requires explicit confirmation
  .option("confirm.truncate", "true")
  .options(Map( "table" -> "employee_new", "keyspace" -> "test_keyspace"))
  .save()
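To confirm the write landed, read the table back with the same read pattern used earlier.

// Read employee_new back to verify the rows were written
val employeeCheck = spark
  .read
  .format("org.apache.spark.sql.cassandra")
  .options(Map( "table" -> "employee_new", "keyspace" -> "test_keyspace"))
  .load
display(employeeCheck)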