top of page
Writer's pictureJove Zhong

CDC in Action, with Debezium and Timeplus

A beginner's guide for Change Data Capture (CDC)


You've probably heard the term "CDC" a lot nowadays. Sometimes it refers to the US Federal CDC (Center for Disease Control and Prevention). Sometimes it refers to the technology that synchronizes data in real-time (Change Data Capture).


I personally heard about CDC and Debezium 7 years ago, when I was at Splunk, managing the engineering team for DB Connect app. One of the key features of this app was db-input, which loads data from external databases into Splunk. A common practice back then (and even today) is to run SQL, `SELECT * FROM table WHERE timestamp>?` at a certain interval via a JDBC driver, to get the new data since the last sync. However, this solution assumes the data is append-only; UPDATE or DELETE cannot be detected. It will also impact the performance of the target database, and may miss some data. One day, our lead engineer mentioned the term "CDC" and the odd name "Debezium". That was back in 2016, in the same year Red Hat released the 0.1 version.


For anyone working in or about to work in the data/analytics world, CDC is such an important concept that you cannot miss. There are many good resources online, and Debezium provides comprehensive documentation. But I hope this blog can help beginners quickly understand what CDC is, why use CDC, and how to get their hands dirty and see it in action.


This is not meant to be a definitive guide for CDC, and I don't plan to use ChatGPT or even Google to provide lengthy content that you can easily get elsewhere.


Let's dive in now.


 

CDC in a nutshell


In a nutshell, here are key facts that you need to know about CDC:

  • Just like REST (or the old-school SOA), CDC refers to an "idea" to synchronize data. It's not a particular project or software from certain vendors.

  • The main "idea" of CDC is to have the database continuously broadcast what has been changed, in a binary format, so that the "subscribers" can get real-time notifications for the changes – not just INSERT, but also UPDATE, DELETE or even ALTER. To capture what has been in the database before the CDC process is set up, it can usually take a snapshot for existing data.

  • Main benefits of CDC:

    • Fast: gets real-time events, instead of pulling data at certain intervals

    • Complete: not just INSERT, but also UPDATE, DELETE and ALTER

    • Independent: analytics system should not keep querying the transactional databases, for both security and performance reasons

  • Different databases can provide different ways to "announce" such data changes. For example, some CDC clients act as read replicas to join the cluster, or some CDC clients receive the binlog or WAL from the main database. Some systems even allow you to put webhook URLs to receive the data changes.

  • Popular open source CDC are: Debezium, Maxwell, Estuary, etc.

  • There is no universal common schema for CDC data. Each CDC software can design its own format, and even for the same tool, the data structure for different databases can be different.

  • Since CDC is all about real-time database changes, the data is usually continuously put into a data streaming platform, such as Apache Kafka, Redpanda, or Apache Pulsar, then processed by streaming analytics systems, such as Flink, Spark, or Timeplus.


Here is a sample CDC data, generated by Debezium 2.2, from a UPDATE in a postgresql database.


{
    "before": {
        "product_id": "iPhone14",
        "price": 799
    },
    "after": {
        "product_id": "iPhone14",
        "price": 800
    },
    "source": {
        "version": "2.2.0.Final",
        "connector": "postgresql",
        "name": "doc",
        "ts_ms": 1682217416146,
        "snapshot": "false",
        "db": "defaultdb",
        "sequence": "[\"385876616\",\"385877064\"]",
        "schema": "public",
        "table": "dim_products",
        "txId": 1335,
        "lsn": 385877064,
        "xmin": null
    },
    "op": "u",
    "ts_ms": 1682217416358,
    "transaction": null
}
 

Okay, I think that's enough for CDC 101. Let's get our hands dirty by setting a simple system to show live database changes in dashboards. For visual learners, you can also check out the YouTube video below:



 

There are many tutorials using Docker Compose to set up everything on your laptop. You can try something different, by minimizing the number of local containers and leveraging free cloud services.


In this example, I set up a free postgresql on Aiven, a free trial of Redpanda Cloud as Kafka alternative, and a free streaming analytics workspace on Timeplus Cloud.

On my laptop, all I need is a Docker container for Debezium 2.2, which you can run with a single command:

docker run -d -it --rm --name connect \
-p 8083:8083 -e GROUP_ID=1 \
-e CONFIG_STORAGE_TOPIC=my_connect_configs \
-e OFFSET_STORAGE_TOPIC=my_connect_offsets \
-e STATUS_STORAGE_TOPIC=my_connect_statuses \
-e BOOTSTRAP_SERVERS=redpanda-or-kafka:9092 \
quay.io/debezium/connect:2.2.0.Final

