Couchbase Demo (Scala)

Loading...

Couchbase & Apache Spark

Before running this notebook, you must create and attach the Couchbase spark-connector library, version 3.2.0 or above. The Couchbase connector library is available for download from Maven Central. Databricks Runtimes that include Spark 3.2.0 require a Spark connector compiled against Scala 2.12. The examples shown in this notebook were validated against Databricks runtime 10.0.

Download and get information about the latest version of the Spark connector.

You must also configure the following parameters in the Spark config when you create a cluster:

spark.couchbase.password password

spark.couchbase.implicitBucket travel-sample

spark.couchbase.connectionString hostname

spark.couchbase.username username

spark.databricks.delta.preview.enabled true

import com.couchbase.spark._
import org.apache.spark.sql._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
import com.couchbase.client.scala.kv.MutateInSpec
import com.couchbase.spark.kv.MutateIn
import com.couchbase.client.scala.kv.LookupInSpec
import com.couchbase.spark.kv.LookupIn
import com.couchbase.client.scala.query.QueryOptions
import com.couchbase.spark.query.QueryOptions
import com.couchbase.client.scala.analytics.AnalyticsOptions
 
import com.couchbase.spark._ import org.apache.spark.sql._ import com.couchbase.client.scala.json.JsonObject import com.couchbase.spark.kv.Get import com.couchbase.client.scala.kv.MutateInSpec import com.couchbase.spark.kv.MutateIn import com.couchbase.client.scala.kv.LookupInSpec import com.couchbase.spark.kv.LookupIn import com.couchbase.client.scala.query.QueryOptions import com.couchbase.spark.query.QueryOptions import com.couchbase.client.scala.analytics.AnalyticsOptions

Working With RDDs

// Look up two airline documents by ID and print each result's content
// decoded as a JsonObject.
val fetched = sc.couchbaseGet(Seq("airline_10", "airline_10642").map(id => Get(id))).collect()
for (doc <- fetched) println(doc.contentAs[JsonObject])
Success({"country":"United States","iata":"Q5","name":"40-Mile Airline","callsign":"MILE-AIR","icao":"MLA","id":10,"type":"airline"}) Success({"country":"United Kingdom","iata":null,"name":"Jc royal.britannica","callsign":null,"icao":"JRB","id":10642,"type":"airline"})

Subdocument Lookup

// Sub-document lookup: request only the "name" path of document
// airline_10642 and print each lookup result.
sc
  .couchbaseLookupIn(
    Seq(
      LookupIn("airline_10642", Seq(LookupInSpec.get("name")))
    )
  )
  .collect()
  .foreach(println)
LookupInResult(airline_10642,WrappedArray(SubdocField{status=SUCCESS, value="Jc royal.britannica", path='name'}),33554432,1638512593109778432,None,com.couchbase.client.scala.codec.JsonTranscoder@3ce3de55)

Raw N1QL Query

// Run a raw N1QL statement that counts airport documents per country
// (largest count first) and print every returned row.
sc.couchbaseQuery[JsonObject](
    "select country, count(*) as count from `travel-sample` where type = 'airport' group by country order by count desc"
  )
  .collect()
  .foreach(row => println(row))
{"country":"United States","count":1560} {"country":"France","count":221} {"country":"United Kingdom","count":187}

Analytics Query

// Analytics statement using the fully qualified collection name: cities
// (with their state) that have more than 30 hotels.
val query = "SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM `travel-sample`.inventory.hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30"

// Execute the statement and print each resulting row.
sc
  .couchbaseAnalyticsQuery[JsonObject](query)
  .collect()
  .foreach(row => println(row))
{"city":"Birmingham","num_hotels":36,"state":null} {"city":"London","num_hotels":67,"state":null} {"city":"Los Angeles","num_hotels":35,"state":"California"} {"city":"Paris","num_hotels":62,"state":"Île-de-France"} {"city":"San Diego","num_hotels":48,"state":"California"} {"city":"San Francisco","num_hotels":132,"state":"California"} query: String = SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM `travel-sample`.inventory.hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30
// Same aggregation as a fully qualified statement would express, but the
// collection is referenced by its short name ("hotel"); the scope is supplied
// through the keyspace argument instead.
val query = "SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30"
val options = AnalyticsOptions()

val result = sc.couchbaseAnalyticsQuery[JsonObject](
  query,
  options,
  keyspace = Keyspace(scope = Some("inventory"))
)

