import org.apache.spark.sql.types._

val schema = new StructType()
  .add("_c0", IntegerType, true)
  .add("carat", DoubleType, true)
  .add("cut", StringType, true)
  .add("color", StringType, true)
  .add("clarity", StringType, true)
  .add("depth", IntegerType, true) // The depth field is defined incorrectly. The actual data contains floating-point numbers, while the schema specifies an integer.
  .add("table", DoubleType, true)
  .add("price", IntegerType, true)
  .add("x", DoubleType, true)
  .add("y", DoubleType, true)
  .add("z", DoubleType, true)
  .add("_corrupt_record", StringType, true) // The schema contains a special column _corrupt_record, which does not exist in the data. This column captures rows that did not parse correctly.

val diamonds_with_wrong_schema = spark.read.format("csv")
  .option("header", "true")
  .schema(schema)
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType = StructType(StructField(_c0,IntegerType,true), StructField(carat,DoubleType,true), StructField(cut,StringType,true), StructField(color,StringType,true), StructField(clarity,StringType,true), StructField(depth,IntegerType,true), StructField(table,DoubleType,true), StructField(price,IntegerType,true), StructField(x,DoubleType,true), StructField(y,DoubleType,true), StructField(z,DoubleType,true), StructField(_corrupt_record,StringType,true))
diamonds_with_wrong_schema: org.apache.spark.sql.DataFrame = [_c0: int, carat: double ... 10 more fields]
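PERMISSIVE is the default parse mode, so the read above behaves exactly as if the mode were set explicitly. A minimal equivalent sketch (not part of the original notebook; the variable name is illustrative):

val diamonds_permissive = spark.read.format("csv")
  .option("mode", "PERMISSIVE") // explicit, but identical to the default behavior
  .option("header", "true")
  .schema(schema)
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")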
// The mistake in the user-specified schema causes any row with a non-integer value in the depth column to be nullified.
// There are some rows where the value of depth is an integer, e.g. 64.0. They are parsed and converted successfully.
// The _corrupt_record column shows the string with the original row data, which helps find the issue.
display(diamonds_with_wrong_schema)
// Since Spark 2.3, queries over raw JSON/CSV files are disallowed when the referenced columns only include the internal corrupt record column (named _corrupt_record by default).
// For example: spark.read.schema(schema).csv(file).filter($"_corrupt_record".isNotNull).count() and spark.read.schema(schema).csv(file).select("_corrupt_record").show().
// Instead, you can cache or save the parsed results and then send the same query.
val badRows = diamonds_with_wrong_schema.filter($"_corrupt_record".isNotNull)
badRows.cache()
val numBadRows = badRows.count()
badRows.unpersist()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 167.0 failed 4 times, most recent failure: Lost task 0.3 in stage 167.0 (TID 877, 10.97.245.56, executor 0): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv.
val diamonds_with_wrong_schema_drop_malformed = spark.read.format("csv")
  .option("mode", "DROPMALFORMED")
  .option("header", "true")
  .schema(schema)
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
diamonds_with_wrong_schema_drop_malformed: org.apache.spark.sql.DataFrame = [_c0: int, carat: double ... 10 more fields]
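With DROPMALFORMED, rows that cannot be parsed against the schema are silently dropped instead of being nullified, so no _corrupt_record values are produced. A quick way to observe the effect is to count the surviving rows (a minimal sketch, not from the original notebook):

// Rows whose depth value is not an integer are removed during parsing,
// so this count is lower than the number of data rows in the CSV file.
val remainingRows = diamonds_with_wrong_schema_drop_malformed.count()
println(s"Rows remaining after DROPMALFORMED parsing: $remainingRows")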
val diamonds_with_wrong_schema_fail_fast = spark.read.format("csv")
  .option("mode", "FAILFAST")
  .option("header", "true")
  .schema(schema)
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")
diamonds_with_wrong_schema_fail_fast: org.apache.spark.sql.DataFrame = [_c0: int, carat: double ... 10 more fields]
display(diamonds_with_wrong_schema_fail_fast)
SparkException: Job aborted due to stage failure: Task 0 in stage 170.0 failed 4 times, most recent failure: Lost task 0.3 in stage 170.0 (TID 882, 10.97.240.8, executor 1): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:340)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:319)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:406)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:259)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.collect.UnsafeRowBatchUtils$.encodeUnsafeRows(UnsafeRowBatchUtils.scala:80)
at org.apache.spark.sql.execution.collect.Collector.$anonfun$processFunc$1(Collector.scala:187)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:144)
at org.apache.spark.scheduler.Task.run(Task.scala:117)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$9(Executor.scala:639)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1559)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:642)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:77)
at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:411)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:292)
... 15 more
Caused by: org.apache.spark.sql.catalyst.util.BadRecordException: java.lang.NumberFormatException: For input string: "61.5"
at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:308)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:253)
at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:404)
at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:64)
... 21 more
Caused by: java.lang.NumberFormatException: For input string: "61.5"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Integer.parseInt(Integer.java:580)
at java.lang.Integer.parseInt(Integer.java:615)
at scala.collection.immutable.StringLike.toInt(StringLike.scala:304)
at scala.collection.immutable.StringLike.toInt$(StringLike.scala:304)
at scala.collection.immutable.StringOps.toInt(StringOps.scala:33)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6(UnivocityParser.scala:156)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$6$adapted(UnivocityParser.scala:156)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.nullSafeDatum(UnivocityParser.scala:237)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$makeConverter$5(UnivocityParser.scala:156)
at org.apache.spark.sql.catalyst.csv.UnivocityParser.org$apache$spark$sql$catalyst$csv$UnivocityParser$$convert(UnivocityParser.scala:290)
... 24 more
Driver stacktrace:
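The root cause in every mode is the same: depth is declared as IntegerType while the file contains floating-point values such as 61.5. Below is a hedged sketch of a corrected schema, assuming DoubleType matches the data (variable names are illustrative); with it, no records are treated as malformed and the _corrupt_record column stays null.

val correctedSchema = new StructType()
  .add("_c0", IntegerType, true)
  .add("carat", DoubleType, true)
  .add("cut", StringType, true)
  .add("color", StringType, true)
  .add("clarity", StringType, true)
  .add("depth", DoubleType, true) // DoubleType matches the floating-point values actually present in the file
  .add("table", DoubleType, true)
  .add("price", IntegerType, true)
  .add("x", DoubleType, true)
  .add("y", DoubleType, true)
  .add("z", DoubleType, true)
  .add("_corrupt_record", StringType, true)

val diamonds_with_correct_schema = spark.read.format("csv")
  .option("header", "true")
  .schema(correctedSchema)
  .load("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv")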