This docker image is actually a Kafka Connect standalone server with built-in Debezium connectors. It needs to connect to the Kafka/Redpanda brokers to write CDC data and store the config/offset/status for the connectors.


I also set up a Redpanda Console locally. It's a nice web app to manage topics and connectors on Redpanda and Kafka Connect. You can of course do everything with curl, but why not give their friendly UI a try?


Here is the docker-compose.yml

version: "3.7"
services:
  connect:
    image: quay.io/debezium/connect:2.2.0.Final
    container_name: connect
    ports:
      - 8083:8083
    environment:
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
      # CONNECT_ properties are for the Connect worker
      - CONNECT_BOOTSTRAP_SERVERS=${BOOTSTRAP_SERVERS}
      - CONNECT_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
      - CONNECT_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
      - CONNECT_SASL_MECHANISM=SCRAM-SHA-512
      - CONNECT_PRODUCER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
      - CONNECT_PRODUCER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
      - CONNECT_PRODUCER_SASL_MECHANISM=SCRAM-SHA-512
      - CONNECT_CONSUMER_SECURITY_PROTOCOL=${SECURITY_PROTOCOL}
      - CONNECT_CONSUMER_SASL_JAAS_CONFIG=${SASL_JAAS_CONFIG}
      - CONNECT_CONSUMER_SASL_MECHANISM=SCRAM-SHA-512
      - CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE=false
      - CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE=false
  console:
    image: docker.redpanda.com/redpandadata/console:v2.2.3
    container_name: redpanda_console
    depends_on:
      - connect
    ports:
      - 8080:8080
    entrypoint: /bin/sh
    command: -c "echo \"$$CONSOLE_CONFIG_FILE\" > /tmp/config.yml; /app/console"
    environment:
      CONFIG_FILEPATH: /tmp/config.yml
      CONSOLE_CONFIG_FILE: |
        kafka:
          brokers: ${BOOTSTRAP_SERVERS}
          sasl:
            enabled: true
            username: ${SASL_USERNAME}
            password: ${SASL_PASSWORD}
            mechanism: SCRAM-SHA-512
          tls:
            enabled: true
        connect:
          enabled: true
          clusters:
            - name: local-connect-cluster
              url: http://connect:8083

My Redpanda Cloud endpoint is secured with SCRAM-SHA-512. There are some repeated settings in the Debezium configuration. I created a .env file in the same folder to minimize duplication.

BOOTSTRAP_SERVERS=seed-1234.fmc.prd.cloud.redpanda.com:9092
SASL_USERNAME=theuser
SASL_PASSWORD=thepassword
SASL_JAAS_CONFIG=org.apache.kafka.common.security.scram.ScramLoginModule required username="${SASL_USERNAME}" password="${SASL_PASSWORD}";
SECURITY_PROTOCOL=SASL_SSL

Next, start the stack via `docker-compose up`. It will take a while to load images for the first time. If you've configured your Redpanda correctly, you can see the topic list on http://localhost:8080/topics/


Setup PostgreSQL

Now, let's set up the postgresql.


You can get the database connection information from Aiven management console. The database is called `defaultdb`. Use any SQL client (TablePlus is a nice one) to connect to the database and run the following SQL to create 2 tables:

CREATE TABLE dim_products(
  product_id VARCHAR PRIMARY KEY,
  price FLOAT
);
CREATE TABLE orders(
  order_id serial PRIMARY KEY,
  product_id varchar,
  quantity int8 DEFAULT 1,
  timestamp timestamp DEFAULT NOW()
);

In order to capture the before values for UPDATE or DELETE, you also need to set REPLICA IDENTIFY to FULL. Refer to the Debezium documentation for more details.

ALTER TABLE dim_products REPLICA IDENTITY FULL;
ALTER TABLE orders REPLICA IDENTITY FULL;

Please note, the free postgresql on Aiven only gets 5GB disk. Even you are going to create few records in the database, enabling replica will take extra disk space and may lead to out of disk. In that case power off the database service then turn it on will remove some temporary files.


Add PostgreSQL Connector


