Jove Zhong

Introducing Mutable Streams

In the newly released Timeplus Enterprise v2.4, we have introduced a new type of stream called the “Mutable Stream”. As the name implies, data in such streams is mutable: a newly inserted row replaces the previous row with the same primary key. In the early days of implementing this feature, we called it a “KV Stream” or “Key-Value Stream”. You have probably heard of “The 1 Billion Row Challenge” (1brc). With the new mutable streams, Timeplus can handle data with millions or even billions of unique keys, supporting high-performance filter queries as well as high-throughput UPSERTs. Why did we introduce the mutable stream? How is it different from the default stream based on columnar storage? When should you choose row-based storage over columnar storage? Read on to find out.


Append-Only Stream and Columnar Storage


Most OLAP databases and stream processors are optimized for append-only operations, because the main workload is processing events as soon as they arrive, transforming them or calculating results over them. An append-only data structure works well for these use cases.

image credit: https://substack.com/@arriqaaq/p-98433630

You can send new data to the system one event at a time, in sequence. More commonly, though, data is batched to achieve higher throughput on the client side and better resource utilization on the server side, since batching avoids merging many small files.

image credit: https://clickhouse.com/blog/common-getting-started-issues-with-clickhouse

Knowing that the data is append-only, with no frequent updates or deletes, makes it (relatively) easy to optimize the storage, build indexes, and improve query performance for streaming use cases. For example, to optimize the query `SELECT COUNT(*) FROM t`, an OLAP database can simply maintain a file that tracks the number of rows. Whenever new data is added, the number in that file is updated. When a user asks how many rows are in the table, the database returns the number from the file, without scanning and counting rows.


Many modern OLAP systems choose columnar formats for their file layout. Data for each column is stored independently, which makes data access very efficient for certain queries.

image credit: https://datacadamia.com/data/type/relation/structure/column_store

Imagine you build a table with 500 columns and run a query that aggregates just 5 of them:

SELECT min(c1), max(c2), avg(c3), sum(c4), count(c5) FROM t

With columnar storage, the database only needs to fetch 1% (5 columns out of 500) of the data from the file system. This is very quick and greatly reduces resource usage.

Another benefit of columnar storage is that highly optimized data compression can be applied to each column. For example, consider a string column `state` that records which US state a person lives in, such as CA or WA. A columnar database can automatically dictionary-encode those string values as an int8 internally. This can save more disk space than compressing the raw data with gzip or LZ4.
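
For illustration, ClickHouse-family engines, including Timeplus, let you request this kind of dictionary encoding explicitly with the low_cardinality type. A minimal sketch (the stream and columns here are hypothetical):

CREATE STREAM people
(
  name string,
  -- low_cardinality asks the engine to dictionary-encode the column:
  -- each distinct value is stored once, plus compact integer references
  state low_cardinality(string)
)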


There are other benefits of columnar storage, such as leveraging modern CPUs’ SIMD (Single Instruction, Multiple Data) instructions to process many values of a column in parallel. When you run sum(col) on a large table with millions of rows, the database can start multiple threads to sum different ranges of the column’s values, then merge the intermediate results into the final value.


At Timeplus, we provide a unified data storage abstraction to support both stream processing and historical queries.


  • When you push data to Timeplus, or Timeplus copies data from Kafka, the data is first written to an internal Write-Ahead Log (WAL). Similar to Apache Kafka, the WAL is designed for append-only data. To improve performance, Timeplus converts raw external data (e.g. JSON) into an internal format called TDF (Timeplus Data Format), minimizing serialization and deserialization overhead across the different processing steps, both in memory and on disk.

  • From the append-only WAL, the data is automatically written to the columnar storage by a Timeplus background thread. Timeplus leverages the open-source ClickHouse libraries for efficient columnar storage and indexes, and builds a performant stream processing and query engine on top.


Timeplus supports both streaming queries and historical queries.

  • When you run a streaming query (e.g. select .. from my_stream), the most recent data is served from the streaming WAL, and new data continues to be served until the query is terminated.

  • When you run a historical query (e.g. select .. from table(my_stream)), the query is a one-off query, so it leverages the columnar storage to filter and aggregate the data and calculate the result efficiently.

  • You can also run a streaming query with historical data backfill (e.g. select .. from my_stream where _tp_time > now()-14d). In that case, Timeplus loads data from the columnar storage as well as the latest data in the WAL, serving consistent results from the stream. All three modes are sketched concretely below.
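
A minimal sketch, using a hypothetical stream named my_stream:

-- streaming query: tails new events from the WAL until you cancel it
SELECT * FROM my_stream;

-- historical query: a one-off scan over the columnar storage
SELECT count() FROM table(my_stream);

-- streaming query with backfill: reads the last 14 days from columnar
-- storage, then continues with live data from the WAL
SELECT * FROM my_stream WHERE _tp_time > now() - 14d;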


This makes sense in principle and works rather well in practice. However, it requires a paradigm shift from the access patterns most developers are accustomed to.

CRUD

OLTP databases such as Postgres are specifically designed and optimized for transactional INSERT, UPDATE, DELETE, and even READ operations: while one user is writing or updating a batch of data, other users are not supposed to read that data until the transaction is committed and reported successful.
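
As a minimal sketch of that transactional behavior (Postgres-style SQL, with a hypothetical table and values):

BEGIN;
-- other sessions still see the old values at this point
UPDATE ride_rates SET rate = 2.5 WHERE city = 'San Francisco';
DELETE FROM ride_rates WHERE city = 'Oakland';
-- only after COMMIT succeeds do the changes become visible to other sessions
COMMIT;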


In the context of stream processing or real-time analytics, updating or deleting certain types of data (for example, reference data) is not uncommon. Uber ride rates change during rush hour, for example. In the stock or cryptocurrency markets, symbol prices can change multiple times a second based on bid and ask prices. If you want to compute the current market value of a portfolio, you have to handle data updates and even deletes.


As we just mentioned, most OLAP databases and stream processors are not optimized for mutable data. Taking ClickHouse as an example, you have a few options to update or delete data, each with caveats:

  • `ALTER TABLE .. UPDATE` is a heavy operation and designed for infrequent use. It is asynchronous by default. This means that the update occurs in the background and you will not get an instant effect on the table. 

  • If you run `SET mutations_sync = 1` before `ALTER TABLE .. UPDATE`, then the update query will wait for the update mutation to complete. It may take a couple of seconds (see the sketch after this list).

  • In ClickHouse Cloud, a new Lightweight Update feature has been introduced. You can run `SET apply_mutations_on_fly = 1` before `ALTER TABLE .. UPDATE`, which causes updated rows to be marked immediately, so that subsequent SELECT queries return the new values. However, the update mutations still run in the background. This is designed for updating a small amount of data, not for streaming use cases where a lot of data could be updated.

  • ClickHouse also provides table engines such as CollapsingMergeTree and ReplacingMergeTree if you want to update individual rows frequently. However, you have to set up extra columns and expressions. Deduplication runs in the background to save space, but it does not guarantee the absence of duplicates, and results may be served from stale data until the background compaction takes place.
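
As a sketch of the synchronous-mutation option above (ClickHouse syntax, not Timeplus; the table and filter are hypothetical):

-- wait for the mutation to finish before the statement returns
SET mutations_sync = 1;
-- rewrites every data part containing matching rows; heavy by design
ALTER TABLE device_metrics UPDATE battery = 100 WHERE device_id = 'device_1';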


At Timeplus, we are well aware of the need for mutable data in certain use cases, and of the related technical challenges. More than a year ago, we introduced the Versioned Stream and Changelog Stream.



We were happy with the positive user feedback on these advanced stream modes. However, similar to CollapsingMergeTree and ReplacingMergeTree in ClickHouse, those “mutable streams” in Timeplus require extra columns, for example _tp_delta (1 for new data, -1 for deleted data) and an optional version column. This adds significant complexity, hurting ease of use and making it challenging to integrate with systems like Debezium. Lastly, Versioned Stream and Changelog Stream were still backed by columnar storage. That is a double-edged sword: running aggregation queries on columnar storage is lightning fast, but filtering data by a particular primary key or a time range, or marking data as deleted, did not meet our performance expectations.
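
As a rough sketch of that earlier approach (the stream name and values are hypothetical, and the exact syntax may vary by version):

-- a changelog stream keyed by product_id; rows carry a _tp_delta column
CREATE STREAM product_prices(product_id string, price float32)
PRIMARY KEY (product_id)
SETTINGS mode='changelog_kv';

-- deleting a row means inserting a retraction event with _tp_delta = -1
INSERT INTO product_prices(product_id, price, _tp_delta)
VALUES ('p1', 9.99, -1);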


With increasing customer demand for high-frequency data mutations and point/range queries, we kept pushing to see whether we could build something with a much better user experience and much better performance. We bit the bullet and asked: why not just build a true key-value stream backed by a row-based historical storage format?


Meet Mutable Stream, with Row-Based Storage

After many iterations of development and user feedback, we built the new mutable stream in Timeplus Enterprise 2.4 using RocksDB, a popular open-source C++ library. RocksDB is an LSM-tree-based storage engine that provides key-value store APIs; it was originally developed at Facebook/Meta and based on LevelDB. We will talk more about the design and implementation details in a future blog.

image credit: https://www.creativcoder.dev/blog/what-is-a-lsm-tree

Compared with our previous Versioned and Changelog Streams, Mutable Streams are performant, cluster-ready, and easy to use.

CREATE MUTABLE STREAM device_metrics
(
  device_id string,
  timestamp datetime64(3),
  batch_id uint32,
  region string,
  city string,
  lat float32,
  lon float32,
  battery float32,
  humidity uint16,
  temperature float32
)
PRIMARY KEY (device_id, timestamp, batch_id)

Note:


  1. The compound primary key is the combination of device_id, timestamp, and batch_id. A newly inserted row with exactly the same combination overwrites the previous row.

  2. Searching data with any prefix combination of the primary key is very fast. For example, searching by device_id, or by device_id plus timestamp, will be very fast.

  3. By default there is only 1 shard, with no secondary indexes or other optimizations.


To add new rows or update existing rows, simply insert data into the mutable stream. As long as the primary key is identical, newer data overwrites the previous data for that key.
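
Here is a minimal sketch of the upsert semantics, with hypothetical values; the second INSERT replaces the row written by the first, because all three primary key columns match:

INSERT INTO device_metrics(device_id, timestamp, batch_id, battery)
VALUES ('device_1', '2024-07-10 11:38:14.878', 0, 50);

-- same (device_id, timestamp, batch_id): this row overwrites the one above
INSERT INTO device_metrics(device_id, timestamp, batch_id, battery)
VALUES ('device_1', '2024-07-10 11:38:14.878', 0, 95);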


For bulk testing, let’s use the following SQL to add 50 million rows, with some randomization to generate duplicate primary keys:

INSERT INTO device_metrics
SELECT
    'device_' || to_string(floor(rand_uniform(0, 2400))) AS device_id,
    now64(9) AS timestamp,
    floor(rand_uniform(0, 50)) AS batch_id,
    'region_'||to_string(rand()%5) AS region,
    'city_'||to_string(rand()%10) AS city,
    rand()%1000/10 AS lat,
    rand()%1000/10 AS lon,
    rand_uniform(0,100) AS battery,
    floor(rand_uniform(0,80)) AS humidity,
    rand_uniform(0,100) AS temperature,
    now64() AS _tp_time
FROM numbers(50_000_000)

Depending on your hardware and server configuration, it may take a few seconds to add all the data. In my case, it took only about 11.5 seconds to process those 50 million rows.


0 rows in set. Elapsed: 11.532 sec. Processed 50.00 million rows, 400.00 MB (4.34 million rows/s., 34.69 MB/s.)

When you query the mutable stream, Timeplus reads all historical data without any duplicate primary keys.

SELECT count() FROM table(device_metrics);
┌─count()─┐
│  120000 │
└─────────┘
1 row in set. Elapsed: 0.092 sec.

You can filter data efficiently by any component of the primary key:


SELECT count() FROM table(device_metrics) WHERE batch_id=5;
┌─count()─┐
│    2400 │
└─────────┘
1 row in set. Elapsed: 0.078 sec. Processed 120.00 thousand rows, 480.00 KB (1.54 million rows/s., 6.15 MB/s.)

Filtering on a prefix of the primary key (here device_id plus a timestamp range) is even faster:

SELECT * FROM table(device_metrics) WHERE device_id='device_1' AND timestamp>now()-1h;
┌─device_id─┬───────────────timestamp─┬─batch_id─┬─region───┬─city───┬──lat─┬──lon─┬───battery─┬─humidity─┬─temperature─┬────────────────_tp_time─┐
│ device_1  │ 2024-07-10 11:38:14.878 │        0 │ region_1 │ city_1 │ 21.1 │ 21.1 │  81.35298 │       41 │    81.35298 │ 2024-07-10 11:38:14.880 │
│ device_1  │ 2024-07-10 11:38:14.878 │       49 │ region_3 │ city_3 │ 33.3 │ 33.3 │ 62.507397 │       79 │   62.507397 │ 2024-07-10 11:38:14.880 │
└───────────┴─────────────────────────┴──────────┴──────────┴────────┴──────┴──────┴───────────┴──────────┴─────────────┴─────────────────────────┘
50 rows in set. Elapsed: 0.015 sec.

Secondary Index


For non-primary-key columns that are frequently used in filters, you can define secondary indexes in the data definition to speed up those queries.


For example:

CREATE MUTABLE STREAM device_metrics
(
  device_id string,
  timestamp datetime64(3),
  batch_id uint32,
  region string,
  city string,
  ..
  index sidx1 (region),
  index sidx2 (city)
)
PRIMARY KEY (device_id, timestamp, batch_id)

When you query data with filters on those columns, Timeplus automatically leverages the secondary indexes to improve query performance.
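
For example, with sidx1 defined above, a filter on region such as the following sketch (hypothetical value) can be served from the index rather than a full scan:

SELECT count() FROM table(device_metrics) WHERE region = 'region_1';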


Column Family


For One-Big-Table (OBT) designs or extra-wide tables with dozens or even hundreds of columns, retrieving every column via SELECT * FROM .. is not recommended, as it requires more disk reads and can be slow.


More commonly, users query a subset of the columns. For columns that are frequently queried together, you can define column families to group them, so that their data is stored together and can be retrieved faster in projection queries. Properly defined column families optimize disk I/O and avoid reading unnecessary data files.

image credit: https://www.scylladb.com/glossary/cassandra-column-family/

Please note that a column can appear in at most one column family. The primary key columns form a special column family of their own, so column families must not overlap with each other or with the primary key.


Taking the previous device_metrics stream as an example: lat and lon are commonly queried together, so you can define a column family for them.

CREATE MUTABLE STREAM device_metrics
(
  device_id string,
  timestamp datetime64(3),
  batch_id uint32,
  region string,
  city string,
  lat float32,
  lon float32,
  battery float32,
  humidity uint16,
  temperature float32,
  FAMILY cf1 (lat,lon)
)
PRIMARY KEY (device_id, timestamp, batch_id)
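
With FAMILY cf1 in place, a projection query like the following sketch only needs to read the cf1 data files (plus the primary key), rather than the whole row:

SELECT device_id, lat, lon
FROM table(device_metrics)
WHERE device_id = 'device_1';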

There are many more cool features, such as multi-sharding, parallel keyspace scans, and how mutable streams work with our Multi-Raft cluster. Please check the docs for more details or join our Slack for discussions.


Streaming SQL on Mutable Streams


Mutable streams work in both historical query and streaming query modes. 


When you run SELECT … FROM table(mutable_stream) .., Timeplus scans the historical data without any duplicates. You can run such queries every second or even more frequently; the data is always up to date.


You can also query mutable streams in a streaming SQL application, so that whenever there is new or updated data in the mutable stream, the Timeplus SQL engine automatically processes it and serves the latest results to the app in real time. For example, this is a streaming aggregation over the mutable stream:

SELECT device_id, avg(temperature), min(battery)
FROM device_metrics GROUP BY device_id
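
To keep such results continuously available to downstream queries and sinks, you could wrap the aggregation in a materialized view; a minimal sketch (the view name is hypothetical):

CREATE MATERIALIZED VIEW device_summary AS
SELECT device_id, avg(temperature) AS avg_temperature, min(battery) AS min_battery
FROM device_metrics
GROUP BY device_id;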

Preliminary Performance Numbers


Both our engineering team and external users have run preliminary performance tests on the new Mutable Stream. In many cases it is far faster than ClickHouse’s ReplacingMergeTree. In some scenarios, ClickHouse hits memory issues and does not even return, while Timeplus returns the result in under a second.

Query response time in seconds (smaller is better). Testbed: AWS EC2 c5a.4xlarge (16 cores, 32 GB RAM), GP3 EBS. Contact us for more information.

In one of our internal tests, on a physical machine with 20 cores and 32 GB RAM, we created a mutable stream with 58 columns, 8 secondary indexes, and a 3-column primary key, then loaded 20 million rows.


Sample query:

SELECT * FROM table(test_stream) WHERE 
  (algoid = 's_1-algo-20230202-000000-000-0000000-168.11.224.1') AND 
  (createdate >= '1675267200000') AND (createdate <= '1682985535000') 
  AND (ordstatus IN ('1', '2')) AND (source = 'O32') 
  AND (userid = '53') AND (securityid IN ('601998', 'IF602816'))

Whether using 3 shards or more, with a single query scanning the whole data set, most queries return within 0.05 seconds.


When running 200 queries concurrently, most queries still return within 2 seconds, which is reasonable under such load.

What’s Next


With the new mutable streams in Timeplus Enterprise, you can filter and aggregate massive amounts of streaming data in real time, even when the data is rapidly updating. You can also use mutable streams to keep only the latest update for each record. Combined with Timeplus materialized views and external streams for Apache Kafka or another Timeplus deployment, you can implement many more streaming analytics use cases, just with SQL.

Feel free to try Timeplus Enterprise yourself, or join our community Slack for more discussions.
