BigQuery Sample Notebook (Python)

Loading a Google BigQuery table into a DataFrame

table = "bigquery-public-data.samples.shakespeare"
df = spark.read.format("bigquery").option("table", table).load()
df.show()
df.printSchema()
df.createOrReplaceTempView("words")
+---------+----------+-------+-----------+
|     word|word_count| corpus|corpus_date|
+---------+----------+-------+-----------+
|     LVII|         1|sonnets|          0|
|   augurs|         1|sonnets|          0|
|   dimm'd|         1|sonnets|          0|
|  plagues|         1|sonnets|          0|
|  treason|         1|sonnets|          0|
|  surmise|         1|sonnets|          0|
|     heed|         1|sonnets|          0|
|Unthrifty|         1|sonnets|          0|
|  quality|         1|sonnets|          0|
| wherever|         1|sonnets|          0|
|        C|         1|sonnets|          0|
|        L|         1|sonnets|          0|
|imaginary|         1|sonnets|          0|
|        H|         1|sonnets|          0|
|   relief|         1|sonnets|          0|
|        W|         1|sonnets|          0|
|        V|         1|sonnets|          0|
|  advised|         1|sonnets|          0|
|     grey|         1|sonnets|          0|
|        X|         1|sonnets|          0|
+---------+----------+-------+-----------+
only showing top 20 rows

root
 |-- word: string (nullable = false)
 |-- word_count: long (nullable = false)
 |-- corpus: string (nullable = false)
 |-- corpus_date: long (nullable = false)
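
If your cluster is not already configured with a Google service account, the connector also accepts credentials as read options. The sketch below is illustrative only; the billing project ID and the service-account key file path are placeholders you would replace with your own values.

# illustrative sketch: pass credentials and a billing project explicitly when the
# cluster is not pre-configured with a Google service account;
# <billing-project-id> and <path-to-service-account.json> are placeholders
dfWithCreds = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.shakespeare") \
    .option("parentProject", "<billing-project-id>") \
    .option("credentialsFile", "<path-to-service-account.json>") \
    .load()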

The code below shows how to run a Spark SQL query against the DataFrame. Note that this query runs against the DataFrame on your Databricks cluster, not in BigQuery. The next example shows how to run a query against BigQuery and load its result into a DataFrame for further processing.

# perform word count
wordCountDf = spark.sql("SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word ORDER BY word_count DESC LIMIT 10")
 
display(wordCountDf)
 
word   word_count
the         25568
I           21028
and         19649
to          17361
of          16438
a           13409
you         12527
my          11291
in          10589
is           8735

Showing all 10 rows.
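
The same aggregation can also be expressed with the DataFrame API instead of SQL; this is just an equivalent sketch and, like the SQL above, it runs entirely on the Databricks cluster.

# equivalent word count using the DataFrame API; runs on the cluster, not in BigQuery
from pyspark.sql import functions as F

wordCountByApiDf = df.groupBy("word") \
    .agg(F.sum("word_count").alias("word_count")) \
    .orderBy(F.desc("word_count")) \
    .limit(10)

display(wordCountByApiDf)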

Loading the result of a BigQuery SQL query into a DataFrame

This example shows how you can run SQL against BigQuery and load the result into a DataFrame. This is useful when you want to reduce the amount of data transferred between BigQuery and Databricks, or to offload certain processing to BigQuery.

  • The BigQuery Query API is more expensive than the BigQuery Storage API.
  • The BigQuery Query API requires a BigQuery dataset (the materializationDataset option below) in which to materialize the query result before it is read into Apache Spark.

table = "bigquery-public-data.samples.shakespeare"
tempLocation = "databricks_testing"
query = "SELECT count(1) FROM `{table}`".format(table=table)
 
# load the result of a SQL query on BigQuery into a DataFrame
df = spark.read.format("bigquery") \
    .option("materializationDataset", tempLocation) \
    .option("query", query) \
    .load()
 
display(df)
 
f0_
164656

Showing all 1 rows.
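
The same pattern can offload heavier aggregations to BigQuery so that only the aggregated result is transferred. The sketch below reuses the table and materialization dataset defined above; the GROUP BY runs in BigQuery.

# sketch: push a GROUP BY down to BigQuery and transfer only the aggregated result
corpusQuery = "SELECT corpus, SUM(word_count) AS total_words FROM `{table}` GROUP BY corpus".format(table=table)

corpusDf = spark.read.format("bigquery") \
    .option("materializationDataset", tempLocation) \
    .option("query", corpusQuery) \
    .load()

display(corpusDf)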

Write the contents of a DataFrame to a BigQuery table

This example shows how you can write the contents of a DataFrame to a BigQuery table. Note that Spark first writes the DataFrame to a temporary Google Cloud Storage location, specified with the temporaryGcsBucket option (databricks-bigquery-temp in this example).

from pyspark.sql import Row
 
Employee = Row("firstName", "lastName", "email", "salary")
 
employee1 = Employee("michael", "armbrust", "no-reply@berkeley.edu", 100000)
employee2 = Employee("xiangrui", "meng", "no-reply@stanford.edu", 120000)
employee3 = Employee("matei", "zaharia", "no-reply@waterloo.edu", 140000)
employee4 = Employee("patrick", "wendell", "no-reply@princeton.edu", 160000)
 
employees = [employee1, employee2, employee3, employee4]
df = spark.createDataFrame(employees)
 
display(df)
 
firstName  lastName  email                   salary
michael    armbrust  no-reply@berkeley.edu   100000
xiangrui   meng      no-reply@stanford.edu   120000
matei      zaharia   no-reply@waterloo.edu   140000
patrick    wendell   no-reply@princeton.edu  160000

Showing all 4 rows.

# save to a BigQuery table named `<project_name>.<dataset_name>.employees`
df.write.format("bigquery") \
    .mode("overwrite") \
    .option("temporaryGcsBucket", "databricks-bigquery-temp") \
    .option("table", "<project_name>.<dataset_name>.employees") \
    .save()
 
# read the data we wrote to BigQuery back into a DataFrame to prove the write worked
readDF = spark.read.format("bigquery") \
    .option("table", "<project_name>.<dataset_name>.employees") \
    .load()
 
display(readDF)
 
firstName  lastName  email                   salary
michael    armbrust  no-reply@berkeley.edu   100000
xiangrui   meng      no-reply@stanford.edu   120000
matei      zaharia   no-reply@waterloo.edu   140000
patrick    wendell   no-reply@princeton.edu  160000

Showing all 4 rows.
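
A later batch could also be appended to the same table instead of overwriting it. This is a minimal sketch assuming the same placeholder table and temporary bucket as above; the extra row is purely hypothetical.

# sketch: append a hypothetical extra row to the same table (placeholders as above)
employee5 = Employee("jane", "doe", "no-reply@example.com", 90000)

spark.createDataFrame([employee5]).write.format("bigquery") \
    .mode("append") \
    .option("temporaryGcsBucket", "databricks-bigquery-temp") \
    .option("table", "<project_name>.<dataset_name>.employees") \
    .save()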

Advanced Features

Nested filter pushdown and nested column pruning

This data source supports advanced filter pushdown and nested column pruning to reduce the amount of data transferred from BigQuery to Databricks. For example, filter pushdown evaluates certain predicates in BigQuery instead of in Databricks, so less data has to be transferred.

In this example, the payload.pull_request.user.id > 500 predicate is evaluated in BigQuery, and only the payload.pull_request.user.url column is read from BigQuery.

df = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.github_nested") \
    .load() \
    .where("payload.pull_request.user.id > 500 and repository.url = 'https://github.com/bitcoin/bitcoin'") \
    .select("payload.pull_request.user.url") \
    .distinct() \
    .sort("payload.pull_request.user.url") \
    .limit(3)
 
display(df)
 
url
https://api.github.com/users/Diapolo
https://api.github.com/users/TheBlueMatt
https://api.github.com/users/ali1234

Showing all 3 rows.
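
One way to sanity-check what was pushed down is to inspect the physical plan. The exact text of the scan node depends on the connector version, but pushed filters and the pruned schema typically appear there; the sketch below only illustrates the idea.

# sketch: inspect the physical plan to see which filters and columns reached the BigQuery scan
pushdownDf = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.github_nested") \
    .load() \
    .where("payload.pull_request.user.id > 500") \
    .select("payload.pull_request.user.url")

pushdownDf.explain()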

Array pushdown

This example shows how nested array predicates, e.g. payload.pages[0].html_url = 'https://github.com/clarkmoody/bitcoin/wiki/Home', are pushed down to BigQuery.

matchCount = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.github_nested") \
    .load() \
    .where("payload.pages[0].html_url = 'https://github.com/clarkmoody/bitcoin/wiki/Home'") \
    .select("payload.pages") \
    .count()
 
print(matchCount)
1
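
If the matching rows themselves are needed rather than a count, the nested array could be flattened on the cluster after the pushed-down filter. This sketch uses the standard Spark explode function and selects only page.html_url, since that field is known from the predicate above.

# sketch: flatten the matching pages arrays after the pushed-down filter
from pyspark.sql.functions import explode

pagesDf = spark.read.format("bigquery") \
    .option("table", "bigquery-public-data.samples.github_nested") \
    .load() \
    .where("payload.pages[0].html_url = 'https://github.com/clarkmoody/bitcoin/wiki/Home'") \
    .select(explode("payload.pages").alias("page")) \
    .select("page.html_url")

display(pagesDf)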