By Gang Tao

Empowering Real-Time Machine Learning through Streaming Data Platforms

A hands-on guide to building real-time fraud detection using Timeplus



Machine learning is going real-time


The transition of machine learning into real-time represents a natural progression fueled by two key factors. Firstly, customers seek immediate decision-making capabilities, particularly in applications such as anomaly detection and personalized recommendations. Real-time prediction facilitates swift responses to evolving conditions, enhancing overall user experiences. Secondly, advancements in data infrastructure now support the processing of streaming data, leveraging technologies like Apache Kafka, Redpanda, Apache Pulsar, and AWS Kinesis. These technologies provide a robust foundation for constructing stream data pipelines essential for the implementation of real-time machine learning.


Real-time machine learning architecture

There are several challenges to building these real-time machine learning systems:

  • Support low-latency, fresh feature generation. Keeping features fresh means that as soon as new data arrives, the related features are calculated in real time with second or sub-second delay. This usually requires a highly mature streaming feature pipeline.

  • Keep features consistent between training and serving. In most ML systems, the batch data pipeline used for training differs from the streaming data pipeline used for inference (the typical Lambda architecture). These pipelines are often even written in different programming languages; for instance, Python is employed for training, while Java or Scala is used for inference. Another common issue is ensuring the point-in-time correctness of training data: preventing the use of future data for training, which would lead to data leakage.

  • Backfill historical data. When dealing with missing data or when experimenting with new features, users often resort to historical data backfill to train a new model. This requires the entire system to be able to replay the data and reconstruct all the relevant features.

  • Manage the complexity of online and offline data infrastructures. To support real-time ML, users usually need to add a streaming system (such as Apache Flink) for real-time feature generation on top of their existing batch-based processing for offline feature generation and model training. This makes the whole system complex, and Flink itself can be hard to manage. As a result, running real-time machine learning is feasible mainly for vendors with robust engineering teams, and not every player is Meta or Uber.



Why a streaming data platform can be a good tool for building real-time machine learning


To solve these challenges, a typical solution is to use a real-time feature platform. A feature platform helps users manage the complexity of real-time feature pipelines and storage, providing both batch and streaming processing abstractions to support real-time machine learning. Leading vendors in this space include Tecton, Hopsworks, Claypot, Fennel, and Chalk.


Another option is to leverage a streaming data platform to build your own real-time feature store:

  • It is a simpler system that unifies streaming and historical data processing, so training and inference share the same data processing pipeline

  • It usually provides SQL-based interfaces that are easy for both data scientists and engineers to use, so you don't need to worry about the Java/Python battle; SQL can usually be called from both languages

  • Leveraging stream processing, streaming data platforms provide low-latency pipelines that process data incrementally as it arrives

  • Backfilling historical data with a streaming data platform is easy: just run some SQL (see the sketch after this list)

  • Point-in-time correctness can be implemented using an ASOF join
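To make the last two points concrete, here is a minimal sketch in Proton-style SQL; the payments stream and feature_view names are hypothetical:

-- Backfill: table() turns a stream into a bounded historical scan,
-- so rebuilding features from history is an ordinary query
SELECT account, avg(amount) AS avg_amount
FROM table(payments)
GROUP BY account;

-- Point-in-time correctness: each event joins only the latest feature
-- value at or before its own event time, so no future data leaks in
SELECT p.id, p.amount, f.avg_amount
FROM payments AS p
ASOF LEFT JOIN feature_view AS f
ON p.account = f.account AND p._tp_time >= f.ts;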


 

Step-by-step guide: Implementing real-time feature pipelines for fraud detection using Timeplus Proton


So, how do you build your own real-time feature pipelines? I created this step-by-step guide on how I used Timeplus Proton (and other publicly available tools) to implement a real-time feature pipeline for fraud detection. Proton is our open source, unified streaming and historical data analytics database in a single binary.


Here is a high level overview of the key components:

  • An online payment data generator which is based on a fraud detection dataset on Kaggle

  • A streaming feature pipeline/store backed by Timeplus’s streaming data platform

  • Two live streams, one for payments and one for labels

  • Real-time and historical views for features

  • A materialized view of all features used for training and inference data

  • Model training: PyCaret, a low-code ML tool for classification model training and inference

  • Model serving: FastAPI, a Python API framework


Proton fraud detection architecture


Online Payment Data


First, let’s take a look at the data.


The payment data is based on a fraud detection dataset from Kaggle. There are two streams, one for online payment and one for the label data that marks a transaction as fraud or not.


The streams are created using the following SQL:

CREATE STREAM IF NOT EXISTS online_payments
(
   `id` string,
   `type` enum8('PAYMENT' = 0, 'TRANSFER' = 1, 'CASH_OUT' = 2, 'CASH_IN' = 3, 'DEBIT' = 4),
   `amount` float64,
   `account_from` string,
   `old_balance_from` float64,
   `new_balance_from` float64,
   `account_to` string,
   `old_balance_to` float64,
   `new_balance_to` float64
)

CREATE STREAM IF NOT EXISTS online_payments_label
(
    `id` string,
    `is_fraud` bool,
    `type` string
)

A data generator produces the payment and label data and ingests it into Proton.
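As an illustration, a single payment event could be ingested with a plain INSERT (the values here are hypothetical):

INSERT INTO online_payments (id, type, amount, account_from, old_balance_from, new_balance_from, account_to, old_balance_to, new_balance_to)
VALUES ('0001', 'TRANSFER', 181.0, 'C1305486145', 181.0, 0.0, 'C553264065', 0.0, 0.0);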


There are three different types of fraud being simulated:

  • Type 1: The payment starts with a small transfer amount and then transfers the remainder of the money. The fraudster wants to verify that transactions can go through successfully with the first payment.

  • Type 2: There will be several payments to send all money from one account to several different accounts in a very short period of time.

  • Type 3: All money from one account is used as payment to a merchant.


The generator randomly produces one of these three types of fraudulent transactions. When a transaction is fraudulent, a label record is saved to the label stream as well. For regular transactions, no label data is generated.



Build Features


"Features" refer to the individual, measurable properties or characteristics of data that are used as input for a machine learning model.


In this example, we will build three different types of features as the input of our fraud detection model.



Real-time Features


Real-time features are generated in real time as the data arrives. We use the following SQL to create a view for real-time features.

CREATE VIEW IF NOT EXISTS v_fraud_realtime_features AS
WITH cte AS
  (
    SELECT 
      _tp_time, 
      id, 
      type, 
      account_from, 
      amount, 
      lag(amount) AS previous_amount, 
      lag(_tp_time) AS previous_transaction_time
    FROM 
      default.online_payments
    WHERE 
      _tp_time > earliest_timestamp()
    PARTITION BY 
      account_from
  )
SELECT 
  _tp_time, 
  id, 
  type, 
  account_from, 
  amount, 
  previous_amount, 
  previous_transaction_time, 
  if(previous_transaction_time > earliest_timestamp(), date_diff('second', previous_transaction_time, _tp_time), 0) AS time_to_last_transaction
FROM 
  cte

The transaction type and transaction amount are two simple features which do not need any processing. These two features are directly created from the raw payment data.


The other two real-time features created by this SQL are:

  • the amount of the current account's previous transaction

  • the time elapsed since the current account's previous transaction


To build these two features, a `PARTITION BY` clause is used to split the stream into substreams by account_from; the lag function then returns the previous value of specific fields within each account's substream. This is useful whenever you need a prior, related value from a stream, and it helps identify type 1 and type 2 fraud.
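For instance, you can watch these features live for a single account (the account id below is hypothetical):

SELECT _tp_time, id, amount, previous_amount, time_to_last_transaction
FROM v_fraud_realtime_features
WHERE account_from = 'C1305486145'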


A good feature platform should be able to handle stateful processing. The SQL above is in fact stateful: the query has to remember the previous records for each account_from. The feature is therefore generated not only from the current record, but also from historical state.



Near-Real-Time Features


Not all features are real-time features. Near-real-time features are built over short time windows, such as 1 minute or 5 minutes, and can be used to identify user behavior across a period of time.

CREATE VIEW IF NOT EXISTS v_fraud_1m_features AS
SELECT 
  window_start, 
  account_from, 
  count(*) AS count, 
  max(amount) AS max_amount, 
  min(amount) AS min_amount, 
  avg(amount) AS avg_amount
FROM 
  tumble(default.online_payments, 60s)
WHERE 
  _tp_time > earliest_timestamp()
GROUP BY 
  window_start, account_from

CREATE VIEW IF NOT EXISTS v_fraud_5m_features AS
SELECT 
  window_start, 
  account_from, 
  count_distinct(account_to) AS target_counts
FROM 
  tumble(default.online_payments, 5m)
WHERE 
  _tp_time > earliest_timestamp()
GROUP BY 
  window_start, account_from

With the above SQL, we build 1-minute and 5-minute features using tumble windows:

  • 1m transaction count per account

  • 1m transaction max amount per account

  • 1m transaction min amount per account

  • 1m transaction average amount per account

  • 5m distinct transaction target account number per account


We can see that these features could be very helpful for identifying type 2 fraud.
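For instance, a sketch of a simple type 2 screen using the 5-minute view (the threshold of 5 distinct targets is an illustrative assumption, not tuned):

SELECT window_start, account_from, target_counts
FROM v_fraud_5m_features
WHERE target_counts > 5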



Historical Features


Historical features span the full history of the data. In this sample, we create a 1-day tumble window aggregation and then calculate averages over all aggregated results using a global aggregation:

CREATE VIEW IF NOT EXISTS v_fraud_1d_features AS
WITH agg1d AS
  (
    SELECT 
      window_start, account_from, count(*) AS count, max(amount) AS max_amount
    FROM 
      tumble(default.online_payments, 1d)
    WHERE 
      _tp_time > earliest_timestamp()
    GROUP BY 
      window_start, account_from
  )
SELECT 
  now64() as ts, account_from, avg(count) AS avg_count, avg(max_amount) AS avg_max_amount
FROM 
  agg1d
GROUP BY 
  account_from

With the above SQL, we created the following features:

  • daily average transaction count per account

  • daily average max transaction amount per account


These kinds of features could be very useful for identifying type 3 fraud, since a user's spending history usually shows they don't spend all their money at once.
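For example, to inspect an account's historical spending profile from this view (the account id is hypothetical):

SELECT account_from, avg_count, avg_max_amount
FROM v_fraud_1d_features
WHERE account_from = 'C1305486145'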



Combining all features using ASOF join


When training a machine learning model or making a prediction, features need to be assigned to each event. Since the features are generated at different time intervals, an ASOF join across all feature views selects the latest value from each one, as shown in the following diagram.


Point-in-time query
CREATE MATERIALIZED VIEW IF NOT EXISTS mv_fraud_all_features AS
SELECT 
  _tp_time AS time, 
  v_fraud_realtime_features.id AS id, 
  v_fraud_realtime_features.type AS type, 
  v_fraud_realtime_features.account_from AS account, 
  v_fraud_realtime_features.amount AS amount, 
  v_fraud_realtime_features.previous_amount AS previous_amount, 
  v_fraud_realtime_features.time_to_last_transaction AS time_to_last_transaction, 
  v_fraud_1m_features.count AS transaction_count_1m, 
  v_fraud_1m_features.max_amount AS max_transaction_amount_1m, 
  v_fraud_1m_features.avg_amount AS avg_transaction_amount_1m, 
  v_fraud_5m_features.target_counts AS distinct_transaction_target_count_5m, 
  v_fraud_1d_features.avg_count AS avg_transaction_count_1d, 
  v_fraud_1d_features.avg_max_amount AS avg_max_transaction_count_1d
FROM 
  v_fraud_realtime_features
ASOF LEFT JOIN v_fraud_1m_features ON (v_fraud_realtime_features.account_from = v_fraud_1m_features.account_from) AND (v_fraud_realtime_features._tp_time >= v_fraud_1m_features.window_start)
ASOF LEFT JOIN v_fraud_5m_features ON (v_fraud_realtime_features.account_from = v_fraud_5m_features.account_from) AND (v_fraud_realtime_features._tp_time >= v_fraud_5m_features.window_start)
ASOF LEFT JOIN v_fraud_1d_features ON (v_fraud_realtime_features.account_from = v_fraud_1d_features.account_from) AND (v_fraud_realtime_features._tp_time >= v_fraud_1d_features.ts)
SETTINGS 
  keep_versions = 100

We run the above query as a materialized view, which is a long-running background process. All features are calculated incrementally, in real time, as soon as a new event arrives in the online payment stream. This is the foundation of the low-latency feature pipeline used in the next steps: training and inference.
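Because Proton unifies streaming and historical analytics, the same materialized view can be read both ways; both forms appear later in this guide:

-- streaming read: emits each new feature row as it is produced
SELECT * FROM mv_fraud_all_features;

-- historical read: table() returns what has been materialized so far
SELECT * FROM table(mv_fraud_all_features);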



Train the Model


Labeling and preparing the training dataset


In the previous step, we created a materialized view of all features. Since we are going to train a supervised classification model to identify fraud, we need a dataset with label information showing which transactions are fraudulent. We can simply join the feature materialized view with the label stream.

SELECT
 *
FROM
 table(mv_fraud_all_features) AS f
LEFT JOIN table(online_payments_label) AS l ON f.id = l.id

The label stream is an append-only log containing the records that have been marked as fraud. Sometimes we make a mistake and mark a non-fraud transaction as fraud, so a correction is needed. If we insert a corrected label record into the label stream, the stream (being an append-only log) now contains two records: the old label, which is wrong, and the new label, which is the latest truth. When labels are updated, a new model needs to be trained with them, so users want the latest version of each label. This can be handled with the changelog function, or by grouping on id and taking the latest value, as we do below.

SELECT
 *
FROM
 table(online_payments_label)
WHERE
 id = 'b849c6e3-bc7a-4b20-a8da-35c589565879'
Query id: e4139011-89d0-48ad-881b-bac408feda6d
┌─id───────────────────────────────────┬─is_fraud─┬─type──┬────────────────_tp_time─┐
│ b849c6e3-bc7a-4b20-a8da-35c589565879 │ true     │ type1 │ 2023-11-30 18:31:06.523 │
└──────────────────────────────────────┴──────────┴───────┴─────────────────────────┘
┌─id───────────────────────────────────┬─is_fraud─┬─type─┬────────────────_tp_time─┐
│ b849c6e3-bc7a-4b20-a8da-35c589565879 │ false    │      │ 2023-12-12 20:01:12.908 │
└──────────────────────────────────────┴──────────┴──────┴─────────────────────────┘

Assume we have a transaction b849c6e3-bc7a-4b20-a8da-35c589565879 that was previously marked as fraud and later corrected to not fraud.


We use the following query to get the latest version of all labels:

CREATE VIEW v_latest_labels AS
WITH ordered_label AS
  (
    SELECT 
      *
    FROM 
      table(online_payments_label)
    ORDER BY 
      _tp_time ASC
  )
SELECT 
  id, latest(is_fraud) AS is_fraud
FROM 
  ordered_label
GROUP BY 
  id

If we run this view, only the latest label version will be returned.


SELECT 
  *
FROM 
  v_latest_labels
WHERE 
  id = 'b849c6e3-bc7a-4b20-a8da-35c589565879'
Query id: 6e6425c9-2bd3-4894-bf54-95d7316b136b
┌─id───────────────────────────────────┬─is_fraud─┐
│ b849c6e3-bc7a-4b20-a8da-35c589565879 │ false    │
└──────────────────────────────────────┴──────────┘
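Alternatively, the changelog function (which we also use later for monitoring) turns the append-only label stream into a changelog keyed by id, so a streaming join always sees the latest label per transaction. A minimal sketch:

SELECT p.id, l.is_fraud
FROM online_payments AS p
LEFT JOIN changelog(online_payments_label, id) AS l ON p.id = l.id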

Now we have our query for the training dataset with the latest labels:

SELECT
*
FROM
table(mv_fraud_all_features) AS f
LEFT JOIN v_latest_labels AS l ON f.id = l.id


Training the Classification Model


Now we have the training dataset ready. For fraud detection, we can build a classification model. PyCaret is a low-code ML tool that can help us choose the best model to use. The following sample code will:

  1. Query the training dataset from Timeplus

  2. Convert the query result into a pandas dataframe

  3. Initialize the PyCaret environment and find/train the best classification model

  4. Save the model to a local file

import json
import pandas as pd
from pycaret.classification import setup, compare_models, save_model
from timeplus import Query, Environment

# Connect to the Timeplus workspace; the address and API key below
# are placeholders to adjust for your environment
env = Environment().address("https://us.timeplus.cloud/<workspace-id>").apikey("<api-key>")

sql = '''
SELECT
 *
FROM
 table(mv_fraud_all_features) AS f
LEFT JOIN v_latest_labels AS l ON f.id = l.id
LIMIT 100000
'''

# Run the bounded historical query, collecting results in batches
query = (
    Query(env=env).sql(query=sql)
    .batching_policy(10000, 1000)
    .create()
)

query_header = query.header()
query_result = []
for event in query.result():
    if event.event == "message":
        query_result += json.loads(event.data)

# Convert the query result into a pandas DataFrame
columns = [f['name'] for f in query_header]
df = pd.DataFrame(query_result, columns=columns)
df_train = df[['type', 'amount', 'previous_amount',
               'time_to_last_transaction', 'transaction_count_1m',
               'max_transaction_amount_1m', 'avg_transaction_amount_1m',
               'distinct_transaction_target_count_5m', 'avg_transaction_count_1d',
               'avg_max_transaction_count_1d', 'is_fraud']]

# Initialize PyCaret, find and train the best classification model, save it
s = setup(data=df_train, target='is_fraud', session_id=123)
best_model = compare_models()
save_model(best_model, 'saved_model')

With the above code, we trained a classification model using a historical query that returns a bounded set of data. You can change the time range or use other conditions to select a specific set of data for training.
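For example, a sketch that restricts the training set to the last 7 days (the time column is the event time aliased in the materialized view):

SELECT *
FROM table(mv_fraud_all_features) AS f
LEFT JOIN v_latest_labels AS l ON f.id = l.id
WHERE f.time > now() - 7d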



Real-time Inference


Leveraging a streaming query, we can run inference in real time, as soon as an event arrives. The following code shows how to load a trained model and run real-time inference:

import json
import pandas as pd

from timeplus import Query, Environment
from pycaret.classification import load_model, predict_model

# Reuse the Environment connection (env) created in the training step
model = load_model('saved_model')

sql = '''
SELECT
 *
FROM
 mv_fraud_all_features
WHERE _tp_time > now() - 1h
LIMIT 3
'''

query = Query(env=env).sql(query=sql).create()

query_header = query.header()
columns = [f['name'] for f in query_header]

feature_cols = ['id', 'type', 'amount', 'previous_amount',
                'time_to_last_transaction', 'transaction_count_1m',
                'max_transaction_amount_1m', 'avg_transaction_amount_1m',
                'distinct_transaction_target_count_5m',
                'avg_transaction_count_1d', 'avg_max_transaction_count_1d']

# Run a prediction on each batch of events as it arrives
for event in query.result():
    if event.event == "message":
        query_result = json.loads(event.data)
        df = pd.DataFrame(query_result, columns=columns)
        df_infer = df[feature_cols]
        prediction = predict_model(model, data=df_infer)
        id = prediction['id'].tolist()[0]
        prediction_label = prediction['prediction_label'].tolist()[0]
        is_fraud = 'fraud' if prediction_label == 1 else 'not fraud'
        print(f"transaction {id} is {is_fraud}")
transaction 23a861db-aabd-424f-943e-d7748ea465a7 is not fraud
transaction 4cc484f4-1668-40c7-a23c-604e871684ab is fraud
transaction e10e3090-3ffe-4a14-9085-7ceef933d8ff is not fraud

Now you have trained a fraud detection model and can run real-time inference with real-time features using Proton's streaming processing! If you want more, like running predictions in SQL or monitoring your model's performance in real time with dashboards, you can use Timeplus Cloud, which is built on top of Proton. The enterprise version provides additional functionality such as a SQL exploration UI, visualizations and dashboards, and alerts. Read on to learn more about these features.



Create a UDF to run predictions using streaming SQL


To simplify inference, we can create a UDF for it, so real-time prediction runs within the same system as the feature pipeline, via streaming SQL.

The following code uses FastAPI to host the fraud classification model; we then create a UDF called `fraud_detect`.

from typing import List
from fastapi import FastAPI
from pydantic import BaseModel

import pandas as pd
from pycaret.classification import load_model, predict_model

model = load_model('saved_model')
app = FastAPI()


class PredictItem(BaseModel):
    # One list per feature: Timeplus sends UDF arguments in columnar form
    type: List[str]
    amount: List[float]
    previous_amount: List[float]
    time_to_last_transaction: List[int]
    transaction_count_1m: List[int]
    max_transaction_amount_1m: List[float]
    avg_transaction_amount_1m: List[float]
    distinct_transaction_target_count_5m: List[int]
    avg_transaction_count_1d: List[int]
    avg_max_transaction_count_1d: List[int]


@app.get("/")
def info():
    return {"info": "timeplus fraud detection server"}


@app.post("/predict")
def predict(item: PredictItem):
    # Pivot the columnar payload into rows for the DataFrame
    data = []
    length = len(item.type)
    for i in range(length):
        row = [item.type[i], item.amount[i],
               item.previous_amount[i], item.time_to_last_transaction[i],
               item.transaction_count_1m[i], item.max_transaction_amount_1m[i],
               item.avg_transaction_amount_1m[i],
               item.distinct_transaction_target_count_5m[i],
               item.avg_transaction_count_1d[i],
               item.avg_max_transaction_count_1d[i]]
        data.append(row)

    cols = ['type', 'amount', 'previous_amount',
            'time_to_last_transaction', 'transaction_count_1m',
            'max_transaction_amount_1m', 'avg_transaction_amount_1m',
            'distinct_transaction_target_count_5m', 'avg_transaction_count_1d',
            'avg_max_transaction_count_1d']

    df_infer = pd.DataFrame(data, columns=cols)
    prediction = predict_model(model, data=df_infer)
    prediction_label = prediction['prediction_label'].tolist()

    return {"result": prediction_label}

We can then register a remote UDF that runs inference from SQL.
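A sketch of the registration, assuming the FastAPI server above is reachable at http://localhost:8000 and that the argument types match your feature columns (adjust both for your deployment):

CREATE REMOTE FUNCTION fraud_detect(
  type string,
  amount float64,
  previous_amount float64,
  time_to_last_transaction int64,
  transaction_count_1m uint64,
  max_transaction_amount_1m float64,
  avg_transaction_amount_1m float64,
  distinct_transaction_target_count_5m uint64,
  avg_transaction_count_1d float64,
  avg_max_transaction_count_1d float64
)
RETURNS int32
URL 'http://localhost:8000/predict'
AUTH_METHOD 'none'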




Here is the streaming SQL to run the inference.

SELECT
 id, fraud_detect(to_string(type), amount, previous_amount, time_to_last_transaction, transaction_count_1m, max_transaction_amount_1m, avg_transaction_amount_1m, distinct_transaction_target_count_5m, avg_transaction_count_1d, avg_max_transaction_count_1d) AS predict
FROM
 mv_fraud_all_features

Monitor model performance in real-time


Now we have successfully deployed our feature pipeline, trained a classification model, deployed the model with FastAPI, and created a prediction UDF in Timeplus. Next, it is important to monitor how well the model is working. Since we have the ground truth labels in this demo case, we can show the classification model's metrics in real time.

WITH t AS
 (
   SELECT
     p._tp_time AS ts, p.id AS id, l.is_fraud AS truth
   FROM
     online_payments AS p
   LEFT JOIN changelog(online_payments_label, id) AS l ON p.id = l.id
   SETTINGS seek_to = '-1h'
 ),
p AS (
 SELECT
 _tp_time AS ts, id, fraud_detect(to_string(type), amount, previous_amount, time_to_last_transaction, transaction_count_1m, max_transaction_amount_1m, avg_transaction_amount_1m, distinct_transaction_target_count_5m, avg_transaction_count_1d, avg_max_transaction_count_1d) AS predict
FROM
 mv_fraud_all_features
SETTINGS enable_optimize_predicate_expression = 0, seek_to = '-1h'
)
SELECT
 t.ts AS ts, t.id AS id, t.truth AS truth, (p.predict = 1) AS predict
FROM
 t JOIN p ON t.id = p.id AND date_diff_within(1m, t.ts, p.ts)


The above SQL returns the predicted value and the ground truth for each event. With the Timeplus SQL UI, you can get an overview of the distribution of truth vs. prediction. We turn this query into a view called `v_fraud_truth_vs_predict_seekto_1h` so we can monitor model performance with the following SQL:

WITH metrics AS
 (
   SELECT
     ts, truth, predict,
     if((truth = true) AND (predict = true), 1, 0) AS TP,
     if((truth = false) AND (predict = true), 1, 0) AS FP,
     if((truth = false) AND (predict = false), 1, 0) AS TN,
     if((truth = true) AND (predict = false), 1, 0) AS FN
   FROM
     v_fraud_truth_vs_predict_seekto_1h
 )
SELECT
 window_start,
 sum(TP + TN) / count() AS accuracy,
 sum(TP) / (sum(TP) + sum(FP)) AS precision,
 sum(TP) / (sum(TP) + sum(FN)) AS recall
FROM
 tumble(metrics, ts, 15m)
GROUP BY
 window_start


The query returns the accuracy, precision, and recall over the past hour of data, in 15-minute tumble windows.


Another query monitors predicted fraud vs. actual fraud:

WITH gt AS (
SELECT
 window_start, count(*), 'ground_truth' AS label
FROM
 tumble(online_payments_label, 1m)
WHERE is_fraud = 1
GROUP BY
 window_start
SETTINGS
 seek_to = '-1h'
),
predict AS (
 SELECT
 window_start, count(*), 'prediction' AS label
FROM
 tumble(v_detected_fraud, 1m)
GROUP BY
 window_start
SETTINGS
 seek_to = '-1h'
)
SELECT * FROM gt
UNION SELECT * FROM predict


Using the line chart, we can clearly see the trend. This live dashboard can be found in our public demo workspace.



Create alerts on detected fraud


Finally, it is important to take action when fraud is detected. Timeplus supports different downstream data channels such as Kafka, Slack, and webhooks. We can send detected fraud to any of these notification channels and take action as soon as fraud happens.
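As one example of such a channel, Proton can forward detected fraud to Kafka through an external stream. This is a sketch; the broker, topic, and the columns assumed on `v_detected_fraud` should be adjusted for your setup:

-- external stream pointing at a Kafka topic
CREATE EXTERNAL STREAM fraud_alerts (
  id string,
  predict int32
)
SETTINGS type = 'kafka', brokers = 'localhost:9092', topic = 'fraud-alerts', data_format = 'JSONEachRow';

-- continuously push detected fraud into the alert topic
CREATE MATERIALIZED VIEW mv_fraud_alerts INTO fraud_alerts AS
SELECT id, predict FROM v_detected_fraud WHERE predict = 1;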



 

Conclusion


Machine learning is going real-time, and luckily there are great open-source tools and frameworks for building real-time machine learning systems. I hope this tutorial was useful and serves as an easy-to-follow guide to leveraging Proton to build a real-time feature pipeline for training and inference of machine learning models.


The sample code shown here can be found in Proton’s sample folder, and we also have a public demo workspace. If you like the solution, please fork or star! Let us know if you have any questions.


 
