Azure Data Factory has introduced a new mapping data flow feature, ‘incremental extract’, which allows you to read only the rows that changed (delta or incremental data) from Azure SQL DB sources.
Currently this feature is in Public Preview.
This feature is one of the ways to implement ‘Incremental ETL’ and ‘CDC – Change Data Capture’. As it is managed by ADF, no trigger is required at the source to track changes in the data.
In this article, we will walk through a quick and simple demo of the ‘incremental extract’ feature: we load the source data in the initial run, then add/update rows and verify the incremental load of the subsequent run in the pipeline monitor and the sink.
This feature gives us the option to fully load the source data in the first run and then load only the delta, i.e. incremental changes such as rows added or updated, in subsequent runs.
IMPORTANT!!!
This feature works only when the source table has a timestamp column, so that ADF can store a watermark and query the changed rows for us. ADF uses this timestamp column to identify the rows that were changed since the last run.
Prerequisite:
Apart from creating the Azure Data Factory and the source (Azure SQL DB)/sink (Data Lake Gen2), we need to ensure that the source table in Azure SQL Database has a timestamp column.
What services are created for this demo?
For the demo, we have created a simple Employee table with EmpID, Salary and LogDT columns; LogDT is a datetime column that stores a timestamp whenever a row is added/updated. Below we can see the 3 existing rows of the Employee table, which will be loaded in the initial run.
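For reference, a table like this could be created and seeded with SQL similar to the following sketch; the data types and sample values are assumptions for illustration and may differ from the actual demo table:

-- Hypothetical sketch of the demo table; column types and sample values are assumed.
CREATE TABLE Employee (
    EmpID INT,
    Salary INT,
    LogDT DATETIME
)
INSERT INTO Employee VALUES (1, 100, getdate())
INSERT INTO Employee VALUES (2, 200, getdate())
INSERT INTO Employee VALUES (3, 300, getdate())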
We have also created Azure SQL Database datasets based on the Employee table, as shown below:
Next, we create a mapping data flow, add the Azure SQL Database dataset as the source, and modify the ‘Source options’ tab as shown in the image below:
There are three relevant fields in the mapping data flow under ‘Source options’ for the Azure SQL DB source:
- Enable incremental extract (Preview): Once enabled, this source will read only the new data since the previous run, i.e. the delta changes.
- Incremental date column (Preview): This is the timestamp column discussed above. The source will read new data based on this column’s value since the previous run (see the sketch after this list).
- Start reading from beginning (Preview): If checked, the first run performs a full load of the data, and subsequent runs read data incrementally. If not checked, the initial load is skipped in the first run and only changed data is captured in the following runs.
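To get a feel for what incremental extract does, the delta read on a subsequent run is conceptually similar to the hand-written query below. Here @LastWatermark is a hypothetical placeholder for the timestamp value ADF stored after the previous run; the actual watermark handling and query generation are managed internally by ADF:

-- Conceptual sketch only; ADF generates and manages the real query and watermark.
DECLARE @LastWatermark DATETIME = '2022-01-01 00:00:00'  -- example value of the stored watermark
SELECT EmpID, Salary, LogDT
FROM Employee
WHERE LogDT > @LastWatermark  -- only rows added or updated since the last run

With ‘Start reading from beginning’ checked, the very first run has no watermark yet, so it behaves like a plain SELECT over the whole table, which is the full initial load described above.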
Now let’s execute the first run using the ‘Trigger now’ option in the pipeline and monitor the pipeline after the run:
Here we can see that the number of rows written is 3, which is what we have in the source table.
Now let’s look at the partitioned file in Azure Data Lake, into which the same 3 rows have been written.
So in the first run, we have verified that the number of rows written to the sink matches both the pipeline monitor and the source table.
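As an optional sanity check on the source side, a simple count query should match the rows-written figure shown in the monitor:

SELECT COUNT(*) AS SourceRowCount FROM Employee  -- expected to return 3 at this point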
Now let’s insert one new record in the source table as shown below.
INSERT INTO Employee VALUES (4,400,getdate())
Now run the pipeline again; it should write only one record to the sink.
The monitor page below shows that only one record has been written.
And below, a new partition file is created containing only that new record.
So in the subsequent run, only the new record, i.e. the delta change since the last run, has been written to the sink. This is the incremental extract of the source data.
Now update the existing record as below:
UPDATE Employee SET Salary = 500, LogDT=getdate() where EmpID=1
Run the pipeline again; it should write only one record (the updated one) to the sink. The monitor page below shows that only one record has been written.
And below, a new partition file is created containing only the record that was updated, i.e. it is also extracted incrementally.
Hope this simple demo has helped you understand the usage of the new ‘incremental extract‘ feature.
Please feel free to comment with your queries/thoughts on this. Thanks for taking the time to read about this important update.
Happy Coding.