BigQuery Sample Notebook (Scala)


Loading a Google BigQuery table into a DataFrame

import com.google.cloud.spark.bigquery.BigQueryDataFrameReader
import sqlContext.implicits._
val table = "bigquery-public-data.samples.shakespeare"
 
// load data from BigQuery
val df = spark.read.format("bigquery").option("table", table).load()
df.show()
df.printSchema()
df.createOrReplaceTempView("words")
+---------+----------+-------+-----------+
|     word|word_count| corpus|corpus_date|
+---------+----------+-------+-----------+
|     LVII|         1|sonnets|          0|
|   augurs|         1|sonnets|          0|
|   dimm'd|         1|sonnets|          0|
|  plagues|         1|sonnets|          0|
|  treason|         1|sonnets|          0|
|  surmise|         1|sonnets|          0|
|     heed|         1|sonnets|          0|
|Unthrifty|         1|sonnets|          0|
|  quality|         1|sonnets|          0|
| wherever|         1|sonnets|          0|
|        C|         1|sonnets|          0|
|        L|         1|sonnets|          0|
|imaginary|         1|sonnets|          0|
|        H|         1|sonnets|          0|
|   relief|         1|sonnets|          0|
|        W|         1|sonnets|          0|
|        V|         1|sonnets|          0|
|  advised|         1|sonnets|          0|
|     grey|         1|sonnets|          0|
|        X|         1|sonnets|          0|
+---------+----------+-------+-----------+
only showing top 20 rows

root
 |-- word: string (nullable = false)
 |-- word_count: long (nullable = false)
 |-- corpus: string (nullable = false)
 |-- corpus_date: long (nullable = false)

table: String = bigquery-public-data.samples.shakespeare
df: org.apache.spark.sql.DataFrame = [word: string, word_count: bigint ... 2 more fields]

The code below shows how you can run a Spark SQL query against the DataFrame. Please note that this SQL query runs against the DataFrame in your Databricks cluster, not in BigQuery. The next example shows how you can run a query against BigQuery and load its result into a DataFrame for further processing.

// perform word count
val wordCountDf = spark.sql("SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC LIMIT 10")
 
display(wordCountDf)
 
word  word_count
the   25568
I     21028
and   19649
to    17361
of    16438
a     13409
you   12527
my    11291
in    10589
is    8735

Showing all 10 rows.
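The same word count can also be written with the DataFrame API instead of Spark SQL; like the SQL above, it runs on the Databricks cluster, not in BigQuery. A minimal sketch, assuming the df loaded earlier:

import org.apache.spark.sql.functions.{desc, sum}

// sketch: the same top-10 word count using the DataFrame API; the aggregation
// still runs in Spark on the cluster, not in BigQuery
val wordCountDfApi = df
  .groupBy("word")
  .agg(sum("word_count").as("word_count"))
  .orderBy(desc("word_count"))
  .limit(10)

display(wordCountDfApi)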

Loading the result of a BigQuery SQL query into a DataFrame

This example shows how you can run SQL against BigQuery and load the result into a DataFrame. This is useful when you want to reduce data transfer between BigQuery and Databricks and offload certain processing to BigQuery.

  • The BigQuery query API is more expensive than the BigQuery Storage API.
  • The BigQuery query API requires a BigQuery dataset (set with the materializationDataset option below) where the query result is materialized before it is read into Apache Spark.
val table = "bigquery-public-data.samples.shakespeare"
val tempLocation = "databricks_testing"
 
// load the result of a SQL query on BigQuery into a DataFrame
val df = spark.read.format("bigquery")
  .option("materializationDataset", tempLocation)
  .option("query", s"SELECT count(1) FROM `${table}`")
  .load()
 
display(df)
 
col_0
164656

Showing all 1 rows.
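Since the query result is a single-row, single-column DataFrame, you can also pull the value out as a Scala Long for further processing on the cluster; a minimal sketch, assuming the df loaded above:

// sketch: extract the count from the one-row query result as a Scala Long
val rowCount = df.first().getLong(0)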

val df = spark.read.format("bigquery").option("table", "Nested.nested").load()
df: org.apache.spark.sql.DataFrame = [id: bigint, name: string ... 1 more field]

Write the contents of a DataFrame to a BigQuery table

This example shows how you can write the contents of a DataFrame to a BigQuery table. Please note that Spark first needs to write the DataFrame to a temporary Google Cloud Storage location (the databricks-bigquery-temp bucket in this example).

case class Employee(firstName: String, lastName: String, email: String, salary: Int)
 
// Create the Employees
val employee1 = Employee("michael", "armbrust", "no-reply@berkeley.edu", 100000)
val employee2 = Employee("xiangrui", "meng", "no-reply@stanford.edu", 120000)
val employee3 = Employee("matei", "zaharia", "no-reply@waterloo.edu", 140000)
val employee4 = Employee("patrick", "wendell", "no-reply@princeton.edu", 160000)
 
val df = Seq(employee1, employee2, employee3, employee4).toDF
 
display(df)
 
firstName  lastName  email                   salary
michael    armbrust  no-reply@berkeley.edu   100000
xiangrui   meng      no-reply@stanford.edu   120000
matei      zaharia   no-reply@waterloo.edu   140000
patrick    wendell   no-reply@princeton.edu  160000

Showing all 4 rows.

// save to a BigQuery table named `Nested.employees`
df.write
  .format("bigquery")
  .mode("overwrite")
  .option("temporaryGcsBucket", "databricks-bigquery-temp")
  .option("table", "Nested.employees")
  .save()
// read the data we wrote to BigQuery back into a DataFrame to prove the write worked
 
val readDF = 
  spark.read
    .format("bigquery")
    .option("table", "Nested.employees")
    .load()
 
display(readDF)
 
firstName  lastName  email                   salary
patrick    wendell   no-reply@princeton.edu  160000
xiangrui   meng      no-reply@stanford.edu   120000
matei      zaharia   no-reply@waterloo.edu   140000
michael    armbrust  no-reply@berkeley.edu   100000

Showing all 4 rows.
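Overwrite mode replaces the existing table contents. If you want to add rows instead, the connector also supports append mode; a minimal sketch, assuming the same Nested.employees table and temporary GCS bucket as above and a hypothetical new row:

// sketch: append one placeholder row to the existing BigQuery table
val newHire = Seq(Employee("jane", "doe", "no-reply@example.com", 90000)).toDF

newHire.write
  .format("bigquery")
  .mode("append")
  .option("temporaryGcsBucket", "databricks-bigquery-temp")
  .option("table", "Nested.employees")
  .save()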

Advanced Features

Nested filter pushdown and nested column pruning

This data source supports advanced filter pushdown and column pruning to reduce the amount of data transferred from BigQuery to Databricks. For example, filter pushdown evaluates certain predicates in BigQuery instead of in Databricks, so less data has to be transferred.

In this example, the payload.pull_request.user.id > 500 predicate is evaluated in BigQuery, and only the nested payload.pull_request.user.url column is read from BigQuery.

val df = spark.read.format("bigquery")
  .option("table", "bigquery-public-data.samples.github_nested")
  .load()
  .where("payload.pull_request.user.id > 500 and repository.url='https://github.com/bitcoin/bitcoin'")
  .select("payload.pull_request.user.url")
  .distinct
  .as[String]
  .sort("payload.pull_request.user.url")
  .take(3)
df: Array[String] = Array(https://api.github.com/users/Diapolo, https://api.github.com/users/TheBlueMatt, https://api.github.com/users/ali1234)
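To check what is actually pushed down, you can inspect the physical plan before collecting any results; a minimal sketch (the exact plan text depends on the connector and Spark versions, but the BigQuery scan node should list the pushed filters and the pruned columns):

// sketch: print the query plan to verify filter pushdown and column pruning
spark.read.format("bigquery")
  .option("table", "bigquery-public-data.samples.github_nested")
  .load()
  .where("payload.pull_request.user.id > 500 and repository.url='https://github.com/bitcoin/bitcoin'")
  .select("payload.pull_request.user.url")
  .explain(true)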