Identify duplicate data on append example (Scala)


This example notebook shows you how to identify the source (notebook or job id) of duplicate data in an append scenario.

User 1 is ingesting data to table default.events_tbl10.

import org.apache.spark.sql.functions._

// Read the sample events dataset, generate synthetic date and deviceId columns,
// and keep 100 rows with distinct deviceId values.
val rawData = spark
  .read
  .json("/databricks-datasets/structured-streaming/events/")
  .drop("time")
  .withColumn("date", expr("cast(concat('2018-01-', cast(rand(5) * 30 as int) + 1) as date)"))
  .withColumn("deviceId", expr("cast(rand(5) * 100 as int)"))
  .dropDuplicates("deviceId")
  .limit(100)

// Append the rows to a partitioned Delta table.
rawData.write.mode("append").partitionBy("action", "date").format("delta").saveAsTable("default.events_tbl10")

display(rawData)
 
action | date       | deviceId
Close  | 2018-01-01 | 0
Open   | 2018-01-01 | 1
Close  | 2018-01-01 | 2
Close  | 2018-01-01 | 3
Open   | 2018-01-02 | 4
Close  | 2018-01-02 | 5
Close  | 2018-01-02 | 6
Close  | 2018-01-03 | 7
Close  | 2018-01-03 | 8
Open   | 2018-01-03 | 9
Close  | 2018-01-04 | 10
Close  | 2018-01-04 | 11
Open   | 2018-01-04 | 12
Close  | 2018-01-04 | 13
Close  | 2018-01-05 | 14
Open   | 2018-01-05 | 15
Close  | 2018-01-06 | 16

Showing all 100 rows.

User 2 does not know this data was already ingested into default.events_tbl10 and repeats the ingestion in another job.

 
// The same ingestion, run again from a different notebook or job.
val rawData = spark
  .read
  .json("/databricks-datasets/structured-streaming/events/")
  .drop("time")
  .withColumn("date", expr("cast(concat('2018-01-', cast(rand(5) * 30 as int) + 1) as date)"))
  .withColumn("deviceId", expr("cast(rand(5) * 100 as int)"))
  .dropDuplicates("deviceId")
  .limit(100)

rawData.write.mode("append").partitionBy("action", "date").format("delta").saveAsTable("default.events_tbl10")
rawData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [action: string, date: date ... 1 more field]

default.events_tbl10 now contains duplicates, but you don't know which notebook or job created the duplicate data. Start with a simple query to identify how much data was duplicated.

%sql select count(*) as count, deviceId from default.events_tbl10 group by deviceId order by deviceId
 
count | deviceId
2     | 0
2     | 1
2     | 2
2     | 3
2     | 4
2     | 5
2     | 6
2     | 7
2     | 8
2     | 9
2     | 10
2     | 11
2     | 12
2     | 13
2     | 14
2     | 15
2     | 16

Showing all 100 rows.
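
The same check can be done from Scala. The following is a minimal sketch using the DataFrame API against the table created above:

import org.apache.spark.sql.functions._

// Count rows per deviceId; any deviceId with a count greater than 1 has duplicates.
val dupCounts = spark.table("default.events_tbl10")
  .groupBy("deviceId")
  .agg(count("*").as("count"))
  .filter(col("count") > 1)
  .orderBy("deviceId")

display(dupCounts)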

Narrow it down and concentrate on a few samples. In this example, every deviceId is duplicated. Pick deviceId 0 and run a query using input_file_name() to identify the Parquet file path.

%sql select *, input_file_name() as path from default.events_tbl10 where deviceId = 0
 
action | date       | deviceId | path
Open   | 2018-01-01 | 0        | dbfs:/user/hive/warehouse/events_tbl10/action=Open/date=2018-01-01/part-00000-c4ec83b7-b901-4e11-bdc6-5deacb715575.c000.snappy.parquet
Open   | 2018-01-01 | 0        | dbfs:/user/hive/warehouse/events_tbl10/action=Open/date=2018-01-01/part-00000-f1144779-3215-4ef2-be0a-5058d8326433.c000.snappy.parquet

Showing all 2 rows.
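
If you prefer to stay in Scala, here is a minimal sketch of the same lookup with the DataFrame API, assuming deviceId 0 as above:

import org.apache.spark.sql.functions.input_file_name

// Attach the underlying Parquet file path to every row for deviceId 0.
val filePaths = spark.table("default.events_tbl10")
  .where("deviceId = 0")
  .withColumn("path", input_file_name())

display(filePaths)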

You need to find out which Delta versions these files belong to. Describe the table and note its location.

%sql describe table extended default.events_tbl10 -- Note the table location for the next step. It appears in the Location row of the output.
 
col_name                     | data_type                                           | comment
action                       | string                                              |
date                         | date                                                |
deviceId                     | int                                                 |
# Partitioning               |                                                     |
Part 0                       | action                                              |
Part 1                       | date                                                |
# Detailed Table Information |                                                     |
Name                         | default.events_tbl10                                |
Location                     | dbfs:/user/hive/warehouse/events_tbl10              |
Provider                     | delta                                               |
Type                         | MANAGED                                             |
Table Properties             | [delta.minReaderVersion=1,delta.minWriterVersion=2] |

Showing all 14 rows.
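
The location can also be fetched programmatically. Here is a minimal Scala sketch using DESCRIBE DETAIL, which returns the storage location of a Delta table:

// DESCRIBE DETAIL returns a single row of table metadata, including the location column.
val tableLocation = spark.sql("describe detail default.events_tbl10")
  .select("location")
  .first()
  .getString(0)

println(tableLocation)  // expected: dbfs:/user/hive/warehouse/events_tbl10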

Use grep to search the Delta transaction log at that location for the Parquet file names.

%sh grep -r 'part-00000-c4ec83b7-b901-4e11-bdc6-5deacb715575.c000.snappy.parquet' /dbfs/user/hive/warehouse/events_tbl10/_delta_log
 
/dbfs/user/hive/warehouse/events_tbl10/_delta_log/00000000000000000001.json:{"add":{"path":"action=Open/date=2018-01-01/part-00000-c4ec83b7-b901-4e11-bdc6-5deacb715575.c000.snappy.parquet","partitionValues":{"action":"Open","date":"2018-01-01"},"size":538,"modificationTime":1624533982000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"deviceId\":0},\"maxValues\":{\"deviceId\":1},\"nullCount\":{\"deviceId\":0}}"}}
%sh grep -r 'part-00000-f1144779-3215-4ef2-be0a-5058d8326433.c000.snappy.parquet' /dbfs/user/hive/warehouse/events_tbl10/_delta_log
 
/dbfs/user/hive/warehouse/events_tbl10/_delta_log/00000000000000000000.json:{"add":{"path":"action=Open/date=2018-01-01/part-00000-f1144779-3215-4ef2-be0a-5058d8326433.c000.snappy.parquet","partitionValues":{"action":"Open","date":"2018-01-01"},"size":538,"modificationTime":1624533827000,"dataChange":true,"stats":"{\"numRecords\":2,\"minValues\":{\"deviceId\":0},\"maxValues\":{\"deviceId\":1},\"nullCount\":{\"deviceId\":0}}"}}
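
If a %sh cell is not an option, you can search the transaction log with Spark instead. The following is a minimal Scala sketch that reads the commit JSON files from the table location found above and reports which commit file added each Parquet file:

import org.apache.spark.sql.functions.{col, input_file_name}

// Each commit in the Delta log is a JSON file; rows with a non-null add.path record added data files.
val commits = spark.read
  .json("dbfs:/user/hive/warehouse/events_tbl10/_delta_log/*.json")
  .withColumn("commit_file", input_file_name())

commits
  .filter(col("add.path").contains("c4ec83b7-b901-4e11-bdc6-5deacb715575") ||
          col("add.path").contains("f1144779-3215-4ef2-be0a-5058d8326433"))
  .select("commit_file", "add.path")
  .show(false)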

The grep results identified the impacted versions as 00000000000000000000.json and 00000000000000000001.json. Use describe history to identify the user name, as well as the notebook or job id, that caused the duplicates to appear in the Delta table.

%sql select * from (describe history default.events_tbl10) t where t.version in (0, 1)
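
The same history is available from Scala through the DeltaTable API. This is a minimal sketch that keeps only the two impacted versions; the notebook and job columns in the Databricks history output (availability may vary by runtime) point to the source of each write:

import io.delta.tables.DeltaTable

// Fetch the commit history and keep the versions identified by grep.
val history = DeltaTable.forName(spark, "default.events_tbl10").history()

display(
  history
    .filter("version in (0, 1)")
    .select("version", "timestamp", "userName", "operation", "notebook", "job")
)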