Parse a JSON string or Python dictionary example (Python)


Method #1 - Scala Spark - Create a Spark DataFrame from a JSON string

%scala
import scala.collection.mutable.ListBuffer
val json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
 
var json_seq = new ListBuffer[String]()
json_seq += json_content1
json_seq += json_content2
 
val json_ds = json_seq.toDS() // Create a Spark dataset from the list.
import scala.collection.mutable.ListBuffer
json_content1: String = {'json_col1': 'hello', 'json_col2': 32}
json_content2: String = {'json_col1': 'hello', 'json_col2': 'world'}
json_seq: scala.collection.mutable.ListBuffer[String] = ListBuffer({'json_col1': 'hello', 'json_col2': 32}, {'json_col1': 'hello', 'json_col2': 'world'})
json_ds: org.apache.spark.sql.Dataset[String] = [value: string]
%scala
val df = spark.read.json(json_ds) // Use `spark.read.json` to parse the Spark dataset.
display(df)
 
json_col1    json_col2
hello        32
hello        world

Showing all 2 rows.

Method #1 - PySpark - Create a Spark DataFrame from a JSON string

%py
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
 
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
print(json_list)
["{'json_col1': 'hello', 'json_col2': 32}", "{'json_col1': 'hello', 'json_col2': 'world'}"]
%py
df = spark.read.json(sc.parallelize(json_list))  # Distribute the strings as an RDD and parse them with `spark.read.json`.
display(df)
 
json_col1    json_col2
hello        32
hello        world

Showing all 2 rows.
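
If the structure of the JSON is already known, the RDD step and the schema-inference pass can be skipped by parsing the strings with `from_json` against an explicit schema. The following is a minimal PySpark sketch of that alternative, assuming the same notebook context as above (`spark`, `display`, and the `json_list` built earlier); the `raw_json` column name and the all-string schema are illustrative choices, not part of the original example.

%py
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType

# Illustrative schema: json_col2 holds both a number (32) and a string ('world')
# in the sample data above, so it is modeled here as a string.
json_schema = StructType([
    StructField("json_col1", StringType(), True),
    StructField("json_col2", StringType(), True),
])

raw_df = spark.createDataFrame([(s,) for s in json_list], ["raw_json"])  # One JSON string per row.
parsed_df = raw_df.select(F.from_json("raw_json", json_schema).alias("parsed")).select("parsed.*")
display(parsed_df)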

Method #2 - Scala Spark - Extract a string column with JSON data from a DataFrame and parse it

%scala
import org.apache.spark.sql.functions._
 
val test_df = Seq(
  ("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
  ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")
).toDF("row_number", "json_data", "token")
 
val row_rdd = test_df.select(col("json_data")).rdd  // Selecting just the JSON column and converting it to RDD.
val string_rdd = row_rdd.map(_.mkString(","))       // Convert `RDD[Row]` to `RDD[String]`.
import org.apache.spark.sql.functions._
test_df: org.apache.spark.sql.DataFrame = [row_number: string, json_data: string ... 1 more field]
row_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[235] at rdd at command-1945001:7
string_rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[236] at map at command-1945001:8
%scala
val df1 = spark.read.json(string_rdd)  // Use `spark.read.json` to parse the `RDD[String]`.
display(df1)
 
json_col1    json_col2
hello        32
hello        world

Showing all 2 rows.
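
This article shows Method #2 only in Scala. A PySpark sketch of the same technique is given below, under the assumption that the notebook context (`spark`, `display`) is available; here the schema of the JSON column is inferred once and the column is then parsed in place with `from_json`, which keeps the other columns (`row_number`, `token`) alongside the parsed struct.

%py
from pyspark.sql import functions as F

# Same sample data as the Scala cell above.
test_df = spark.createDataFrame(
    [
        ("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
        ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0"),
    ],
    ["row_number", "json_data", "token"],
)

# Infer the schema of the JSON column once, then parse the column in place.
json_schema = spark.read.json(test_df.select("json_data").rdd.map(lambda r: r[0])).schema
parsed_df = test_df.withColumn("parsed", F.from_json("json_data", json_schema))
display(parsed_df.select("row_number", "parsed.*"))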

Method #3 - PySpark - Create a Spark DataFrame from a Python dictionary

jsonDataDict = {
  "job_id": 33100,
  "run_id": 1048560,
  "number_in_job": 1,
  "state": {"life_cycle_state": "PENDING", "state_message": "Waiting for cluster"},
  "task": {"notebook_task": {"notebook_path": "/Users/user@databricks.com/path/test_notebook"}},
  "cluster_spec": {
    "new_cluster": {
      "spark_version": "4.3.x-scala2.11",
      "attributes": {"type": "fixed_node", "memory": "8g"},
      "enable_elastic_disk": "false",
      "num_workers": 1
    }
  },
  "cluster_instance": {"cluster_id": "0000-000000-wares10"},
  "start_time": 1584689872601,
  "setup_duration": 0,
  "execution_duration": 0,
  "cleanup_duration": 0,
  "creator_user_name": "user@databricks.com",
  "run_name": "my test job",
  "run_page_url": "https://testurl.databricks.com#job/33100/run/1",
  "run_type": "SUBMIT_RUN"
}
 
type(jsonDataDict)
Out[3]: dict
import json
jsonData = json.dumps(jsonDataDict)     # Serialize the dictionary to a JSON string.
 
jsonDataList = []
jsonDataList.append(jsonData)
 
jsonRDD = sc.parallelize(jsonDataList)  # Distribute the JSON string as an RDD.
df = spark.read.json(jsonRDD)           # Parse the RDD into a DataFrame.
 
# To view the schema, use:
df.printSchema()
root
 |-- cleanup_duration: long (nullable = true)
 |-- cluster_instance: struct (nullable = true)
 |    |-- cluster_id: string (nullable = true)
 |-- cluster_spec: struct (nullable = true)
 |    |-- new_cluster: struct (nullable = true)
 |    |    |-- attributes: struct (nullable = true)
 |    |    |    |-- memory: string (nullable = true)
 |    |    |    |-- type: string (nullable = true)
 |    |    |-- enable_elastic_disk: string (nullable = true)
 |    |    |-- num_workers: long (nullable = true)
 |    |    |-- spark_version: string (nullable = true)
 |-- creator_user_name: string (nullable = true)
 |-- execution_duration: long (nullable = true)
 |-- job_id: long (nullable = true)
 |-- number_in_job: long (nullable = true)
 |-- run_id: long (nullable = true)
 |-- run_name: string (nullable = true)
 |-- run_page_url: string (nullable = true)
 |-- run_type: string (nullable = true)
 |-- setup_duration: long (nullable = true)
 |-- start_time: long (nullable = true)
 |-- state: struct (nullable = true)
 |    |-- life_cycle_state: string (nullable = true)
 |    |-- state_message: string (nullable = true)
 |-- task: struct (nullable = true)
 |    |-- notebook_task: struct (nullable = true)
 |    |    |-- notebook_path: string (nullable = true)
display(df)
 
cleanup_duration      0
cluster_instance      {"cluster_id": "0000-000000-wares10"}
cluster_spec          {"new_cluster": {"attributes": {"memory": "8g", "type": "fixed_node"}, "enable_elastic_disk": "false", "num_workers": 1, "spark_version": "4.3.x-scala2.11"}}
creator_user_name     user@databricks.com
execution_duration    0
job_id                33100
number_in_job         1
run_id                1048560
run_name              my test job
run_page_url          https://testurl.databricks.com#job/33100/run/1
run_type              SUBMIT_RUN
setup_duration        0
start_time            1584689872601
state                 {"life_cycle_state": "PENDING", "state_message": "Waiting for cluster"}
task                  {"notebook_task": {"notebook_path": "/Users/user@databricks.com/path/test_notebook"}}

Showing all 1 rows.
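
Because the parsed DataFrame contains nested structs (`state`, `cluster_spec`, `task`), individual fields can be selected with dot notation. A short illustrative follow-up, using field names taken from the schema printed above:

%py
# Select a few nested fields from the parsed DataFrame using dot notation.
display(
    df.select(
        "job_id",
        "run_id",
        "state.life_cycle_state",
        "cluster_spec.new_cluster.num_workers",
        "task.notebook_task.notebook_path",
    )
)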