Change data feed demo (Python)

Loading...

Demo of Delta Lake change data feed

Create a silver table that tracks the absolute number of vaccinations and available doses by country

# Seed rows for the silver table: (country, number vaccinated, available doses).
countries = [
    ("USA", 10000, 20000),
    ("India", 1000, 1500),
    ("UK", 7000, 10000),
    ("Canada", 500, 700),
]
columns = ["Country", "NumVaccinated", "AvailableDoses"]

# Build a DataFrame from the seed rows and write it as the Delta table
# "silverTable", overwriting any existing contents.
(
    spark.createDataFrame(data=countries, schema=columns)
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("silverTable")
)
%sql
-- Show the current contents of the silver table.
SELECT * FROM silverTable
 
Country
NumVaccinated
AvailableDoses
1
2
3
4
Canada
500
700
USA
10000
20000
India
1000
1500
UK
7000
10000

Showing all 4 rows.

import pyspark.sql.functions as F

# Read the silver table, add VaccinationRate = NumVaccinated / AvailableDoses,
# drop the two raw count columns, and write the result as the Delta table
# "goldTable" (overwriting any existing contents).
(
    spark.read.format("delta")
    .table("silverTable")
    .withColumn(
        "VaccinationRate",
        F.col("NumVaccinated") / F.col("AvailableDoses"),
    )
    .drop("NumVaccinated", "AvailableDoses")
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("goldTable")
)

Generate gold table showing vaccination rate by country

%sql
-- Show the current contents of the gold table.
SELECT * FROM goldTable
 
Country
VaccinationRate
1
2
3
4
Canada
0.7142857142857143
UK
0.7
India
0.6666666666666666
USA
0.5

Showing all 4 rows.

Enable change data feed on silver table

%sql
-- Set the delta.enableChangeDataFeed table property on silverTable.
ALTER TABLE silverTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
OK

Update silver table daily

# Append a row for a newly reporting country to the silver table,
# reusing the column names defined for the initial load.
new_countries = [("Australia", 100, 3000)]
(
    spark.createDataFrame(new_countries, columns)
    .write.format("delta")
    .mode("append")
    .saveAsTable("silverTable")
)
%sql
-- Update a record.
-- NumVaccinated was created from integer values, so assign a numeric literal
-- instead of a quoted string that would have to be implicitly cast.
UPDATE silverTable SET NumVaccinated = 11000 WHERE Country = 'USA'
 
num_affected_rows
1
1

Showing all 1 rows.

%sql
-- Delete a record: remove the UK row from the silver table.
DELETE FROM silverTable
WHERE Country = 'UK'
 
num_affected_rows
1
1

Showing all 1 rows.

%sql
-- Show the silver table after the insert, update and delete above.
SELECT * FROM silverTable
 
Country
NumVaccinated
AvailableDoses
1
2
3
4
USA
11000
20000
Canada
500
700
Australia
100
3000
India
1000
1500

Showing all 4 rows.

Explore the change data in SQL and PySpark

%sql
-- View the changes recorded for silverTable, starting at commit version 2.
-- The end version is optional; leaving it out returns every change up to the
-- latest commit, so the query does not depend on a hard-coded version number
-- that may not exist in the table history.
SELECT * FROM table_changes('silverTable', 2) ORDER BY _commit_timestamp
 
Country
NumVaccinated
AvailableDoses
_change_type
_commit_version
_commit_timestamp
1
2
3
4
Australia
100
3000
insert
2
2021-04-14T20:26:37.000+0000
USA
10000
20000
update_preimage
3
2021-04-14T20:26:39.000+0000
USA
11000
20000
update_postimage
3
2021-04-14T20:26:39.000+0000
UK
7000
10000
delete
4
2021-04-14T20:26:40.000+0000

Showing all 4 rows.

# Read the change data feed of silverTable from commit version 2 onwards.
# "readChangeFeed" is the documented option name for enabling a change feed
# read; the result carries the _change_type, _commit_version and
# _commit_timestamp columns alongside the table's own columns.
changes_df = (
    spark.read.format("delta")
    .option("readChangeFeed", True)
    .option("startingVersion", 2)
    .table("silverTable")
)
display(changes_df)
 
Country
NumVaccinated
AvailableDoses
_change_type
_commit_version
_commit_timestamp
1
2
3
4
UK
7000
10000
delete
4
2021-04-14T20:26:40.000+0000
USA
10000
20000
update_preimage
3
2021-04-14T20:26:39.000+0000
USA
11000
20000
update_postimage
3
2021-04-14T20:26:39.000+0000
Australia
100
3000
insert
2
2021-04-14T20:26:37.000+0000

Showing all 4 rows.

Propagate changes from silver to gold table

%sql
-- Collect only the latest change for each country.
-- update_preimage rows (the value before an update) are filtered out, then the
-- remaining change rows are ranked per country by commit version, newest first,
-- and only the top-ranked row is kept.
-- The change feed is read from version 2 with no end version, so every commit
-- up to the latest one is included without hard-coding a version number.
CREATE OR REPLACE TEMPORARY VIEW silverTable_latest_version AS
SELECT *
FROM (
    SELECT *, rank() OVER (PARTITION BY Country ORDER BY _commit_version DESC) AS rank
    FROM table_changes('silverTable', 2)
    WHERE _change_type != 'update_preimage'
)
WHERE rank = 1
OK
%sql
-- Merge the changes to gold
-- Rows from silverTable_latest_version are matched to goldTable on Country:
--   * a matched row whose latest change is an update_postimage recomputes VaccinationRate;
--   * an unmatched row is inserted with its computed VaccinationRate.
-- NOTE(review): there is no clause for s._change_type = 'delete'. A matched delete
-- row leaves the gold row unchanged, and an unmatched delete row would be inserted
-- by WHEN NOT MATCHED. Confirm whether deletes should be propagated to goldTable.
MERGE INTO goldTable t USING silverTable_latest_version s ON s.Country = t.Country
        WHEN MATCHED AND s._change_type='update_postimage' THEN UPDATE SET VaccinationRate = s.NumVaccinated/s.AvailableDoses
        WHEN NOT MATCHED THEN INSERT (Country, VaccinationRate) VALUES (s.Country, s.NumVaccinated/s.AvailableDoses)
 
num_affected_rows
num_updated_rows
num_deleted_rows
num_inserted_rows
1
1
1
0
0

Showing all 1 rows.

%sql
-- Show the gold table after merging the silver changes.
SELECT * FROM goldTable
 
Country
VaccinationRate
1
2
3
4
5
Canada
0.7142857142857143
Australia
0.03333333333333333
UK
0.7
USA
0.55
India
0.6666666666666666

Showing all 5 rows.

Clean up tables

%sql
-- Remove the demo tables. IF EXISTS keeps the cleanup re-runnable: it does not
-- fail when a table has already been dropped or was never created.
DROP TABLE IF EXISTS silverTable;
DROP TABLE IF EXISTS goldTable;
OK