Import from Cassandra (Scala)

Loading...

Cassandra Setup

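  1. Make sure the security group allows the Databricks cluster to reach the Cassandra native-protocol port 9042.
  2. Create a Maven library for spark-cassandra-connector_xx, where xx is the Scala binary version of your cluster, choosing a connector release compatible with your Spark version — for example com.datastax.spark:spark-cassandra-connector_2.12:3.2.0.
  3. Install the library on the cluster.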
%sh
# Ping the Cassandra node to make sure basic network connectivity works.
# Note: ICMP may be blocked even when port 9042 is open (or vice versa); the read/write
# cells below are the real test of connectivity to the Cassandra port.
ping -c 2 ec2-54-205-226-21.compute-1.amazonaws.com
// Cluster name (used to build the init-script path) and the Cassandra host to connect to.
val sparkClusterName = "test1"
val cassandraHostIP = "ec2-54-205-226-21.compute-1.amazonaws.com"

// Build the init-script path once so the remove and the write target the same file.
// It must be an s-interpolated string: a plain "...$sparkClusterName..." literal would
// point at a directory literally named "$sparkClusterName" and never remove the real script.
// NOTE(review): /databricks/init/<cluster-name>/ is Databricks' legacy cluster-named
// init-script location — confirm your workspace still runs scripts from there.
val initScriptPath = s"/databricks/init/$sparkClusterName/cassandra.sh"
dbutils.fs.rm(initScriptPath)

// Write an init script that appends a `[driver]."spark.cassandra.connection.host"` entry
// to /home/ubuntu/databricks/common/conf/cassandra.conf (presumably picked up by Databricks
// as driver Spark conf — verify). The final `true` overwrites any existing file.
// /bin/bash is used because /usr/bin/bash does not exist on non-usrmerge Ubuntu images.
dbutils.fs.put(initScriptPath,
  s"""#!/bin/bash
     |echo '[driver]."spark.cassandra.connection.host" = "$cassandraHostIP"' >> /home/ubuntu/databricks/common/conf/cassandra.conf
     |""".stripMargin, true)
// Verify the Cassandra host is present in the session's Spark conf.
// SparkSession has no getConf; the runtime config is exposed as `spark.conf`, whose
// get(key) throws NoSuchElementException when the key is not set.
spark.conf.get("spark.cassandra.connection.host")
// Create this table in Cassandra by running the following statements in cqlsh, in the keyspace the reads below use (test_keyspace)
// CREATE table words_new (
//     user  TEXT, 
//     word  TEXT, 
//     count INT, 
//     PRIMARY KEY (user, word));
 
 
// INSERT INTO words_new (user, word, count ) VALUES ( 'Russ', 'dino', 10 );
// INSERT INTO words_new (user, word, count ) VALUES ( 'Russ', 'fad', 5 );
// INSERT INTO words_new (user, word, count ) VALUES ( 'Sam', 'alpha', 3 );
// INSERT INTO words_new (user, word, count ) VALUES ( 'Zebra', 'zed', 100 );

Read Cassandra

Option 1 (DataFrame: read the Cassandra table words_new)
// Read words_new from test_keyspace through the generic DataSource API,
// passing the Cassandra table and keyspace as individual reader options.
val df = spark.read
  .format("org.apache.spark.sql.cassandra")
  .option("keyspace", "test_keyspace")
  .option("table", "words_new")
  .load()

// Print the physical plan for the Cassandra scan, then render the rows.
df.explain()
display(df)
Option 2 (DataFrame: read the table using the cassandraFormat helper function)
// cassandraFormat is an implicit extension on DataFrameReader provided by the connector
// package; the import must be in scope before it is called, or this cell fails to compile.
import org.apache.spark.sql.cassandra._

// Read words_new from test_keyspace via the connector's helper, which configures the
// Cassandra source format together with the table/keyspace options.
// NOTE(review): the third argument is the connector's cluster alias for cluster-level
// config lookup, not necessarily Cassandra's cluster_name — confirm "Test Cluster" is intended.
val df2 = spark
  .read
  .cassandraFormat("words_new", "test_keyspace", "Test Cluster")
  .load()
display(df2)

// Persist the rows as the Delta table delta_cassandra_ingest. With the default save mode
// this fails if the table already exists (e.g. when the notebook is re-run).
df2.write.format("delta").saveAsTable("delta_cassandra_ingest")

Write to Cassandra

// Create the target table in Cassandra (in test_keyspace) before writing to it
// CREATE table employee_new (
//     id  TEXT, 
//     dep_id  TEXT, 
//     age INT, 
//     PRIMARY KEY (id));
// Create a sample DataFrame to write into the Cassandra table
import org.apache.spark.sql.functions._

// Three rows (id 0, 1, 2) with a random department and age per row.
// NOTE(review): id/dep_id are numeric here while the CQL above declares them TEXT;
// the connector is expected to convert on write — verify.
val employee1 = spark
  .range(0, 3)
  .select(
    col("id"),
    (rand() * 3).cast("int").as("dep_id"),   // truncates a [0, 3) double to 0..2
    (rand() * 40 + 20).cast("int").as("age") // truncates a [20, 60) double to 20..59
  )
// Write the DataFrame directly to Cassandra.
import org.apache.spark.sql.cassandra._

// Overwrite mode TRUNCATEs employee_new before inserting. The connector refuses to
// truncate unless "confirm.truncate" is explicitly "true", so without that option this
// write throws. Switch to mode("append") to insert without truncating.
employee1.write
  .format("org.apache.spark.sql.cassandra")
  .mode("overwrite")
  .options(Map(
    "table" -> "employee_new",
    "keyspace" -> "test_keyspace",
    "confirm.truncate" -> "true"))
  .save()