import com.couchbase.spark._
import org.apache.spark.sql._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
import com.couchbase.client.scala.kv.MutateInSpec
import com.couchbase.spark.kv.MutateIn
import com.couchbase.client.scala.kv.LookupInSpec
import com.couchbase.spark.kv.LookupIn
import com.couchbase.client.scala.query.QueryOptions
import com.couchbase.spark.query.QueryOptions
import com.couchbase.client.scala.analytics.AnalyticsOptions
import com.couchbase.spark._
import org.apache.spark.sql._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get
import com.couchbase.client.scala.kv.MutateInSpec
import com.couchbase.spark.kv.MutateIn
import com.couchbase.client.scala.kv.LookupInSpec
import com.couchbase.spark.kv.LookupIn
import com.couchbase.client.scala.query.QueryOptions
import com.couchbase.spark.query.QueryOptions
import com.couchbase.client.scala.analytics.AnalyticsOptions
// Key-value read: fetch two airline documents by ID and print each one
// decoded as a JsonObject.
sc.couchbaseGet(Seq("airline_10", "airline_10642").map(id => Get(id)))
  .collect()
  .foreach { doc => println(doc.contentAs[JsonObject]) }
Success({"country":"United States","iata":"Q5","name":"40-Mile Airline","callsign":"MILE-AIR","icao":"MLA","id":10,"type":"airline"})
Success({"country":"United Kingdom","iata":null,"name":"Jc royal.britannica","callsign":null,"icao":"JRB","id":10642,"type":"airline"})
// Sub-document lookup: request only the "name" path of document airline_10642
// and print the raw lookup result.
sc.couchbaseLookupIn(
    Seq(LookupIn("airline_10642", Seq(LookupInSpec.get("name"))))
  )
  .collect()
  .foreach(println)
LookupInResult(airline_10642,WrappedArray(SubdocField{status=SUCCESS, value="Jc royal.britannica", path='name'}),33554432,1638512593109778432,None,com.couchbase.client.scala.codec.JsonTranscoder@3ce3de55)
// Analytics query using the fully qualified path `travel-sample`.inventory.hotel:
// hotel counts per (city, state), keeping only groups with more than 30 hotels.
val query =
  "SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM `travel-sample`.inventory.hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30"
sc.couchbaseAnalyticsQuery[JsonObject](query)
  .collect()
  .foreach(row => println(row))
{"city":"Birmingham","num_hotels":36,"state":null}
{"city":"London","num_hotels":67,"state":null}
{"city":"Los Angeles","num_hotels":35,"state":"California"}
{"city":"Paris","num_hotels":62,"state":"Île-de-France"}
{"city":"San Diego","num_hotels":48,"state":"California"}
{"city":"San Francisco","num_hotels":132,"state":"California"}
query: String = SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM `travel-sample`.inventory.hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30
// Same aggregation as the fully qualified variant, but the statement names the
// collection as plain `hotel` and the "inventory" scope is supplied through the
// keyspace argument.
val query =
  "SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30"
val options = AnalyticsOptions()
val result = sc.couchbaseAnalyticsQuery[JsonObject](
  query,
  options,
  keyspace = Keyspace(scope = Some("inventory"))
)
result.collect().foreach(row => println(row))
{"city":"Birmingham","num_hotels":36,"state":null}
{"city":"London","num_hotels":67,"state":null}
{"city":"Los Angeles","num_hotels":35,"state":"California"}
{"city":"Paris","num_hotels":62,"state":"Île-de-France"}
{"city":"San Diego","num_hotels":48,"state":"California"}
{"city":"San Francisco","num_hotels":132,"state":"California"}
query: String = SELECT ht.city,ht.state,COUNT(*) AS num_hotels FROM hotel ht GROUP BY ht.city,ht.state HAVING COUNT(*) > 30
options: com.couchbase.client.scala.analytics.AnalyticsOptions = AnalyticsOptions(None,None,None,None,false,None,None,None,None)
result: org.apache.spark.rdd.RDD[com.couchbase.client.scala.json.JsonObject] = AnalyticsRDD[142] at RDD at AnalyticsRDD.scala:41
// Load two DataFrames through the "couchbase.query" data source, each restricted
// by a filter on the document `type` field.
val airlines = spark.read
  .format("couchbase.query")
  .option(QueryOptions.Filter, "type = 'airline'")
  .load()
val airports = spark.read
  .format("couchbase.query")
  .option(QueryOptions.Filter, "type = 'airport'")
  .load()

// Expose both DataFrames to Spark SQL as temporary views.
airlines.createOrReplaceTempView("airlines")
airports.createOrReplaceTempView("airports")
airlines: org.apache.spark.sql.DataFrame = [__META_ID: string, callsign: string ... 6 more fields]
airports: org.apache.spark.sql.DataFrame = [__META_ID: string, airportname: string ... 8 more fields]
airlines.printSchema
root
|-- __META_ID: string (nullable = true)
|-- callsign: string (nullable = true)
|-- country: string (nullable = true)
|-- iata: string (nullable = true)
|-- icao: string (nullable = true)
|-- id: long (nullable = true)
|-- name: string (nullable = true)
|-- type: string (nullable = true)
airports.printSchema
root
|-- __META_ID: string (nullable = true)
|-- airportname: string (nullable = true)
|-- city: string (nullable = true)
|-- country: string (nullable = true)
|-- faa: string (nullable = true)
|-- geo: struct (nullable = true)
| |-- alt: long (nullable = true)
| |-- lat: double (nullable = true)
| |-- lon: double (nullable = true)
|-- icao: string (nullable = true)
|-- id: long (nullable = true)
|-- type: string (nullable = true)
|-- tz: string (nullable = true)
// Maps a country name to a three-letter country code.
//
// Known names ("France", "United States", "United Kingdom") are translated to
// "FRA", "USA" and "GBR". Any other input, including null, is returned
// unchanged: without a fallback case the match is non-exhaustive and throws
// scala.MatchError on the first unmapped or null value it sees.
val countrymap = (s: String) => {
  s match {
    case "France"         => "FRA"
    case "United States"  => "USA"
    case "United Kingdom" => "GBR"
    // Pass unmapped values (and null) through rather than throwing.
    case other            => other
  }
}
// Register the function with the Spark session as a UDF under the name "countrymap".
spark.udf.register("countrymap", countrymap)
countrymap: String => String = $Lambda$7949/2096140635@1f788b88
res19: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$7949/2096140635@1f788b88,StringType,List(Some(class[value[0]: string])),Some(class[value[0]: string]),Some(countrymap),true,true)
Couchbase & Apache Spark
Before running this notebook, you must create and attach the Couchbase
spark-connector
library, version 3.2.0 or above. The Couchbase connector library is available for download from Maven Central. Databricks Runtimes that include Spark 3.2.0 require a Spark connector compiled against Scala 2.12. The examples shown in this notebook were validated against Databricks runtime 10.0. Download the latest version of the Spark connector and get more information about it.
You must also configure the following parameters in the Spark config when you create a cluster:
spark.couchbase.password password
spark.couchbase.implicitBucket travel-sample
spark.couchbase.connectionString hostname
spark.couchbase.username username
spark.databricks.delta.preview.enabled true