Faster Change Data Capture for your Data Lake – Servian

Alan Bluwol

The intent of this article is to discuss and present a new, faster approach to performing Change Data Capture (CDC) for your Data Lake using SQL.

CDC could just as easily stand for Centre for Disease Control, which would be fitting given COVID-19, but here I’m talking about Change Data Capture.

Change Data Capture refers to the process or technology for identifying and capturing changes made to a dataset. Those changes can then be applied to another data set or made available in a format consumable by data integration tools. This is typically done to keep systems in sync and to maintain data record history as it changes over time.

To explain the process, I’ll attempt to diagrammatically represent the concepts.

Source is the origin of a data set, typically from an operational system that is maintained day-to-day as part of usual business operations.

Target is a representation of a source, typically in a data lake or warehouse, that maintains current and historical views of data from operational systems.

Current is a term used to describe the temporal state of data, relative to the environment in which it resides. Current is the “latest” view of data. Current in source may be newer than current in target.

To distinguish the current view in source from the current view in target, we’ll use terms relative to the target: Source represents the next available version of data to be loaded into the target.

Another way to represent the same thing is to use the T notation to indicate time.

Sources may take the form of extracts, each representing a snapshot of a data set at a point in time and containing unique, up-to-date records. Deletes may be either physical (the record is no longer present) or logical (the record is flagged as deleted or inactive).

  • T-1 is History in Target
  • T1 is Current in Target
  • T2 is the Next available version of data in Source
  • T3, T4 (and so on) are the Next available future versions of the data in Source in chronological order

From Source, we want to load the next available version of the data set (typically a full system snapshot).

In Target, we want to effect changes according to business rules, typically to capture and maintain data record history.

Full-Join CDC

Full Join is the typical approach to solving CDC.

In SQL the FULL OUTER JOIN combines the results of both left and right outer joins and returns all (matched or unmatched) rows from the tables on both sides of the join clause.

This approach is a two-stage process:

  1. Full Join
  2. Case

Full Outer Join is typically used to compare two versions of data sets (e.g. account) to detect changes on both sides.

Assuming source and target are loaded into the same database (perhaps in different schemas), we can compare them using the SQL statement below:

SELECT * 
FROM SOURCE S
FULL JOIN TARGET T
ON S.BK = T.BK

Step 1 — Full Join

The Venn diagram shows the two data sets and the logical relationship between them.

The three areas of the Venn diagram help illustrate the state of the two versions of the data set: records found only in source, records found on both sides, and records found only in target.
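As a minimal sketch of those three areas, the business keys of each side can be treated as sets. The key values below are made up purely for illustration.

```python
# Hypothetical business keys present in each version of the data set
source_keys = {1, 2, 4}
target_keys = {1, 2, 3}

source_only = source_keys - target_keys   # in source but not target: candidates for insert
both_sides = source_keys & target_keys    # on both sides: updated, unchanged or logically deleted
target_only = target_keys - source_keys   # in target but not source: candidates for delete

print(source_only, both_sides, target_only)
```

Only the overlap needs a closer look, which is what the case statement in the next step is for.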

Step 2 — Case

After the join, we use a case statement to compare the two sides of each row and decide what action is needed to bring the target in line with the source.

Note that when a record exists on both sides, we need to determine whether it has been updated, is unchanged or has been logically deleted.

CASE
WHEN T.BK IS NULL THEN 'insert'
WHEN S.BK IS NULL THEN 'delete'
WHEN T.BK = S.BK AND S.HASH <> T.HASH THEN 'update'
ELSE 'no-change'
END
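The two steps can be tried end to end on a toy data set. The sketch below uses Python's built-in sqlite3 module rather than Redshift, and the table contents are invented for illustration. Because older SQLite builds do not support FULL JOIN, the join is emulated here with a LEFT JOIN plus the unmatched target rows; on a database that supports it, the plain FULL JOIN shown earlier would take its place.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source (bk INTEGER PRIMARY KEY, hash TEXT);
    CREATE TABLE target (bk INTEGER PRIMARY KEY, hash TEXT);
    INSERT INTO source VALUES (1, 'a'), (2, 'b2'), (4, 'd');
    INSERT INTO target VALUES (1, 'a'), (2, 'b'),  (3, 'c');
""")

FULL_JOIN_CDC = """
    WITH joined AS (
        -- Emulated FULL JOIN: every source row with its match, if any ...
        SELECT s.bk AS s_bk, s.hash AS s_hash, t.bk AS t_bk, t.hash AS t_hash
        FROM source s
        LEFT JOIN target t ON s.bk = t.bk
        UNION ALL
        -- ... plus the target rows that have no source row at all
        SELECT NULL, NULL, t.bk, t.hash
        FROM target t
        WHERE NOT EXISTS (SELECT 1 FROM source s WHERE s.bk = t.bk)
    )
    SELECT COALESCE(s_bk, t_bk) AS bk,
           CASE
               WHEN t_bk IS NULL THEN 'insert'
               WHEN s_bk IS NULL THEN 'delete'
               WHEN s_hash <> t_hash THEN 'update'
               ELSE 'no-change'
           END AS action
    FROM joined
    ORDER BY bk
"""

# Map each business key to the action the target needs
full_join_actions = dict(conn.execute(FULL_JOIN_CDC).fetchall())
print(full_join_actions)
```

Key 4 exists only in source, key 3 only in target, and key 2 has a different hash on each side, so each branch of the case statement is exercised once.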

Deletes may occur in different ways:

  • The source record no longer exists (physical delete)
  • The source record exists but is flagged as deleted (logical delete)

There may be other business rules that govern the detection of a deleted record.

CASE 
WHEN S.BK IS NULL THEN 'delete'
WHEN T.BK = S.BK AND S.ACTIVE = FALSE THEN 'delete'
END
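One way to fold both delete rules into the main case statement is sketched below, again using Python's sqlite3 module. The joined table and its column names are hypothetical and stand in for the output of the full join. The delete checks are placed before the update check so that a logically deleted record whose hash has also changed is still treated as a delete. Whether a brand-new record that arrives already inactive should be inserted is a business rule, and this sketch simply assumes it is.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical result of the full join: one row per business key,
# with NULLs on whichever side has no record.
conn.execute("CREATE TABLE joined (s_bk, s_hash, s_active, t_bk, t_hash)")
conn.executemany(
    "INSERT INTO joined VALUES (?, ?, ?, ?, ?)",
    [
        (1, "a", 1, 1, "a"),           # same on both sides
        (2, "b2", 1, 2, "b"),          # hash differs
        (None, None, None, 3, "c"),    # physical delete: missing from source
        (4, "d", 0, 4, "d"),           # logical delete: flagged inactive
        (5, "e2", 0, 5, "e"),          # flagged inactive and hash differs
        (6, "f", 1, None, None),       # new in source
    ],
)

CASE_WITH_DELETES = """
    SELECT COALESCE(s_bk, t_bk) AS bk,
           CASE
               WHEN s_bk IS NULL THEN 'delete'
               WHEN t_bk IS NOT NULL AND s_active = 0 THEN 'delete'
               WHEN t_bk IS NULL THEN 'insert'
               WHEN s_hash <> t_hash THEN 'update'
               ELSE 'no-change'
           END AS action
    FROM joined
    ORDER BY bk
"""

delete_aware_actions = dict(conn.execute(CASE_WITH_DELETES).fetchall())
print(delete_aware_actions)
```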

Performance

We ran this approach to CDC in AWS Redshift to baseline performance with the following result:

Environment: AWS
Database: Redshift
Instance: ds2.xlarge
Nodes: 8
vCPU: 4
ECU: 14
Memory: 31 GiB
Storage: 2TB HDD
I/O: 0.4 GB/sec
Records: 20 million
Run Time: 40 minutes

Joins can be inefficient: the larger the tables in the join become, the more data needs to be shipped between nodes.

It’s possible to get into a situation where the entire table needs to be shipped to every node working on the query, as opposed to being processed on the nodes where it already resides.

Union-Lead CDC

The principle of this solution is essentially the same as a Full Join, but the strategy is different and leverages the power of MPP (Massively Parallel Processing) found in databases like AWS Redshift, Snowflake and BigQuery more effectively.

This approach is a three-stage process:

  1. Union All
  2. Lead
  3. Case

Step 1 — Union All

Union All brings together the two versions of the data set from source and target into the same data set.

The union co-locates the two versions of the data set into a single view, distributed across all nodes according to its configured DistKey.

Assuming the two data sets are already distributed consistently, the union is very quick.

Note, although star (*) is illustrated in the example query, it is recommended that only the key columns required for comparison are brought together via the union.

SELECT * 
FROM SOURCE S
UNION ALL
SELECT *
FROM TARGET T
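A narrower version of the union, carrying only the columns needed for comparison, might look like the sketch below. It runs on Python's sqlite3 module with made-up tables. The load_ts column and the origin tag are assumptions added for illustration: the timestamp gives the later window function something to order by, and the tag records which side each row came from once both sides share one data set.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source (bk INTEGER, hash TEXT, load_ts TEXT, name TEXT, address TEXT);
    CREATE TABLE target (bk INTEGER, hash TEXT, load_ts TEXT, name TEXT, address TEXT);
    INSERT INTO source VALUES
        (1, 'a',  '2020-02-01', 'Ann', '1 High St'),
        (2, 'b2', '2020-02-01', 'Bob', '9 New Rd'),
        (4, 'd',  '2020-02-01', 'Dee', '4 Mill Ln');
    INSERT INTO target VALUES
        (1, 'a', '2020-01-01', 'Ann', '1 High St'),
        (2, 'b', '2020-01-01', 'Bob', '2 Old Rd'),
        (3, 'c', '2020-01-01', 'Cal', '3 Park Ave');
""")

# Only the key, hash and timestamp travel through the union; the wide
# descriptive columns (name, address) stay behind in their own tables.
NARROW_UNION = """
    SELECT bk, hash, load_ts, 'source' AS origin FROM source
    UNION ALL
    SELECT bk, hash, load_ts, 'target' AS origin FROM target
    ORDER BY bk, load_ts DESC
"""

unioned_rows = conn.execute(NARROW_UNION).fetchall()
for row in unioned_rows:
    print(row)
```

The ORDER BY is there only to make the printed output easy to read; the union itself does not need it.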

Step 2 — Lead

The window function Lead brings together the two versions of the data set from source and target into the same row, by business key.

SELECT ..., 
LEAD(HASH, 1)
OVER (
PARTITION BY BK
ORDER BY TIMESTAMP DESC )

This function prepares the data for comparison, much like a join, but does so in parallel across all nodes. Provided the data is distributed by business key, each partition can be processed where it resides, without needing to move data around.
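To see what Lead produces, the sketch below runs it over a small, hypothetical unioned table using Python's sqlite3 module (window functions need SQLite 3.25 or later). The table name, the load_ts and origin columns and the prev_hash alias are assumptions for illustration. Because rows are ordered newest first within each business key, the value Lead returns on the newest row is the hash of the next older row.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Hypothetical output of the Union All step
conn.execute("CREATE TABLE unioned (bk INTEGER, hash TEXT, load_ts TEXT, origin TEXT)")
conn.executemany(
    "INSERT INTO unioned VALUES (?, ?, ?, ?)",
    [
        (1, "a", "2020-02-01", "source"), (1, "a", "2020-01-01", "target"),
        (2, "b2", "2020-02-01", "source"), (2, "b", "2020-01-01", "target"),
        (3, "c", "2020-01-01", "target"),
        (4, "d", "2020-02-01", "source"),
    ],
)

LEAD_SQL = """
    SELECT bk, origin, hash,
           LEAD(hash, 1) OVER (
               PARTITION BY bk
               ORDER BY load_ts DESC
           ) AS prev_hash
    FROM unioned
    ORDER BY bk, load_ts DESC
"""

lead_rows = conn.execute(LEAD_SQL).fetchall()
for row in lead_rows:
    print(row)
```

For keys 1 and 2 the source row now carries the target hash alongside its own, which is the same pairing a join would have produced. Keys 3 and 4 have only one row each, so their prev_hash is NULL.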

Step 3 — Case

Just as before, use a case statement to determine the state of source and target.

CASE 
WHEN TARGET.BK IS NULL THEN 'insert'
WHEN SOURCE.BK IS NULL THEN 'delete'
WHEN TARGET.BK = SOURCE.BK AND SOURCE.HASH <> TARGET.HASH THEN 'update'
ELSE 'no-change'
END

The logic is exactly the same as we discussed before.
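Putting the three stages together, one possible shape of the full query is sketched below, run on Python's sqlite3 module (SQLite 3.25 or later) with invented data. After the union there are no separate SOURCE and TARGET aliases, so this sketch expresses the same tests through an assumed origin tag and the values Lead returns: "target is missing" becomes "no older row follows the source row", and "source is missing" becomes "the newest row for the key came from target". It also assumes every source row has a strictly later load_ts than its target counterpart. The column names load_ts, origin, prev_hash, prev_origin and rn are illustrative.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE source (bk INTEGER, hash TEXT, load_ts TEXT);
    CREATE TABLE target (bk INTEGER, hash TEXT, load_ts TEXT);
    INSERT INTO source VALUES (1, 'a', '2020-02-01'), (2, 'b2', '2020-02-01'), (4, 'd', '2020-02-01');
    INSERT INTO target VALUES (1, 'a', '2020-01-01'), (2, 'b',  '2020-01-01'), (3, 'c', '2020-01-01');
""")

UNION_LEAD_CDC = """
    WITH unioned AS (                      -- Step 1: Union All
        SELECT bk, hash, load_ts, 'source' AS origin FROM source
        UNION ALL
        SELECT bk, hash, load_ts, 'target' AS origin FROM target
    ),
    paired AS (                            -- Step 2: Lead
        SELECT bk, hash, origin,
               LEAD(hash, 1)   OVER (PARTITION BY bk ORDER BY load_ts DESC) AS prev_hash,
               LEAD(origin, 1) OVER (PARTITION BY bk ORDER BY load_ts DESC) AS prev_origin,
               ROW_NUMBER()    OVER (PARTITION BY bk ORDER BY load_ts DESC) AS rn
        FROM unioned
    )
    SELECT bk,                             -- Step 3: Case
           CASE
               WHEN origin = 'target' THEN 'delete'       -- newest row is from target: no source row
               WHEN prev_origin IS NULL THEN 'insert'     -- source row with nothing older: no target row
               WHEN hash <> prev_hash THEN 'update'
               ELSE 'no-change'
           END AS action
    FROM paired
    WHERE rn = 1                           -- keep only the newest row per business key
    ORDER BY bk
"""

union_lead_actions = dict(conn.execute(UNION_LEAD_CDC).fetchall())
print(union_lead_actions)
```

On this data the result matches what a full join followed by the same case logic would give: key 1 is unchanged, key 2 is updated, key 3 is deleted and key 4 is inserted.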

Performance

But the performance is nothing like the traditional approach!

Running the alternative approach on the same AWS Redshift instance yielded the following result:

Environment: AWS
Database: Redshift
Instance: ds2.xlarge
Nodes: 8
vCPU: 4
ECU: 14
Memory: 31 GiB
Storage: 2TB HDD
I/O: 0.4 GB/sec
Records: 20 million
Run Time: 2 minutes (20x faster)

And it scales really well, because the size of the table becomes less important when the data is co-located amongst nodes for comparison.

Localised merging and comparison permits parallel analysis in situ, achieving the same outcome that we would traditionally have used joins for.

It isn’t enough to simply adopt standard, well-established, legacy solutions to problems. Although they work, they may be sub-optimal. Solutions need to be tweaked and re-engineered to fit the context in which they’re run. In this case, modern database systems (columnar, MPP, server-less) organise and process data differently to traditional database systems. Solutions to problems like CDC ought to leverage the capabilities at their disposal, and doing so could see drastic performance improvements.