In this article, we will walk through a demo of implementing Delta Lake’s Change Data Feed (CDF) using the multi-hop (medallion) architecture in Azure Databricks.
Need for Change Data Feed (CDF):
Simplify CDC: Change Data Feed (CDF) addresses challenges we face when implementing Change Data Capture (CDC), such as improving:
- Quality control, by tracking row-level changes.
- Efficiency of downstream consumption of MERGE, UPDATE and DELETE operations.
Change Data Feed (CDF) feature:
- It allows Delta tables to track row-level changes between versions of a Delta table.
- When it is enabled, the Databricks Runtime records “change events” for all data written into the table, indicating whether each row was inserted, deleted, or updated.
- We can read the change events in both batch queries and streaming queries.
- Batch queries can use SQL and the DataFrame API (that is, spark.read).
- Streaming queries can use the DataFrame API (that is, spark.readStream).
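As a minimal sketch of the two read paths from Python, the DataFrame API reads could look like the following. It assumes `spark` is an active SparkSession with Delta Lake available (predefined in Databricks notebooks) and that CDF is already enabled on the table; the function names are hypothetical.

```python
from typing import Optional


def read_changes_batch(spark, table: str, starting_version: int,
                       ending_version: Optional[int] = None):
    """Batch read of a Delta table's change events (spark.read)."""
    reader = (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", starting_version)
    )
    if ending_version is not None:
        # endingVersion is inclusive; omit it to read up to the latest commit.
        reader = reader.option("endingVersion", ending_version)
    return reader.table(table)


def read_changes_stream(spark, table: str, starting_version: int):
    """Streaming read of a Delta table's change events (spark.readStream)."""
    return (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .option("startingVersion", starting_version)
        .table(table)
    )
```

In a notebook, `read_changes_batch(spark, "bronze_eps", 1).show()` would print the change rows, while the streaming DataFrame only produces output once it is started with `writeStream`.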
Top Use Cases:
- Accelerate and simplify ETL and ELT: improve Delta performance by processing only the row-level changes that follow an initial MERGE, UPDATE, or DELETE.
- Quick materialized views: build aggregated views for BI and analytics from only the changed data instead of the entire dataset.
- Transmit changes: send the change data feed to downstream systems for incremental processing.
Implementing Change Data Feed using Medallion Architecture:
Source Data:
For this demo, we will create the source data manually as a DataFrame and then create a temp view from that DataFrame.
The dummy data is sample financial data provided by Databricks.
We will use two input datasets: one for the initial load and another for the Change Data Feed.
Before we create the Bronze, Silver and Gold tables, we will enable Change Data Feed for all new tables. This feature can also be enabled on existing tables.
Enable Change Data Feed in Delta Lake:
Change Data Feed is not enabled by default.
Enable Change Data Feed for all new tables:
We can enable CDF for all new Delta tables created in the current Spark session with the following Spark conf setting:
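From a Python notebook cell, the setting can be applied as in this minimal sketch, which assumes `spark` is the notebook's active SparkSession; the helper name is hypothetical. The equivalent SQL cell would be `SET spark.databricks.delta.properties.defaults.enableChangeDataFeed = true;`.

```python
# Session-level default: every Delta table created afterwards in this
# SparkSession gets delta.enableChangeDataFeed = true.
CDF_DEFAULT_CONF = "spark.databricks.delta.properties.defaults.enableChangeDataFeed"


def enable_cdf_for_new_tables(spark) -> None:
    # Tables that already exist are not changed by this setting.
    spark.conf.set(CDF_DEFAULT_CONF, "true")
```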
Now let’s create the Bronze, Silver and Gold tables, as shown in the image below:
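A rough sketch of this step from Python follows. The table names bronze_eps and gold_consensus_eps come from this article; silver_eps and all column names and types are hypothetical placeholders. It assumes the Spark conf setting for new tables has already been applied, so no TBLPROPERTIES clause is needed.

```python
# Hypothetical schemas for the three medallion layers.
MEDALLION_DDL = {
    "bronze_eps": "date STRING, stock_symbol STRING, analyst INT, reported_eps DOUBLE",
    "silver_eps": "date STRING, stock_symbol STRING, analyst INT, reported_eps DOUBLE",
    "gold_consensus_eps": "date STRING, stock_symbol STRING, consensus_eps DOUBLE",
}


def create_table_sql(name: str, columns: str) -> str:
    return f"CREATE TABLE IF NOT EXISTS {name} ({columns}) USING DELTA"


def create_medallion_tables(spark) -> None:
    # Bronze and silver hold per-analyst estimates; gold holds one
    # aggregated value per date and stock symbol.
    for name, columns in MEDALLION_DDL.items():
        spark.sql(create_table_sql(name, columns))
```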
Enable Change Data Feed for a new table:
We can enable CDF in the SQL CREATE TABLE command by setting the property in TBLPROPERTIES, as below:
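A minimal sketch of the per-table form, run through `spark.sql`; the table name silver_eps and its columns are hypothetical placeholders.

```python
# Per-table alternative to the session-wide default.
CREATE_SILVER_WITH_CDF = """
CREATE TABLE IF NOT EXISTS silver_eps (
  date STRING,
  stock_symbol STRING,
  analyst INT,
  reported_eps DOUBLE
)
USING DELTA
TBLPROPERTIES (delta.enableChangeDataFeed = true)
"""


def create_silver_with_cdf(spark) -> None:
    spark.sql(CREATE_SILVER_WITH_CDF)
```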
Enable Change Data Feed for an existing table:
We can enable CDF with the SQL ALTER TABLE command by setting the property in TBLPROPERTIES, as below:
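A minimal sketch of the ALTER TABLE form, assuming `spark` is the active SparkSession; the helper name is hypothetical.

```python
def enable_cdf_on_existing_table(spark, table: str) -> str:
    """Turn on CDF for a table that already exists and return the statement run."""
    stmt = f"ALTER TABLE {table} SET TBLPROPERTIES (delta.enableChangeDataFeed = true)"
    spark.sql(stmt)
    return stmt
```

Change data is recorded only for commits made after the property is set, so versions written before that point cannot be read through the change feed.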
Verify whether CDF is enabled for a specific table:
To verify, we execute the SQL command below on the table; the resulting table properties include “delta.enableChangeDataFeed = true”, which is highlighted in the next image.
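One way to do the same check programmatically is sketched below, assuming `spark` is the active SparkSession. SHOW TBLPROPERTIES returns `key` and `value` columns; the helper name is hypothetical.

```python
def is_cdf_enabled(spark, table: str) -> bool:
    """True if delta.enableChangeDataFeed is set to true on the table."""
    rows = spark.sql(f"SHOW TBLPROPERTIES {table}").collect()
    props = {row["key"]: row["value"] for row in rows}
    # Property values come back as strings, e.g. "true".
    return props.get("delta.enableChangeDataFeed", "false").lower() == "true"
```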
First Load into Bronze Table: Create Dummy Data as a Temp View
By running the code below, we first create a DataFrame and then create a temp view from it.
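A minimal sketch of this step, assuming `spark` is the active SparkSession. The schema, the made-up rows and the view name bronze_eps_first_load are hypothetical stand-ins for the Databricks sample data, which is not reproduced here.

```python
# Hypothetical schema and made-up values standing in for the dummy EPS data.
BRONZE_SCHEMA = "date STRING, stock_symbol STRING, analyst INT, reported_eps DOUBLE"
FIRST_LOAD = [
    ("2021-01-01", "A", 1, 2.20),
    ("2021-01-01", "A", 2, 2.00),
    ("2021-01-01", "B", 1, 1.30),
    ("2021-01-01", "B", 2, 1.20),
]


def create_first_load_view(spark, view_name: str = "bronze_eps_first_load"):
    """Build a DataFrame from the rows and expose it to SQL as a temp view."""
    df = spark.createDataFrame(FIRST_LOAD, schema=BRONZE_SCHEMA)
    df.createOrReplaceTempView(view_name)
    return df
```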
Now let’s insert the contents of the temp view into the bronze table:
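A minimal sketch of the insert, assuming the temp view is named bronze_eps_first_load (a hypothetical name) and that bronze_eps was created with a CREATE TABLE statement, which is commit version 0.

```python
# CREATE TABLE was commit version 0, so this insert becomes version 1.
INSERT_FIRST_LOAD = "INSERT INTO bronze_eps SELECT * FROM bronze_eps_first_load"


def load_bronze_first(spark) -> None:
    spark.sql(INSERT_FIRST_LOAD)
```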
Read change data of the Bronze table
We can read the change data feed by specifying either versions or timestamps for the start and end. There are two ways to read this data: in batch queries and in streaming queries.
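Version-based reads are used throughout this walkthrough; the timestamp-based variant is sketched below. The timestamps are placeholders and the function names are hypothetical; `spark` is assumed to be the active SparkSession. As with versions, the start and end bounds are inclusive.

```python
from typing import Optional


def changes_by_timestamp_sql(table: str, start_ts: str,
                             end_ts: Optional[str] = None) -> str:
    """SQL form: table_changes accepts timestamp strings as well as versions."""
    args = f"'{table}', '{start_ts}'"
    if end_ts is not None:
        args += f", '{end_ts}'"
    return f"SELECT * FROM table_changes({args})"


def read_changes_by_timestamp(spark, table: str, start_ts: str):
    """DataFrame form: startingTimestamp instead of startingVersion."""
    return (
        spark.read.format("delta")
        .option("readChangeFeed", "true")
        .option("startingTimestamp", start_ts)
        .table(table)
    )
```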
Let’s query the Bronze table with the table_changes function as a batch query, starting from commit version 1; the result is displayed in the next image.
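A minimal sketch of this batch query run from Python, assuming `spark` is the active SparkSession. Besides the table's own columns, each change row carries the metadata columns `_change_type`, `_commit_version` and `_commit_timestamp`.

```python
# _change_type is one of insert, update_preimage, update_postimage, delete;
# _commit_version and _commit_timestamp identify the commit that made the change.
BRONZE_CHANGES_FROM_V1 = """
SELECT *
FROM table_changes('bronze_eps', 1)
ORDER BY _commit_version
"""


def show_bronze_changes(spark) -> None:
    spark.sql(BRONZE_CHANGES_FROM_V1).show(truncate=False)
```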
Optionally, we can view the bronze table data with a SQL SELECT statement: SELECT * FROM bronze_eps.
Initial Load of Bronze Data into Silver Table:
The Silver table is still empty, so let’s insert the bronze data into it as shown in the code below. Here we fetch data from the Bronze table’s change feed using the initial commit version, 1.
Please note that this is where we should clean or refine the bronze data before loading it into the silver table.
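A minimal sketch of the initial silver load, assuming a silver table named silver_eps with the hypothetical columns used elsewhere in these sketches. The `reported_eps IS NOT NULL` filter is only a placeholder for real cleansing rules.

```python
INITIAL_SILVER_LOAD = """
INSERT INTO silver_eps
SELECT date, stock_symbol, analyst, reported_eps  -- CDF metadata columns dropped
FROM table_changes('bronze_eps', 1, 1)            -- first bronze load only
WHERE _change_type = 'insert'
  AND reported_eps IS NOT NULL                    -- placeholder cleansing rule
"""


def load_silver_initial(spark) -> None:
    spark.sql(INITIAL_SILVER_LOAD)
```

Selecting explicit columns keeps `_change_type`, `_commit_version` and `_commit_timestamp` out of the silver table.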
Read change data of the Silver table
Now let’s query the change data for version 1 of the Silver table; the result is displayed in the next image:
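A minimal sketch of this query, assuming the silver table is named silver_eps (hypothetical) with the placeholder columns used above. Because version 1 of silver was an insert-only commit, every row should show `_change_type = 'insert'`.

```python
SILVER_CHANGES_V1 = """
SELECT _change_type, _commit_version, date, stock_symbol, analyst, reported_eps
FROM table_changes('silver_eps', 1, 1)
ORDER BY stock_symbol, analyst
"""


def show_silver_changes_v1(spark) -> None:
    spark.sql(SILVER_CHANGES_V1).show(truncate=False)
```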
View Silver layer directory
Let’s execute the PySpark code below to view the silver table’s directory in Delta Lake; the result is displayed in the next image:
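A sketch of one way to list the directory in a Databricks notebook follows. It assumes `spark` and `dbutils` are the notebook's built-in objects and reads the table path from the `location` column of DESCRIBE DETAIL; the helper names and the silver_eps table name are hypothetical.

```python
def table_location(spark, table: str) -> str:
    """Storage path of a Delta table, taken from DESCRIBE DETAIL."""
    return spark.sql(f"DESCRIBE DETAIL {table}").first()["location"]


def list_table_dir(spark, dbutils, table: str) -> list:
    """Entry names under the table's directory (Databricks only: needs dbutils)."""
    return [info.name for info in dbutils.fs.ls(table_location(spark, table))]
```

For example, `print(list_table_dir(spark, dbutils, "silver_eps"))` would typically show the `_delta_log/` folder alongside the Parquet data files.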
First Load of Silver Data into Gold Table:
The Gold table is still empty, so let’s execute the SQL code below to load the Silver data into it:
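The article does not show the gold transformation, so the sketch below assumes a hypothetical one: a consensus value computed as the mean of analyst estimates per date and stock symbol, read from a silver table named silver_eps.

```python
# Hypothetical gold logic: mean of analyst estimates per date and symbol.
INITIAL_GOLD_LOAD = """
INSERT INTO gold_consensus_eps
SELECT date, stock_symbol, avg(reported_eps) AS consensus_eps
FROM silver_eps
GROUP BY date, stock_symbol
"""


def load_gold_initial(spark) -> None:
    spark.sql(INITIAL_GOLD_LOAD)
```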
Now let’s run a SELECT statement on the gold_consensus_eps table to view its data:
Second Load into Bronze Table: Create Dummy Data as a DataFrame:
Create a new set of data for the bronze table, to be merged into the silver table later, as shown in the images below:
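A minimal sketch of the second load, assuming `spark` is the active SparkSession. The schema, rows and view name are hypothetical. The first two rows reuse keys that are assumed to match rows from the first load, and the last two are new keys. Appending to bronze creates the next commit, version 2 in this walkthrough.

```python
BRONZE_SCHEMA = "date STRING, stock_symbol STRING, analyst INT, reported_eps DOUBLE"
SECOND_LOAD = [
    ("2021-01-01", "A", 1, 2.40),  # revised estimate for an assumed existing key
    ("2021-01-01", "B", 2, 1.10),  # revised estimate for an assumed existing key
    ("2021-01-02", "A", 1, 2.60),  # new key
    ("2021-01-02", "B", 1, 1.50),  # new key
]


def append_second_load(spark, view_name: str = "bronze_eps_second_load"):
    df = spark.createDataFrame(SECOND_LOAD, schema=BRONZE_SCHEMA)
    df.createOrReplaceTempView(view_name)
    # Bronze is append-only: rows are inserted even if their keys already exist.
    spark.sql(f"INSERT INTO bronze_eps SELECT * FROM {view_name}")
    return df
```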
Let’s check the new change feed of the bronze_eps table from version 2:
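A minimal sketch of this check, assuming the placeholder columns used above. Because bronze only receives appends, every row in this feed is an `insert`, including rows whose keys already exist. Those rows are only turned into updates later, when they are merged into silver.

```python
BRONZE_CHANGES_V2 = """
SELECT _change_type, _commit_version, date, stock_symbol, analyst, reported_eps
FROM table_changes('bronze_eps', 2)
"""


def show_bronze_changes_v2(spark) -> None:
    spark.sql(BRONZE_CHANGES_V2).show(truncate=False)
```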
Table data:
Merging the new data feed from bronze table into silver table
Now we have a second raw dataset to merge into silver. It contains both new records and records that already exist in silver, so we use the MERGE command: existing records are updated and new records are inserted.
Here, for demo purposes, we use commit version 2 because it is the latest version of the bronze table; in a real pipeline, we should determine the latest commit version dynamically instead of hard-coding it, to get the latest change data feed.
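A sketch of the merge that looks up the latest bronze version instead of hard-coding it, assuming silver_eps and its columns (all hypothetical). It relies on DESCRIBE HISTORY listing the newest commit first. Columns are listed explicitly rather than using `UPDATE SET *`, because the source also carries the CDF metadata columns.

```python
def latest_version(spark, table: str) -> int:
    """Newest commit version; DESCRIBE HISTORY lists the latest commit first."""
    return spark.sql(f"DESCRIBE HISTORY {table} LIMIT 1").first()["version"]


def merge_from_bronze_sql(start_version: int) -> str:
    return f"""
MERGE INTO silver_eps AS s
USING (
  SELECT date, stock_symbol, analyst, reported_eps
  FROM table_changes('bronze_eps', {start_version})
  WHERE _change_type = 'insert'
) AS b
ON s.date = b.date AND s.stock_symbol = b.stock_symbol AND s.analyst = b.analyst
WHEN MATCHED THEN UPDATE SET s.reported_eps = b.reported_eps
WHEN NOT MATCHED THEN INSERT (date, stock_symbol, analyst, reported_eps)
  VALUES (b.date, b.stock_symbol, b.analyst, b.reported_eps)
"""


def merge_bronze_changes_into_silver(spark) -> int:
    version = latest_version(spark, "bronze_eps")  # 2 in this walkthrough
    spark.sql(merge_from_bronze_sql(version))
    return version
```

Reading only from the latest version matches the article's approach. A pipeline that can fall several commits behind would instead store the last processed version and start from the version after it.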
Let’s view the Change Data Feed produced by the merge above by executing the SQL query below; the result is displayed next to the code:
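A minimal sketch of the queries, assuming silver_eps and its columns (hypothetical). For a MERGE, each updated row appears twice, once as `update_preimage` (old values) and once as `update_postimage` (new values), while new rows appear as `insert`.

```python
SILVER_CHANGES_V2 = """
SELECT _change_type, _commit_version, date, stock_symbol, analyst, reported_eps
FROM table_changes('silver_eps', 2, 2)
ORDER BY date, stock_symbol, analyst, _change_type
"""

# Quick summary: how many rows of each change type the merge produced.
SILVER_CHANGE_COUNTS_V2 = """
SELECT _change_type, count(*) AS n
FROM table_changes('silver_eps', 2, 2)
GROUP BY _change_type
"""


def show_silver_changes_v2(spark) -> None:
    spark.sql(SILVER_CHANGES_V2).show(truncate=False)
    spark.sql(SILVER_CHANGE_COUNTS_V2).show()
```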
Silver table change feed data for commit version 2:
Now let’s view the silver table’s directory again:
Note that there is now an additional directory, _change_data, nested in our table directory.
Let’s view the _change_data subdirectory:
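A sketch of listing that folder in a Databricks notebook, assuming `spark` and `dbutils` are the notebook's built-in objects; the helper names are hypothetical. According to the Delta Lake documentation, insert-only commits do not write to `_change_data`, and their change feed is derived from the regular data files. This is why the folder typically appears only after the MERGE.

```python
def change_data_dir(location: str) -> str:
    """Path of the _change_data folder inside a Delta table directory."""
    return location.rstrip("/") + "/_change_data"


def list_change_data(spark, dbutils, table: str) -> list:
    """Full paths of the change-data files (Databricks only: needs dbutils)."""
    location = spark.sql(f"DESCRIBE DETAIL {table}").first()["location"]
    return [info.path for info in dbutils.fs.ls(change_data_dir(location))]
```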
Loading into Gold Table Using Change Data Feed Only:
Inserting only the change feed of the silver table (commit version 2) into the gold table:
Please note that here we use version 2 of the Silver table to get only the latest Change Data Feed to load into the gold table. Existing data in the Gold table is not updated; new data is appended.
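A sketch of the append, assuming the same hypothetical gold logic (mean estimate per date and symbol) and silver columns used above. Pre-images are skipped because they hold the old values; only `insert` and `update_postimage` rows feed the gold table.

```python
GOLD_APPEND_FROM_CDF = """
INSERT INTO gold_consensus_eps
SELECT date, stock_symbol, avg(reported_eps) AS consensus_eps
FROM table_changes('silver_eps', 2, 2)
WHERE _change_type IN ('insert', 'update_postimage')  -- skip pre-images
GROUP BY date, stock_symbol
"""


def append_gold_from_cdf(spark) -> None:
    spark.sql(GOLD_APPEND_FROM_CDF)
```

This mirrors the append-only behavior described here. It does not recompute the consensus over unchanged silver rows, so a gold table that must always hold a single current value per key would typically use MERGE with recomputed aggregates instead.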
Now let’s view the final data in the gold table by executing SELECT * FROM gold_consensus_eps:
- The Gold table is appended with new data from the Silver table’s change data feed only.
- It does not take the unchanged, existing data of the silver table.
- It takes only new or updated data resulting from the new loads into the Bronze table.
- Existing data in the gold table is not updated; only new data is appended. This is the effect of the change data feed.
Thanks for reading the article. Please feel free to comment with any questions or thoughts.