A beginner's guide to Change Data Capture (CDC)
You've probably heard the term "CDC" a lot nowadays. Sometimes it refers to the US Centers for Disease Control and Prevention. Sometimes it refers to the technology that synchronizes data in real time: Change Data Capture.
I personally heard about CDC and Debezium 7 years ago, when I was at Splunk managing the engineering team for the DB Connect app. One of the key features of this app was db-input, which loads data from external databases into Splunk. A common practice back then (and even today) is to run a SQL query such as `SELECT * FROM table WHERE timestamp>?` at a certain interval via a JDBC driver, to get the new data since the last sync. However, this approach assumes the data is append-only; UPDATEs and DELETEs cannot be detected. It also adds load to the database being queried, and may miss some data. One day, our lead engineer mentioned the term "CDC" and the odd name "Debezium". That was back in 2016, the same year Red Hat released Debezium 0.1.
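For context, a minimal sketch of that polling approach looks like the query below (the events table and updated_at column are hypothetical names):
-- Hypothetical polling query, run at a fixed interval via a JDBC driver;
-- '?' is bound to the timestamp remembered from the previous sync.
-- Rows that are updated without touching the timestamp, or deleted, are never seen.
SELECT *
FROM events
WHERE updated_at > ?
ORDER BY updated_at;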
For anyone working in, or about to work in, the data/analytics world, CDC is an important concept that you cannot afford to miss. There are many good resources online, and Debezium provides comprehensive documentation. But I hope this blog helps beginners quickly understand what CDC is, why to use it, and how to get their hands dirty and see it in action.
This is not meant to be a definitive guide to CDC, and I don't plan to use ChatGPT or even Google to produce lengthy content that you can easily get elsewhere.
Let's dive in now.
CDC in a nutshell
In a nutshell, here are key facts that you need to know about CDC:
Just like REST (or the old-school SOA), CDC refers to an "idea" for synchronizing data. It's not a particular project or piece of software from a certain vendor.
The main "idea" of CDC is to have the database continuously broadcast what has changed, in a binary format, so that "subscribers" can get real-time notifications for the changes – not just INSERT, but also UPDATE, DELETE or even ALTER. To capture what was already in the database before the CDC process was set up, the CDC tool can usually take a snapshot of the existing data.
Main benefits of CDC:
Fast: get real-time events, instead of polling data at certain intervals
Complete: not just INSERT, but also UPDATE, DELETE and ALTER
Independent: analytics system should not keep querying the transactional databases, for both security and performance reasons
Different databases provide different ways to "announce" such data changes. For example, some CDC clients join the cluster as read replicas, while others receive the binlog or WAL from the primary database. Some systems even let you register webhook URLs to receive the data changes (see the PostgreSQL example after this list).
Popular open-source CDC tools include Debezium, Maxwell, and Estuary.
There is no universal common schema for CDC data. Each CDC software can design its own format, and even for the same tool, the data structure for different databases can be different.
Since CDC is all about real-time database changes, the data is usually continuously put into a data streaming platform, such as Apache Kafka, Redpanda, or Apache Pulsar, then processed by streaming analytics systems, such as Flink, Spark, or Timeplus.
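For example, for the WAL-based approach in PostgreSQL mentioned above, logical decoding has to be enabled on the database first. A quick way to check this (on managed services such as Aiven it is often already enabled, or can be toggled in the console) is:
-- Debezium's PostgreSQL connector requires wal_level to be 'logical'
SHOW wal_level;
-- On a self-managed server you can change it (a restart is required)
ALTER SYSTEM SET wal_level = logical;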
Here is a sample CDC message, generated by Debezium 2.2, from an UPDATE in a PostgreSQL database:
{
  "before": {
    "product_id": "iPhone14",
    "price": 799
  },
  "after": {
    "product_id": "iPhone14",
    "price": 800
  },
  "source": {
    "version": "2.2.0.Final",
    "connector": "postgresql",
    "name": "doc",
    "ts_ms": 1682217416146,
    "snapshot": "false",
    "db": "defaultdb",
    "sequence": "[\"385876616\",\"385877064\"]",
    "schema": "public",
    "table": "dim_products",
    "txId": 1335,
    "lsn": 385877064,
    "xmin": null
  },
  "op": "u",
  "ts_ms": 1682217416358,
  "transaction": null
}
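For reference, the message above was produced by an UPDATE along the lines of the following (the dim_products table is created later in this tutorial):
-- Changes the price from 799 to 800, which shows up in the "before"/"after" fields above
UPDATE dim_products SET price = 800 WHERE product_id = 'iPhone14';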
Okay, I think that's enough for CDC 101. Let's get our hands dirty by setting up a simple system to show live database changes in dashboards. For visual learners, you can also check out the YouTube video below:
There are many tutorials that use Docker Compose to set up everything on your laptop. You can try something different by minimizing the number of local containers and leveraging free cloud services.
In this example, I set up a free PostgreSQL service on Aiven, a free trial of Redpanda Cloud as a Kafka alternative, and a free streaming analytics workspace on Timeplus Cloud.
On my laptop, all I need is a Docker container for Debezium 2.2, which you can run with a single command:
docker run -d -it --rm --name connect \
-p 8083:8083 -e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
-e BOOTSTRAP_SERVERS=redpanda-or-kafka:9092 \
quay.io/debezium/connect:2.2.0.Final
This Docker image is actually a Kafka Connect standalone server with built-in Debezium connectors. It needs to connect to the Kafka/Redpanda brokers to write CDC data and to store the config/offset/status for the connectors.
I also set up a Redpanda Console locally. It's a nice web app to manage topics and connectors on Redpanda and Kafka Connect. You can of course do everything with curl, but why not give their friendly UI a try?
Here is the docker-compose.yml
version: "3.7"
services:
  connect:
    image: quay.io/debezium/connect:2.2.0.Final
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
      # CONNECT_ properties are for the Connect worker
      - CONNECT_BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
      - CONNECT_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
      - CONNECT_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
      - CONNECT_SASL_MECHANISM=SCRAM-SHA-512
      - CONNECT_PRODUCER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
      - CONNECT_PRODUCER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
      - CONNECT_PRODUCER_SASL_MECHANISM=SCRAM-SHA-512
      - CONNECT_CONSUMER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
      - CONNECT_CONSUMER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
      - CONNECT_CONSUMER_SASL_MECHANISM=SCRAM-SHA-512
      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
  console:
    image: docker.redpanda.com/redpandadata/console:v2.2.3
    container_name: redpanda_console
    depends_on:
      - connect
    ports:
      - 8080:8080
    entrypoint: /bin/sh
    command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ${BOOTSTRAP_SERVERS}
          sasl:
            enabled: true
            username: ${SASL_USERNAME}
            password: ${SASL_PASSWORD}
            mechanism: SCRAM-SHA-512
          tls:
            enabled: true
        connect:
          enabled: true
          clusters:
            - name: local-connect-cluster
              url: http://connect:8083
My Redpanda Cloud endpoint is secured with SCRAM-SHA-512. There are some repeated settings in the Debezium configuration. I created a .env file in the same folder to minimize duplication.
BOOTSTRAP_SERVERS=seed-1234.fmc.prd.cloud.redpanda.com:9092
SASL_USERNAME=theuser
SASL_PASSWORD=thepassword
SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SASL_USERNAME}" password="${SASL_PASSWORD}";
SECURITY_PROTOCOL=SASL_SSL
Next, start the stack via `docker-compose up`. It will take a while to pull the images for the first time. If you've configured your Redpanda connection correctly, you can see the topic list at http://localhost:8080/topics/
Set up PostgreSQL
Now, let's set up PostgreSQL.
You can get the database connection information from the Aiven management console. The database is called `defaultdb`. Use any SQL client (TablePlus is a nice one) to connect to the database and run the following SQL to create two tables:
CREATE TABLE dim_products(
product_id VARCHAR PRIMARY KEY,
price FLOAT
);
CREATE TABLE orders(
order_id serial PRIMARY KEY,
product_id varchar,
quantity int8 DEFAULT 1,
timestamp timestamp DEFAULT NOW()
);
In order to capture the before values for UPDATE or DELETE, you also need to set REPLICA IDENTITY to FULL. Refer to the Debezium documentation for more details.
ALTER TABLE dim_products REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;
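If you want to double-check the setting, you can query the PostgreSQL catalog:
-- relreplident: 'f' means FULL, 'd' means DEFAULT (primary key only)
SELECT relname, relreplident FROM pg_class WHERE relname IN ('dim_products', 'orders');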
Please note that the free PostgreSQL plan on Aiven only comes with a 5 GB disk. Even if you only create a few records in the database, enabling replication will take extra disk space and may lead to running out of disk. In that case, powering the database service off and then on again will remove some temporary files.
Add PostgreSQL Connector
You can add the Debezium connector via the Redpanda Console, but the following command works just as well:
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors \
-d '{"name":"pg-connector","config":{"connector.class": "io.debezium.connector.postgresql.PostgresConnector","publication.autocreate.mode": "filtered", "database.dbname": "defaultdb", "database.user": "avnadmin", "schema.include.list": "public", "database.port": "28851", "plugin.name": "pgoutput", "database.sslmode": "require", "topic.prefix": "doc", "database.hostname": "xyz.aivencloud.com", "database.password": "***", "table.include.list": "public.dim_products,public.orders"}}'
This will add a new connector with the name pg-connector, connecting to a PostgreSQL database on a remote server (in this case, Aiven Cloud). A few notes on the configuration items:
publication.autocreate.mode is filtered, and table.include.list is the list of tables you want to apply CDC to. Sometimes you don't have permission to enable publications for all tables, so filtered is recommended.
plugin.name is pgoutput. This works well with recent versions of PostgreSQL (10+). The default value is decoderbufs, which needs to be installed separately.
topic.prefix is set to doc. As the name implies, it will be the prefix for Kafka topics. Since the schema is public, the topics to be used will be doc.public.dim_products and doc.public.orders
Make sure you create those two topics in Kafka/Redpanda. Then, within a few seconds, new messages should be available in the topics.
You can try INSERT/UPDATE/DELETE statements in PostgreSQL and check the generated JSON messages.
If you run the following INSERT, you will see data in the Redpanda topic immediately:
INSERT INTO dim_products ("product_id", "price") VALUES ('iPhone14', 799)
CDC message:
{
  "before": null,
  "after": {
    "product_id": "iPhone14",
    "price": 799
  },
  "source": {
  },
  "op": "c",
  "ts_ms": 1682217357439,
  "transaction": null
}
If you run a DELETE SQL, the CDC message would be:
{
  "before": {
    "product_id": "iPhone14",
    "price": 800
  },
  "after": null,
  "source": {
  },
  "op": "d",
  "ts_ms": 1682217291411,
  "transaction": null
}
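The message above comes from a DELETE such as:
-- "before" carries the last known values and "after" becomes null
DELETE FROM dim_products WHERE product_id = 'iPhone14';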
During the initial snapshot, Debezium also reads all existing rows and generates messages like this:
{
  "before": null,
  "after": {
    "product_id": "iPhone14",
    "price": 799
  },
  "source": {
  },
  "op": "r",
  "ts_ms": 1682217357439,
  "transaction": null
}
In short, the `op` attribute captures the operation type:
r means read (from the initial snapshot)
c means create
d means delete
u means update
Now you have set up PostgreSQL + Debezium + Kafka Connect + Redpanda to capture the latest database changes. Let's close the loop by showing the latest changes in Timeplus dashboards. The goal is that whenever there is a price change or a new order, your dashboard reflects the change in subseconds.
Stream Processing in Timeplus
In Timeplus, go to the Sources page and create a new Redpanda source:
Simply enter the broker URL and authentication details.
Create a stream for saving the raw CDC data from Redpanda/Debezium, for example `rawcdc_dim_products`.
Next, you need to create a special stream in changelog_kv mode to merge mutations of events that share the same primary key. In this example, that is a `dim_products` stream keyed by `product_id`.
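You can create it via the Timeplus console UI. As a rough sketch in SQL (the exact syntax may vary across Timeplus versions; the stream name dim_products and the column types are this tutorial's choices), it looks something like:
-- A changelog_kv stream keeps the latest value per primary key,
-- applying the +1/-1 _tp_delta rows produced by the transformation below
CREATE STREAM dim_products (
  product_id string,
  price float64
)
PRIMARY KEY (product_id)
SETTINGS mode = 'changelog_kv';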
Create a sink to run the transformation from Debezium CDC messages to the Timeplus format:
select 1::int as _tp_delta, after:product_id as product_id, after:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op in ('c','r')
union
select -1::int as _tp_delta, before:product_id as product_id, before:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='d'
union
select -1::int as _tp_delta, before:product_id as product_id, before:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='u'
union
select 1::int as _tp_delta, after:product_id as product_id, after:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='u'
Timeplus' built-in data lineage view reveals the relationships for the sources, streams and sinks.
To demonstrate the live CDC system, you can create a dashboard in Timeplus with a few streaming SQL queries, such as `select * from dim_products`.
Let's say that initially there is only 1 row for iPhone14.
After running `INSERT INTO dim_products("product_id","price") VALUES('iPhone14Plus',899)`, you can immediately see the new row in the dashboard without refreshing the page.
You can also run aggregations and JOINs to make good use of the CDC data. For example, when a new order arrives, you can get the latest total revenue of the day. An order can be updated or cancelled, and a price can change; whenever there is a data change, you always get up-to-date results. That's the purpose of CDC: to capture data changes.
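For example, assuming you have also set up an orders stream from the doc.public.orders topic in the same way, a revenue query could be sketched roughly like this (exact function names may vary by Timeplus version):
-- Running total revenue for today: new, updated or cancelled orders,
-- as well as price changes, refresh the result automatically
SELECT sum(o.quantity * p.price) AS today_revenue
FROM orders AS o
JOIN dim_products AS p ON o.product_id = p.product_id
WHERE to_date(o.timestamp) = today();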
In summary, this blog briefly introduced what CDC is, why you need it, and how to use it in a real-time online store scenario. Check out the demo video on YouTube or our documentation for more details. Feel free to share your questions or feedback on our Community Slack.