At Timeplus, our mission is to provide developers with an easy-to-use tool for stream processing. Stream processing is challenging due to the inherent nature of continuous, real-time, unbounded data streams with varying properties. Unlike traditional batch processing, where data is static and finite, stream processing deals with constantly arriving, often unordered, data in real time, requiring complex strategies to ensure timely, accurate, and scalable processing. Here are some of the key reasons why stream processing is not easy:
Unbounded Data
Real-Time Nature: Stream processing operates on data that arrives continuously and indefinitely, unlike batch processing, which works with bounded data sets. This means you cannot wait for all the data to arrive before starting processing; you have to decide how to process partial data, handle state, and trigger results in real time.
No End: Since data is constantly flowing, there’s no predefined end to the dataset. Managing the flow of data, especially when the data load spikes, requires robust scaling and partitioning mechanisms.
Out-of-Order Data
Late or Early Events: Events often arrive out of order due to network delays, time zone differences, or system lags. Stream processing must handle out-of-order data without losing accuracy, requiring techniques like watermarks to track event progress.
Correctness: Ensuring correctness when events are out of order is tricky. You need mechanisms to handle late-arriving events (e.g., updates to previously computed results), or risk producing inaccurate results.
Handling Event Time vs Processing Time
Event Time: This is the actual time when an event occurred, which may differ from the time it is processed (processing time). Stream processing needs to ensure computations (windowing, joins, etc.) are based on event time, even though events may arrive delayed or go missing while processing continues in real time.
Processing Time: Operations based purely on processing time may introduce incorrect results, as processing is dependent on system load, delays, and latencies.
State Management
Continuous State: Stream processing systems maintain state over time (e.g., counts, aggregations, or sessions). Managing the state for unbounded streams requires sophisticated state management strategies to ensure efficiency and fault tolerance.
Scalability: As streams grow, maintaining large state (e.g., user sessions or sliding window aggregations) becomes increasingly complex. Ensuring state is distributed and scalable across nodes is also crucial but non-trivial.
Durability and Consistency: Keeping the state consistent across failures or node restarts (e.g., via checkpoints) adds complexity to ensure exactly-once processing guarantees.
Windowing and Time-Based Operations
Dynamic Windows: Many real-time operations require grouping events into windows (e.g., tumbling or sliding windows) based on time. Managing windows over an unbounded stream, especially when events arrive late or out-of-order, adds complexity.
Triggering Logic: Determining when to trigger computation over windows (early, on time, or late) requires careful design. Missed or premature triggers can lead to inaccurate results.
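As a quick illustration of these window types, here is a minimal sketch against a hypothetical readings stream, using Proton's tumble and hop window functions (the concrete streams used in the rest of this post are defined later):
-- Tumbling window: fixed, non-overlapping 10-second buckets.
SELECT window_start, count(*) AS cnt
FROM tumble(readings, 10s)
GROUP BY window_start;
-- Hopping (sliding) window: 10-second windows that advance every 2 seconds,
-- so each event falls into several overlapping windows.
SELECT window_start, count(*) AS cnt
FROM hop(readings, 2s, 10s)
GROUP BY window_start;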
There are many more challenges not listed here, such as fault tolerance, scalability, handling various data formats, supporting changing data schemas, and debugging and monitoring; each of these deserves a dedicated blog post. I am going to use a simple example to explain why stream processing is so hard and sometimes confusing: monitoring whether an IoT sensor is faulty using streaming SQL.
Assume an IoT monitoring scenario: sensors send data to an event streaming system, say Apache Kafka, and a stream processing tool, say Timeplus, processes the sensor data in real time.
The user wants to detect when one of the sensors is faulty and no longer sending data; as soon as that happens, the sensor needs to be replaced or fixed.
Let's see how to solve this problem using stream processing and SQL. All the samples here are implemented with the open source stream processing tool Proton.
First, create a random stream to simulate the sensors using the following DDL.
CREATE RANDOM STREAM device
(
`sensor` string DEFAULT ['A', 'B', 'C'][(int_div(rand(), 1000000000) % 3) + 1],
`temperature` float DEFAULT round(rand_normal(10, 0.5), 2)
)
SETTINGS eps = 1
A random stream is a very useful feature of Proton, which can be used for functional testing or simulation. (Refer to my previous blog, "Calculating Pi with Random Stream: A Monte Carlo Simulation Approach", to learn more.)
There are three fields:
sensor, the randomly generated name of the sensor, picked from the list A, B, C
temperature, the simulated temperature value reported by the sensor, following a normal distribution with mean 10 and standard deviation 0.5, rounded to 2 digits
_tp_time, the default event time of the stream; when not specified, it is the time the event is ingested into Proton
The eps setting configures how many events are generated per second; in this case, one random event per second.
5b8a720e3027 :) select * from device
SELECT
*
FROM
device
Query id: b1f2a3b9-52ac-4aeb-9bdb-c3abf72c4956
┌─sensor─┬─temperature─┬─────────────────_tp_time─┐
│ A │ 10.31 │ 2024-11-23 17:53:28.238Z │
└────────┴─────────────┴──────────────────────────┘
┌─sensor─┬─temperature─┬─────────────────_tp_time─┐
│ B │ 10.83 │ 2024-11-23 17:53:29.235Z │
└────────┴─────────────┴──────────────────────────┘
┌─sensor─┬─temperature─┬─────────────────_tp_time─┐
│ C │ 10.99 │ 2024-11-23 17:53:30.235Z │
└────────┴─────────────┴──────────────────────────┘
To simulate a stream whose incoming data can break, we create a materialized view that ingests events into a target stream; when we drop the materialized view, we simulate the source breaking for some reason.
-- target stream
CREATE STREAM device_reader
(
`sensor` string,
`temperature` float32
);
-- simulate get data from source to proton
CREATE MATERIALIZED VIEW mv_device_reader INTO device_reader
AS
SELECT * FROM device;
If the user runs SELECT * FROM device_reader, the query returns the events reported by the sensors in real time. To detect when no data is being reported, a straightforward approach is to monitor the throughput of the stream with a tumble window based query like this:
SELECT
window_start, count(*) AS count
FROM
tumble(device_reader, 1s)
GROUP BY
window_start
5b8a720e3027 :) SELECT
window_start, count(*) AS count
FROM
tumble(device_reader, 1s)
GROUP BY
window_start
Query id: 4a73d045-b09d-48f4-a84e-d57bdae3f800
┌─────────────window_start─┬─count─┐
│ 2024-11-23 18:02:26.000Z │ 1 │
└──────────────────────────┴───────┘
┌─────────────window_start─┬─count─┐
│ 2024-11-23 18:02:27.000Z │ 1 │
└──────────────────────────┴───────┘
┌─────────────window_start─┬─count─┐
│ 2024-11-23 18:02:28.000Z │ 1 │
└──────────────────────────┴───────┘
┌─────────────window_start─┬─count─┐
│ 2024-11-23 18:02:29.000Z │ 1 │
└──────────────────────────┴───────┘
Using a count aggregation and a 1-second tumble window, the above SQL reports how many events are received every second, a typical throughput metric known as events per second (EPS). In this case, my simulated data source reports 1 event per second, so the query consistently returns 1 for every second in real time.
One thing to mention is that the above SQL uses ingest time to process the window; in real production, event time should be used. In this demonstration, we simply assume the event time equals the ingest time, which is when the event is generated.
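For reference, if the sensor payload carried its own event-time column, the same throughput query could be driven by that column instead of the default _tp_time. A minimal sketch, where reported_at is a hypothetical event-time column on device_reader (not part of this demo):
SELECT
window_start, count(*) AS count
FROM
tumble(device_reader, reported_at, 1s)
GROUP BY
window_start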
Now, I'll stop the simulated data source by dropping the materialized view and observe what happens to the query.
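Dropping the MV is a single statement (assuming Proton's standard DROP VIEW syntax):
DROP VIEW mv_device_reader;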
When I drop the materialized view, no more events are sent to the device_reader stream. My expectation is that the streaming SQL will report 0 EPS, so I can use this to decide whether the source is broken, since 0 EPS means no data is being reported and something is wrong.
However, the actual result is that the query's output stopped at 2024-11-23 18:15:29.000Z and no further results were emitted. What happened here? Why does the streaming query stop reporting anything after the source stops?
To explain why this happens, we need a deeper understanding of how streaming queries work.
As I mentioned in the beginning, one of the key challenges is handling out-of-order events. In the stream processing world, each stream comes from a different source, and these sources do not share the same clock; like the multiverse in quantum physics (my favorite topic), each stream has its own time!
The stream processing system has to honor the original time (event time) of each stream. So how does it know what the time is for each stream? Through observation!
To decide the current time of a specific stream, the stream processing system observes the event time of incoming events and updates the watermark accordingly. If no new event arrives, the watermark stops updating; from the stream processing system's point of view, the world of that stream is frozen.
In stream processing, a watermark is a mechanism used to track the progress of event processing based on event time, which is often distinct from processing time.
A watermark is a timestamp that represents the point up to which the system believes it has received all relevant events in a stream, based on event time.
It's a threshold that the stream processing system uses to determine when it can safely assume no more events with earlier timestamps will arrive, allowing the system to proceed with window-based computations or trigger aggregations.
In the above sample, the last observed event has an event time of 2024-11-23 18:15:29.000Z. With the default watermark settings, the watermark has been updated to 2024-11-23 18:15:29.000Z, which means the system assumes all events with a timestamp earlier than 2024-11-23 18:15:29.000Z have been received and can be processed.
Now the query is waiting for the watermark to advance to 2024-11-23 18:15:30.000Z so it can close and emit the next tumble window. Unfortunately, since no new event is coming, that window never closes, and no aggregation result will be emitted until new events push the watermark forward.
Now we have a dilemma: we want to know when no events are being generated, but without a new event, no aggregation result is produced! This is a common design in stream processing; if you try the same thing with Flink or RisingWave, you will most likely get the same result.
As an engineer, problem solving is my job, so I started to search for an alternative or workaround given these known limitations.
In my previous blog, I talked about the random stream, a long-running, randomly generated event stream. I wondered: what if I join a timer-like random stream with the monitored target stream, so the random stream can push the watermark forward?
Let's create the following random stream that generates one trigger event per second:
CREATE RANDOM STREAM timer_1s
(
`name` string DEFAULT 'timer'
)
SETTINGS eps = 1
The initial idea was to run a left join between the random timer stream and the target IoT stream.
The join SQL would look something like this:
WITH count_iot_1s AS
(
SELECT
window_start, count() AS count
FROM
tumble(device_reader, 1s)
GROUP BY
window_start
), timer AS
(
SELECT
to_datetime(_tp_time) AS time
FROM
timer_1s
)
SELECT
time, c.count
FROM
timer AS t
LEFT JOIN count_iot_1s AS c ON t.time = c.window_start
However, running the above query will return an error: Not implemented. Code: 48. DB::Exception: 'Append' LEFT ALL JOIN 'Append' is not supported
Why does Proton not support this? Let's revisit the challenges I mentioned at the beginning. Because the data is unbounded, when we run such a left join of two streams, the stream processor needs to keep both streams in memory so that when a new event comes, it can search for the rows to join. For a long-running, unbounded stream, this state will eventually grow very large (depending on the stream throughput and computation complexity). So there are constrained alternatives for stream-to-stream joins, such as range joins or ASOF joins. Refer to https://docs.timeplus.com/joins#stream_stream_join for more information.
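For example, a range join constrains how far apart in time two events can be, so the join state stays bounded. Here is a minimal sketch adapted from the joins documentation linked above; the stream names and the id column are hypothetical, not part of this demo:
-- Only pairs of events whose _tp_time values are within 10 seconds of each other are joined,
-- so the processor can discard buffered events once they fall outside that range.
SELECT *
FROM left_stream
INNER JOIN right_stream
ON left_stream.id = right_stream.id AND date_diff_within(10s);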
Even if such a left join were supported, another issue has to be considered: how to handle the watermarks of two different streams when joining them. As I mentioned, each stream has its own clock, so what do we do when joining the two?
There are several options:
Wait for the slower watermark: The system can delay the join operation until both streams have watermarks that allow the join to be processed. This approach ensures that all potential matching events are considered, but it introduces some latency.
Align watermarks using a buffer: In this approach, a buffer is maintained for each stream, and the system waits until the slower stream catches up with the faster stream’s watermark. Once both watermarks are aligned, the join can proceed. This requires memory for buffering the unaligned events.
Late arrival handling: If one stream is ahead (i.e., its watermark is more advanced), any events that arrive in the slower stream after the faster stream has moved past the watermark are considered late. In this case, you can handle these late events by:
discarding them, or
joining them and producing updated results, but flagging them as late.
That looks super complex, and that is why stream processing is hard!
Let's continue our exploration.
The join solution seems to be a dead end. What else can I do? I really want to solve this problem.
Since the problem is caused by a frozen watermark when no new events arrive, can we find a way to make the watermark move forward?
Here comes the solution: we can create a timer stream that generates a new event every second, and then send both the device readings and the timer events into one stream, with different labels. Since the timer events are always generated, they keep pushing the watermark forward, and we only need to check whether a specific window contains any event with the 'device' label.
CREATE RANDOM STREAM timer
(
`time` datetime64 DEFAULT now64()
)
SETTINGS eps = 1;
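-- note: if you ran the earlier example, drop the existing device_reader stream first,
-- so this re-definition (with the new label column) can be created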
-- target stream
CREATE STREAM device_reader
(
`label` string,
`sensor` string,
`temperature` float32
);
-- simulate get data from source to proton
CREATE MATERIALIZED VIEW mv_timer_reader INTO device_reader
AS
SELECT
'timer' AS label, '' AS sensor, 0 AS temperature
FROM
timer;
CREATE MATERIALIZED VIEW mv_device_reader INTO device_reader
AS
SELECT
'device' AS label, sensor, temperature
FROM
device;
Notes:
The timer stream is a random stream with an EPS of 1.
A new field named 'label' identifies whether an event comes from the device or the timer.
mv_device_reader still reads data from device, but adds the label 'device'.
mv_timer_reader puts timer events into device_reader with the label 'timer', leaving sensor and temperature as empty values.
Now we can check whether the 'device' label appears in device_reader to decide if the event source has new incoming data.
SELECT
window_start, group_uniq_array(label) AS unique_labels
FROM
tumble(device_reader, 3s)
GROUP BY
window_start;
ed0ef0e1dcca :) SELECT
window_start, group_uniq_array(label) as unique_labels
FROM
tumble(device_reader, 3s)
GROUP BY
window_start;
Query id: cfce5cb2-63ad-404b-ae6b-885fc23d0fbb
┌─────────────window_start─┬─unique_labels──────┐
│ 2024-11-26 22:46:09.000Z │ ['timer','device'] │
└──────────────────────────┴────────────────────┘
┌─────────────window_start─┬─unique_labels──────┐
│ 2024-11-26 22:46:12.000Z │ ['timer','device'] │
└──────────────────────────┴────────────────────┘
┌─────────────window_start─┬─unique_labels──────┐
│ 2024-11-26 22:46:15.000Z │ ['timer','device'] │
└──────────────────────────┴────────────────────┘
┌─────────────window_start─┬─unique_labels─┐
│ 2024-11-26 22:46:18.000Z │ ['timer'] │
└──────────────────────────┴───────────────┘
┌─────────────window_start─┬─unique_labels─┐
│ 2024-11-26 22:46:21.000Z │ ['timer'] │
└──────────────────────────┴───────────────┘
┌─────────────window_start─┬─unique_labels─┐
│ 2024-11-26 22:46:24.000Z │ ['timer'] │
└──────────────────────────┴───────────────┘
The above query result shows that after the mv_device_reader MV is dropped, the aggregation only contains events with the 'timer' label, and no 'device' label is present.
We can create an alert for the case where no 'device' label is found in a 3-second window.
SELECT
window_start, group_uniq_array(label) as unique_labels
FROM
tumble(device_reader, 3s)
GROUP BY
window_start
HAVING index_of(unique_labels,'device') = 0;
Users can also send these alerts to Apache Kafka through an external stream.
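For example, something like the following sketch could do it; the broker address and topic name are placeholders, and the exact external stream settings should be verified against your Proton/Timeplus version:
-- Kafka external stream acting as the alert sink
CREATE EXTERNAL STREAM sensor_alerts
(
`window_start` datetime64(3),
`unique_labels` array(string)
)
SETTINGS type = 'kafka', brokers = 'localhost:9092', topic = 'sensor-alerts', data_format = 'JSONEachRow';
-- Continuously write the alert rows into the Kafka topic
CREATE MATERIALIZED VIEW mv_sensor_alerts INTO sensor_alerts
AS
SELECT
window_start, group_uniq_array(label) AS unique_labels
FROM
tumble(device_reader, 3s)
GROUP BY
window_start
HAVING index_of(unique_labels, 'device') = 0;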
While this works, it seems very complex. To monitor the stream, users have to create a timer and write both the timer events and the monitored target into one stream. Is there a simpler solution?
Fortunately, SQL is a powerful tool. The latest Proton release adds a new feature, `EMIT PERIODIC time_interval REPEAT`, which emits the existing result of a global aggregation periodically, and this helps solve our problem.
A global aggregation aggregates all incoming events from the moment the query is submitted, and never ends. Using a global aggregation, we can count how many events have been ingested, and by leveraging the lag function, we can see how the total count changes compared to the last emit. If the count does not increase as expected, we know something is wrong with the stream.
For the same reason as before, the following SQL won't work:
WITH accumulate_count AS (
SELECT count(*) AS count FROM device_reader
)
SELECT count, lag(count) AS previous_count FROM accumulate_count
Because the watermark is not updated after the stream breaks, the accumulate_count query in the above SQL stops emitting, so users cannot rely on it to monitor whether the stream is broken.
By adding `EMIT PERIODIC time_interval REPEAT`, we force the global aggregation to emit its result even if the watermark is not advancing.
WITH accumulate_count AS (
SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, lag(count) AS previous_count FROM accumulate_count
5b8a720e3027 :) WITH accumulate_count AS (
SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, lag(count) AS previous_count FROM accumulate_count
Query id: 1a2a8347-cc58-44b3-afdb-9d01865a703a
┌─count─┬─previous_count─┐
│ 2 │ 0 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 4 │ 2 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 6 │ 4 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 8 │ 6 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 10 │ 8 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 11 │ 10 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 11 │ 11 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 11 │ 11 │
└───────┴────────────────┘
┌─count─┬─previous_count─┐
│ 11 │ 11 │
└───────┴────────────────┘
From the query result we can see that, after the MV is dropped, the aggregation query reports a current count equal to the previous count, which means the stream has stopped receiving new data.
Users can create an alert that returns results only when that condition is met.
WITH accumulate_count AS (
SELECT count(*) AS count FROM device_reader EMIT PERIODIC 2s REPEAT
)
SELECT count, lag(count) AS previous_count FROM accumulate_count
WHERE count = previous_count
Summary
In today's blog, I have demonstrated how to leverage streaming SQL to monitor a data source, such as an IoT sensor, that stops working, which seems like a simple job but is actually complex. We explored why stream processing is hard when handling unbounded data, late and out-of-order events, and complex state, and why watermark-based designs can cause some tricky problems.
You can run my samples from the Proton GitHub repo.
Timeplus is a real-time data processing platform built on top of Proton, and its mission is to provide developers with an easy-to-use stream processing tool. I am happy to hear your questions, comments, or suggestions regarding what we discussed today. Want to try these features? Check out the open source Proton, visit timeplus.com for a cloud trial, or deploy your own on-prem version of Timeplus Enterprise.