// Bring the rows to the driver and print them one by one.
for (row <- result.collect()) println(row)
{"city":"Birmingham","num_hotels":36,"state":null} {"city":"London","num_hotels":67,"state":null} {"city":"Los Angeles","num_hotels":35,"state":"California"} {"city":"Paris","num_hotels":62,"state":"Île-de-France"} {"city":"San Diego","num_hotels":48,"state":"California"} {"city":"San Francisco","num_hotels":132,"state":"California"} query: String = SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30 options: com.couchbase.client.scala.analytics.AnalyticsOptions = AnalyticsOptions(None,None,None,None,false,None,None,None,None) result: org.apache.spark.rdd.RDD[com.couchbase.client.scala.json.JsonObject] = AnalyticsRDD[142] at RDD at AnalyticsRDD.scala:41

Spark SQL

Register DataFrames

// Load one DataFrame per document type through the "couchbase.query" data
// source, restricting each with a filter on the `type` field.
val airlines = spark.read
  .format("couchbase.query")
  .option(QueryOptions.Filter, "type = 'airline'")
  .load()

val airports = spark.read
  .format("couchbase.query")
  .option(QueryOptions.Filter, "type = 'airport'")
  .load()

// Expose both DataFrames to Spark SQL under temporary view names.
airlines.createOrReplaceTempView("airlines")
airports.createOrReplaceTempView("airports")
 
airlines: org.apache.spark.sql.DataFrame = [__META_ID: string, callsign: string ... 6 more fields] airports: org.apache.spark.sql.DataFrame = [__META_ID: string, airportname: string ... 8 more fields]

Look at a Schema

// Print the schema of the airlines DataFrame.
airlines.printSchema()
root |-- __META_ID: string (nullable = true) |-- callsign: string (nullable = true) |-- country: string (nullable = true) |-- iata: string (nullable = true) |-- icao: string (nullable = true) |-- id: long (nullable = true) |-- name: string (nullable = true) |-- type: string (nullable = true)
// Print the schema of the airports DataFrame.
airports.printSchema()
root |-- __META_ID: string (nullable = true) |-- airportname: string (nullable = true) |-- city: string (nullable = true) |-- country: string (nullable = true) |-- faa: string (nullable = true) |-- geo: struct (nullable = true) | |-- alt: long (nullable = true) | |-- lat: double (nullable = true) | |-- lon: double (nullable = true) |-- icao: string (nullable = true) |-- id: long (nullable = true) |-- type: string (nullable = true) |-- tz: string (nullable = true)

Run Spark SQL on the Temporary Spark View

%sql
-- Show the first ten rows of the airlines view, ordered by name ascending.
select * from airlines order by name asc limit 10
 
__META_ID
callsign
country
iata
icao
id
name
type
1
2
3
4
5
6
7
8
9
10
airline_10
MILE-AIR
United States
Q5
MLA
10
40-Mile Airline
airline
airline_665
FLIGHTVUE
United Kingdom
null
VUE
665
AD Aviation
airline
airline_315
AMTRAN
United States
null
AMT
315
ATA Airlines
airline
airline_792
CYCLONE
United States
ZA
CYD
792
Access Air
airline
airline_21
AIGLE AZUR
France
ZI
AAF
21
Aigle Azur
airline
airline_1191
REUNION
France
UU
REU
1191
Air Austral
airline
airline_139
AIRCALIN
France
SB
ACI
139
Air Caledonie International
airline
airline_567
FRENCH WEST
France
TX
FWI
567
Air Caraïbes
airline
airline_149
NIGHT CARGO
United States
2Q
SNC
149
Air Cargo Carriers
airline
airline_16881
Cudlua
United Kingdom
null
CUD
16881
Air Cudlua
airline

Showing all 10 rows.

Airlines grouped by Country

%sql
-- Count the rows of the airlines view per country.
select country, count(*) from airlines group by country;
United KingdomFranceUnited States21%11%68%countryUnited KingdomUnited KingdomFranceFranceUnited StatesUnited States

Airports By Country, Visualized with UDF

/**
 * Translates a country name into a three-letter country code.
 *
 * Known names ("France", "United States", "United Kingdom") map to "FRA",
 * "USA" and "GBR". Any other value, including null, is returned unchanged.
 * Without that fallback an unlisted or missing country would raise a
 * scala.MatchError.
 */
val countrymap = (s: String) => {
  s match {
    case "France"         => "FRA"
    case "United States"  => "USA"
    case "United Kingdom" => "GBR"
    // Fallback keeps the function total: unknown or null input passes through.
    case other            => other
  }
}
// Register the countrymap function under the name "countrymap" so it can be
// called from Spark SQL statements.
spark.udf.register("countrymap", countrymap)
countrymap: String => String = $Lambda$7949/2096140635@1f788b88 res19: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$7949/2096140635@1f788b88,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),Some(countrymap),true,true)
%sql
-- Count the rows of the airports view per country, passing each country
-- through the countrymap function.
select countrymap(country), count(*) from airports group by country;
+N/A0-500500-10001000-1500