Method #1 - Scala Spark - Create a Spark DataFrame from a JSON string
%scala
import scala.collection.mutable.ListBuffer
val json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
val json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
val json_seq = new ListBuffer[String]() // Add the JSON content to a list.
json_seq += json_content1
json_seq += json_content2
val json_ds = json_seq.toDS() // Create a Spark dataset from the list.
import scala.collection.mutable.ListBuffer
json_content1: String = {'json_col1': 'hello', 'json_col2': 32}
json_content2: String = {'json_col1': 'hello', 'json_col2': 'world'}
json_seq: scala.collection.mutable.ListBuffer[String] = ListBuffer({'json_col1': 'hello', 'json_col2': 32}, {'json_col1': 'hello', 'json_col2': 'world'})
json_ds: org.apache.spark.sql.Dataset[String] = [value: string]
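With the JSON strings collected into a Dataset[String], the natural next step is to parse them with the JSON reader, which infers a schema from the records (the single-quoted field names above are accepted because the reader's allowSingleQuotes option defaults to true). A minimal sketch of that step, reusing json_ds from the cell above:
%scala
val df = spark.read.json(json_ds) // Each JSON string becomes one row; the schema is inferred.
display(df)
Because json_col2 holds a number in one record and a string in the other, schema inference resolves it to a string column.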
%py
json_content1 = "{'json_col1': 'hello', 'json_col2': 32}"
json_content2 = "{'json_col1': 'hello', 'json_col2': 'world'}"
json_list = []
json_list.append(json_content1)
json_list.append(json_content2)
print(json_list)
["{'json_col1': 'hello', 'json_col2': 32}", "{'json_col1': 'hello', 'json_col2': 'world'}"]
%scala
import org.apache.spark.sql.functions._
val test_df = Seq(
  ("1", "{'json_col1': 'hello', 'json_col2': 32}", "1.0"),
  ("1", "{'json_col1': 'hello', 'json_col2': 'world'}", "1.0")
).toDF("row_number", "json_data", "token")
val row_rdd = test_df.select(col("json_data")).rdd // Selecting just the JSON column and converting it to RDD.
val string_rdd = row_rdd.map(_.mkString(",")) // Convert `RDD[Row]` to `RDD[String]`.
import org.apache.spark.sql.functions._
test_df: org.apache.spark.sql.DataFrame = [row_number: string, json_data: string ... 1 more field]
row_rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[235] at rdd at command-1945001:7
string_rdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[236] at map at command-1945001:8
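Once the JSON column has been flattened to an RDD[String], it can be parsed into a new DataFrame containing only the JSON fields. A minimal sketch (the parsed_df name is introduced here; on recent Spark versions this RDD overload of read.json is deprecated in favor of passing a Dataset[String], but it still works):
%scala
val parsed_df = spark.read.json(string_rdd) // Infer the schema from the extracted JSON strings.
parsed_df.printSchema()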
jsonDataDict = {"job_id":33100,"run_id":1048560,"number_in_job":1,"state":{"life_cycle_state":"PENDING","state_message":"Waiting for cluster"},"task":{"notebook_task":{"notebook_path":"/Users/user@databricks.com/path/test_notebook"}},"cluster_spec":{"new_cluster":{"spark_version":"4.3.x-scala2.11","attributes":{"type":"fixed_node","memory":"8g"},"enable_elastic_disk":"false","num_workers":1}},"cluster_instance":{"cluster_id":"0000-000000-wares10"},"start_time":1584689872601,"setup_duration":0,"execution_duration":0,"cleanup_duration":0,"creator_user_name":"user@databricks.com","run_name":"my test job","run_page_url":"https://testurl.databricks.com#job/33100/run/1","run_type":"SUBMIT_RUN"}
type(jsonDataDict)
Out[3]: dict
%py
import json
jsonData = json.dumps(jsonDataDict)     # Serialize the dictionary to a JSON string.
jsonDataList = []
jsonDataList.append(jsonData)           # Wrap the string in a list.
jsonRDD = sc.parallelize(jsonDataList)  # Distribute the list as an RDD of JSON strings.
df = spark.read.json(jsonRDD)           # Parse the JSON and infer the schema.
# View the schema of the resulting DataFrame.
df.printSchema()
root
|-- cleanup_duration: long (nullable = true)
|-- cluster_instance: struct (nullable = true)
| |-- cluster_id: string (nullable = true)
|-- cluster_spec: struct (nullable = true)
| |-- new_cluster: struct (nullable = true)
| | |-- attributes: struct (nullable = true)
| | | |-- memory: string (nullable = true)
| | | |-- type: string (nullable = true)
| | |-- enable_elastic_disk: string (nullable = true)
| | |-- num_workers: long (nullable = true)
| | |-- spark_version: string (nullable = true)
|-- creator_user_name: string (nullable = true)
|-- execution_duration: long (nullable = true)
|-- job_id: long (nullable = true)
|-- number_in_job: long (nullable = true)
|-- run_id: long (nullable = true)
|-- run_name: string (nullable = true)
|-- run_page_url: string (nullable = true)
|-- run_type: string (nullable = true)
|-- setup_duration: long (nullable = true)
|-- start_time: long (nullable = true)
|-- state: struct (nullable = true)
| |-- life_cycle_state: string (nullable = true)
| |-- state_message: string (nullable = true)
|-- task: struct (nullable = true)
| |-- notebook_task: struct (nullable = true)
| | |-- notebook_path: string (nullable = true)
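Nested fields in the inferred schema can be selected with dot notation. A minimal sketch of pulling a few of the fields shown above (the column choices are only illustrative):
%py
from pyspark.sql.functions import col

df.select(
    col("job_id"),
    col("run_name"),
    col("state.life_cycle_state").alias("life_cycle_state"),
    col("task.notebook_task.notebook_path").alias("notebook_path")
).show(truncate=False)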