You can add the Debezium connector via Redpanda Console. But the following command line just works fine:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
localhost:8083/connectors \
-d '{"name":"pg-connector","config":{"connector.class": "io.debezium.connector.postgresql.PostgresConnector","publication.autocreate.mode": "filtered",  "database.dbname": "defaultdb",  "database.user": "avnadmin",  "schema.include.list": "public",  "database.port": "28851",  "plugin.name": "pgoutput",  "database.sslmode": "require",  "topic.prefix": "doc",  "database.hostname": "xyz.aivencloud.com",  "database.password": "***",  "table.include.list": "public.dim_products,public.orders"}}'

This will add a new connector with the name pg-connector, connecting to a PostgreSQL database in a remote server (in this case, Aiven Cloud). A few notes to the configuration items:

  • publication.autocreate.mode is filtered, and table.include.list is a list of tables you want to apply CDC. Sometimes you don't have permissions to enable publications for all tables, so filtered is recommended.

  • plugin.name is pgoutput. This works well with new versions of PostgreSQL (10+). The default value is decoderbufs, which is required to be installed separately.

  • topic.prefix is set to doc. As the name implies, it will be the prefix for Kafka topics. Since the schema is public, the topics to be used will be doc.public.dim_products and doc.public.orders

Make sure you create those 2 topics in Kafka/Redpanda. Then in a few seconds, new messages should be available in the topics.


You can try INSERT/UPDATE/DELETE data in PostgreSQL and check the generated JSON messages.


If you run the INSERT SQL, you will get data immediately in the Redpanda topic.

INSERT INTO dim_products ("product_id", "price") VALUES ('iPhone14', 799)

CDC message:

{
    "before": null,
    "after": {
        "product_id": "iPhone14",
        "price": 799
    },
    "source": {
    },
    "op": "c",
    "ts_ms": 1682217357439,
    "transaction": null
}

If you run a DELETE SQL, the CDC message would be:

{
    "before": {
        "product_id": "iPhone14",
        "price": 800
    },
    "after": null,
    "source": {
    },
    "op": "d",
    "ts_ms": 1682217291411,
    "transaction": null
}

Debezium also reads all existing rows and generates messages like this:

{
    "before": null,
    "after": {
        "product_id": "iPhone14",
        "price": 799
    },
    "source": {
    },
    "op": "r",
    "ts_ms": 1682217357439,
    "transaction": null
}

In short, the `op` attribute captures the operation type:

  • r means read

  • c means create

  • d means delete

  • u means update

Now you have to setup PostgreSQL+Debezium+Kafka Connect+Redpanda to get the latest database changes. Let's close the loop by showing the latest changes in Timeplus dashboards. The goal is whenever there is a price change, or a new order, your dashboard can reflect the change in subseconds.


Stream Processing in Timeplus


In Timeplus, go to the Sources page and create a new Redpanda source:

Simply enter the broker URL and authentication details.

Create a stream for saving such raw CDC data from Redpanda/Debezium, for example `rawcdc_dim_products`.


Next, you need to create a special stream in the changelog_kv mode to merge mutations of the events with the same primary key:

Create a sink to run the transformation from Debezium CDC messages to the Timeplus format:


select 1::int as _tp_delta, after:product_id as product_id, after:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op in ('c','r')
union
select -1::int as _tp_delta, before:product_id as product_id, before:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='d'
union
select -1::int as _tp_delta, before:product_id as product_id, before:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='u'
union 
select 1::int as _tp_delta, after:product_id as product_id, after:price::float as price, cast(ts_ms::string,'datetime64(3, \'UTC\')') as _tp_time from rawcdc_dim_products where op='u'      

Timeplus' built-in data lineage view reveals the relationships for the sources, streams and sinks.


To demonstrate the live CDC system, you can create a dashboard in Timeplus with a few streaming SQL, such as `select * from dim_products`


Let's say that initially there is only 1 row for iPhone14.

After running `INSERT INTO dim_products("product_id","price") VALUES('iPhone14Plus',899)`, you can immediate see the new row in dashboard without refreshing the page.


You can also run aggregations and JOINs to make good use of the CDC change. For example, when a new order arrives, you can get the latest total revenue of the day. The order can be updated or cancelled, or the price can be updated. Whenever there is a data change, you always get the up-to-date results. That's the purpose of CDC, to capture data change.


 

In summary, I briefly introduced in this blog what CDC is, why you need it, and how to use it in a real-time online store scenarios. Check out the demo video on Youtube or our documentation for more details. Feel free to share your questions or feedback on our Community Slack.



bottom of page