
Streaming SQL for Real-Time AI and Machine Learning

  • Writer: Gang Tao
  • Mar 31
  • 17 min read

Updated: Apr 1

Powered by Timeplus Python User-Defined Function


Python and SQL: The Two Most Popular Data Tools


SQL and Python are two of the most widely used tools in the data domain. Each has its strengths and weaknesses, and understanding when to use which can help maximize efficiency and performance in data analysis, machine learning, and data transformation tasks.




 

See Python UDFs in action! Check out this introductory video and demo from our CTO, Gang Tao:




 

Let's look at SQL.


Pros:

  • Optimized for Data Retrieval: SQL is designed specifically for querying structured data. It is highly efficient at retrieving data from relational databases and allows users to perform complex queries quickly.

  • Declarative: SQL follows a declarative approach, meaning you specify what you want, and the database engine determines the best way to retrieve it. This makes it easier for users to focus on results rather than implementation details.

  • Standardized: SQL is widely used across different database systems with only minor variations. This makes it easy to learn and transfer skills between platforms like MySQL, PostgreSQL, and Microsoft SQL Server.

  • Scales Well: Databases are optimized for handling large datasets efficiently. Indexing, caching, and query optimization features help SQL scale to handle enterprise-level workloads.


Cons:

  • Limited Flexibility: SQL struggles with implementing complex logic such as loops, conditionals, and advanced transformations. It is best suited for set-based querying rather than procedural data manipulation.

  • No Native Support for Machine Learning & AI: SQL cannot perform machine learning, advanced statistical analysis, or automation. It needs to be integrated with Python or other tools for such tasks.

  • Difficult for Unstructured Data: SQL is designed for structured data stored in relational tables. Handling unstructured data such as JSON, XML, images, or free text is challenging, though some databases now support JSON querying.



How about Python?


Pros:

  • Highly Flexible: Python is extremely versatile, allowing for complex data transformations, custom logic, and automation that are difficult to achieve with SQL.

  • Supports Machine Learning & AI: Python is essential for machine learning and artificial intelligence applications. With libraries like Scikit-learn, TensorFlow, and PyTorch, Python is the go-to language for predictive modeling and statistical analysis.

  • Rich Ecosystem: Python has a vast ecosystem of libraries such as Pandas, NumPy, and Matplotlib that make it powerful for data processing, visualization, and analysis.



Cons:

  • Slower for Large-Scale Queries: Python is not as optimized as SQL for retrieving and processing large datasets from relational databases. SQL databases are designed to execute queries efficiently with indexing and optimization strategies.

  • Requires Programming Skills: Python has a steeper learning curve for non-programmers compared to SQL. Writing efficient Python scripts requires knowledge of programming concepts and best practices.

  • May Require External Tools for Performance: For big data applications, Python often needs additional tools like Dask, PySpark, or database tuning to handle large-scale processing effectively.


 

SQL vs Python: Which One Should You Use?


It is common practice to choose different tools for different purposes.


You should use SQL when:

  • You need to efficiently retrieve and manipulate structured data stored in relational databases.

  • The task involves filtering, aggregating, or joining large datasets.

  • Performance and query optimization are a priority.

  • The dataset is stored in a database and needs minimal processing.


You should use Python when:

  • You need advanced data analysis, machine learning, or AI capabilities.

  • The task involves complex data transformations and automation.

  • The dataset includes unstructured data such as JSON, images, or text.

  • You are working with big data frameworks or require extensive data manipulation.


 

One Box for Two: Timeplus Streaming SQL with Python


Timeplus is a unified streaming SQL platform designed for high performance and efficiency, all within a single binary. Its primary interface is SQL, which comes with inherent limitations, as mentioned earlier. To address these constraints, Timeplus has recently introduced a new feature: Python user-defined functions (UDFs). With Python UDFs, users can seamlessly integrate Python functions into SQL queries, bridging the gap between SQL and Python. This enhancement enables developers and data engineers to leverage the strengths of both languages within a single platform, unlocking greater flexibility and capability.



Timeplus already supported JavaScript UDFs and Remote UDFs, but both have limitations:

  • JavaScript UDFs run computation written in JavaScript, which is a good extension to SQL capabilities, but they do not support external third-party libraries.

  • Remote UDFs push computation to a remote server, but they cannot be used for aggregation and are a poor fit for large volumes of data, since transferring data through network calls can be slow.


This is why Python UDFs are a good extension for Timeplus SQL users. One of their biggest benefits is access to community libraries for data processing, machine learning, and AI, which enrich what users can do with SQL.



In today’s blog, I am going to share how users can leverage Python UDF to build some real-time machine learning and AI applications.


 

Python UDF Basics


A Python UDF, like any other SQL function, provides data processing capabilities; it is written in Python and executed inside SQL.


There are two kinds of SQL functions:

  • Aggregation functions: Operate on multiple rows and return a single summary value. These functions are typically used with GROUP BY clauses in SQL queries.

  • Scalar functions: Operate on individual rows and return one result for each row. These include mathematical, string, date/time, and conditional functions.



For example, as illustrated in the diagram above, a mathematical function such as +1 is a non-aggregate (scalar) function: each input value has the +1 operation applied to it and produces one output. In contrast, SUM combined with GROUP BY color aggregates the inputs that share the same color and, with two colors in the input, outputs two results.
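The difference between the two kinds of functions can be sketched in plain Python. This is only an illustration outside Timeplus; the sample rows and the helper name sum_by_color are made up for the example.

```python
from collections import defaultdict

# Made-up sample rows of (color, value), standing in for a small table.
rows = [("red", 1), ("blue", 2), ("red", 3), ("blue", 4)]

# Scalar behaviour: one output per input row.
plus_one = [value + 1 for _, value in rows]


def sum_by_color(rows):
    """Aggregate behaviour: one output per group (here, per color)."""
    totals = defaultdict(int)
    for color, value in rows:
        totals[color] += value
    return dict(totals)


print(plus_one)            # [2, 3, 4, 5]
print(sum_by_color(rows))  # {'red': 4, 'blue': 6}
```

Four input rows produce four scalar outputs, while the aggregate produces only two results, one per color.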


Here are samples of how to write these two types of user-defined functions.


Scalar Sample: Add Five to Each Input


The following Python UDF will implement a math function +5 which will add five to each input number.

CREATE OR REPLACE FUNCTION add_five(value uint16) RETURNS int 
LANGUAGE PYTHON AS 

$$

def add_five(value):
   for i in range(len(value)):
       value[i] = value[i] + 5
   return value

$$;

  • The first line defines the function name, input parameters, and return type of the UDF.

  • The UDF function body is enclosed between two $$ markers. This works like ' or `, which simply delimit a string, but using $$ avoids conflicts with the quote characters used in the Python code.

  • The UDF is just a regular Python function. Note that the type of the input value here is not a number but an array of numbers. Timeplus is built on top of columnar storage, so all data within Timeplus is wrapped as columns and processed in batches, and the Timeplus SQL engine passes each input parameter as an array for the whole batch. The function therefore needs to iterate over the input values and add five to each of them.
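Because the UDF body is an ordinary Python function that takes a batch, its logic can be checked locally before registering it. The following is a minimal sketch, assuming the batch can be modelled as a plain Python list; it returns a new list instead of modifying the input in place, which is a design choice of this sketch rather than a Timeplus requirement.

```python
def add_five(value):
    """Return a new batch with 5 added to every element.

    `value` is a whole batch (modelled here as a list of numbers),
    not a single number.
    """
    return [v + 5 for v in value]


# Local check with a hand-built batch standing in for a column of values.
batch = [100, 0, 7]
print(add_five(batch))  # [105, 5, 12]
```

The output has exactly one element per input element, which is what a scalar function is expected to produce.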



Users can use the UDF in SQL just like a regular SQL function, for example:

SELECT add_five(100)

SELECT add_five(col) from stream_name

One important thing to know is how SQL types map to Python types. For more information, refer to this document.



Aggregation Sample: Find the Max number from Input


Below is an example of how to write an aggregation function to find the maximum number from inputs:

CREATE OR REPLACE AGGREGATE FUNCTION getMax(value uint16) RETURNS uint16 
LANGUAGE PYTHON AS $$

import pickle

class getMax:
   def __init__(self):
       self.max = 0

   def serialize(self):
       data = {}
       data['max'] = self.max
       return pickle.dumps(data)

   def deserialize(self, data):
       data = pickle.loads(data)
       self.max = data['max']

   def merge(self, other):
       if (other.max > self.max):
           self.max = other.max

   def process(self, values):
       for item in values:
           if item > self.max:
               self.max = item
   
   def finalize(self):
       return [self.max]

$$;

  • The first line defines the function name, input parameters, and return type of the UDF, as with the scalar function, but adds the AGGREGATE keyword to specify that it is an aggregate function.

  • The UDF is defined as a class with the same name as the UDF. Aggregation is a stateful operation, so it uses class members to keep its internal state.

  • In the __init__ function, self.max is initialized to store the maximum number seen in the input.

  • The process function is the main entry point of the aggregation. It iterates over all inputs, compares each one with the current maximum stored in self.max, and updates the state if the input is larger.

  • The finalize function emits the current aggregation result. It is called when the SQL engine decides to output the aggregation result.

  • The serialize and deserialize functions save and restore the UDF state, for example when Timeplus restarts or the query is rescheduled in a distributed environment. In this case, the sample code simply uses pickle to convert the state to and from bytes.

  • The merge function merges the aggregation state from multiple parallel processors, since Timeplus runs the aggregation in multiple threads under the hood.
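To see how these methods fit together, the same state class can be exercised in plain Python. This is a sketch only: the class is renamed GetMax here, and the order of calls below (process on two partial states, a serialize/deserialize round trip, then merge and finalize) is an assumption made for illustration, not a description of how the Timeplus engine schedules them.

```python
import pickle


class GetMax:
    """Plain-Python copy of the max-tracking state, for local experimentation."""

    def __init__(self):
        self.max = 0

    def serialize(self):
        # Convert the state to bytes.
        return pickle.dumps({"max": self.max})

    def deserialize(self, data):
        # Restore the state from bytes produced by serialize().
        self.max = pickle.loads(data)["max"]

    def merge(self, other):
        if other.max > self.max:
            self.max = other.max

    def process(self, values):
        for item in values:
            if item > self.max:
                self.max = item

    def finalize(self):
        return [self.max]


# Two partial states, as if two workers each saw part of the input.
left, right = GetMax(), GetMax()
left.process([3, 9, 4])
right.process([7, 2])

# Round-trip one state through serialize/deserialize, as after a restart.
restored = GetMax()
restored.deserialize(left.serialize())

# Combine the partial states and emit the result.
restored.merge(right)
print(restored.finalize())  # [9]
```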


Here are some sample queries that use the aggregation.

-- global aggregation
SELECT getMax(col) FROM stream_name

-- tumble window based aggregation
SELECT window_start, getMax(col) 
FROM tumble(stream_name, 1m) 
GROUP BY window_start

 

Machine Learning - Batch Training + Stream Inference


There are many machine learning libraries available in the Python community that provide classification, regression, and clustering features. Here are some popular choices.


  • Scikit-learn is the go-to library for classical machine learning algorithms, including regression, classification, and clustering. Built on top of NumPy, SciPy, and matplotlib, Scikit-learn provides a comprehensive suite of tools for data preprocessing, model selection, and evaluation.

  • PyCaret is a low-code machine learning library that automates many aspects of the ML pipeline, such as feature engineering, model selection, hyperparameter tuning, and deployment. It significantly reduces the time required to build ML models, making it an excellent choice for beginners and those who need rapid experimentation.

  • XGBoost (Extreme Gradient Boosting) is a highly optimized gradient boosting framework known for its speed and efficiency. It is widely used in Kaggle competitions and industry applications where performance and accuracy are critical. XGBoost excels in handling structured/tabular data and is particularly useful for classification and regression tasks.


A typical Machine Learning workflow includes data preparation, feature extraction, model selection, model training, model evaluation, model deployment and model monitoring.


With Python UDF and Timeplus Streaming SQL, users can effectively run these processes on a single data platform.  

  • Data preparation: Timeplus Streaming SQL can be used effectively for data preparation, including extracting data from unstructured data sources, transforming source data into the required fields or formats, filling in missing data, and masking sensitive data.

  • Feature extraction: High-quality features are key to machine learning model performance. Refer to my blog about how Timeplus can be used to support a real-time feature pipeline.

  • Model Training: Training a machine learning model with Timeplus means using historical data to run a batch training process. I will show how to do this with a Python UDF in a later example.

  • Model Evaluation: Model evaluation is the process of assessing how well a trained machine learning model performs on unseen data. It helps determine whether the model generalizes well or suffers from issues like overfitting or underfitting. With Timeplus Python UDFs, we can either do this in the Python code or use SQL queries to compute the performance metrics, as most of them are just math operations.

  • Model Deployment: With Timeplus Python UDFs, the model can be deployed using the Python UDF itself, which means no extra data infrastructure is required to deploy the model; the inference is provided by the Python UDF. This reduces overall infrastructure complexity.

  • Model Monitoring: Timeplus streaming SQL is a powerful tool for monitoring data in real time. With it, users can monitor model performance in real time using simple SQL queries.
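As the evaluation step notes, common classification metrics are simple arithmetic over true and predicted labels. The following is a minimal sketch in plain Python; the function names and the sample labels are made up for illustration and are not results from any model in this post.

```python
def accuracy(y_true, y_pred):
    """Fraction of predictions that match the true labels."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    if not y_true:
        return 0.0
    correct = sum(1 for t, p in zip(y_true, y_pred) if t == p)
    return correct / len(y_true)


def precision_recall(y_true, y_pred, positive):
    """Precision and recall for one class treated as the positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    return precision, recall


# Made-up labels for illustration only.
y_true = ["Setosa", "Setosa", "Virginica", "Versicolor"]
y_pred = ["Setosa", "Virginica", "Virginica", "Versicolor"]
print(accuracy(y_true, y_pred))                       # 0.75
print(precision_recall(y_true, y_pred, "Virginica"))  # (0.5, 1.0)
```

The same counts (matches, true positives, false positives) map naturally onto SQL aggregations over a stream of labeled predictions.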


Let’s take a look at how to use Timeplus Python UDF to do a simple iris dataset classification.



Scikit-learn Classification - Iris Dataset


First, let's import the iris dataset into a stream called iris.  I have already put the dataset on AWS S3 and users can access the data by using the url function.

CREATE STREAM IF NOT EXISTS iris
(
  `sepal.length` float64,
  `sepal.width` float64,
  `petal.length` float64,
  `petal.width` float64,
  `variety` string
);

INSERT INTO iris (
  `sepal.length`,
  `sepal.width`,
  `petal.length`,
  `petal.width`,
  `variety`
)
SELECT * 
FROM url('https://tp-solutions.s3.us-west-2.amazonaws.com/iris.csv', 'CSVWithNames');

Next, we can create two UDFs, one aggregation function called train_sklearn_classifier and one scalar function called predict_sklearn_classifier.



CREATE OR REPLACE aggregate FUNCTION train_sklearn_classifier(features array(float64), label string, name string) RETURNS string LANGUAGE PYTHON AS
$$

import pickle  # required by serialize() and deserialize() below
import numpy as np
import pandas as pd
import joblib
from sklearn.preprocessing import LabelEncoder
from sklearn.linear_model import LogisticRegression

class train_sklearn_classifier:
   def __init__(self):
       self.model = ''

   def serialize(self):
       data = {}
       data["model"] = self.model
       return pickle.dumps(data)

   def deserialize(self, data):
       data = pickle.loads(data)
       self.model = data["model"]

   def merge(self, other):
       pass

   def process(self, features, labels, names):
       try:
           data = []
           for feature, label in zip(features, labels):
               row = feature + [label]  # Create a new list with label appended
               data.append(row)
          
           feature_names = [f'f{n}' for n in range(len(features[0]))]
           df = pd.DataFrame(data, columns=feature_names + ['label'])

           X = df[feature_names].values  # Features
           y = df['label'].values  # Labels
           label_encoder = LabelEncoder()
           y_encoded = label_encoder.fit_transform(y)
          
           classifier = LogisticRegression(max_iter=200)
           classifier.fit(X, y)
           joblib.dump(classifier, f'{names[0]}.pkl')
           self.model = str(classifier)

       except Exception as e:
           self.model = str(e)

   def finalize(self):
       return [self.model]

$$;

CREATE OR REPLACE FUNCTION predict_sklearn_classifier(features array(float64), name string) RETURNS string LANGUAGE PYTHON AS
$$
import traceback
import joblib
import numpy as np

def predict_sklearn_classifier(features, name):
   results = []
   # Each argument arrives as a batch, so emit one result per input row
   for (row, model_name) in zip(features, name):
       try:
           loaded_classifier = joblib.load(f'{model_name}.pkl')
           new_data = np.array([row])
           new_prediction = loaded_classifier.predict(new_data)
           # Append rather than overwrite, so results from earlier rows are kept
           results.append(str(new_prediction[0]))
       except Exception as e:
           trace = traceback.format_exc()
           results.append(trace)

   return results

$$;

The training function has three inputs:

  1. features, which is an array of floats

  2. label, which is the prediction target

  3. name, a model name, which is used to save a local model file


In this example, we save the model as a local file using joblib. When running the prediction, the UDF loads the trained model from that file and uses it to run the classification prediction.


Here is the SQL to train the model and run inference using the above UDF:

-- train the model
SELECT
 train_sklearn_classifier([sepal.length, sepal.width, petal.length, petal.width], variety, 'test_sklearn_classifier')
FROM
 table(iris);


-- prediction
SELECT
 predict_sklearn_classifier([5.1, 3.5, 1.4, 0.2], 'test_sklearn_classifier')

As Timeplus is a stream processing platform, users can continuously run inference on incoming data. Users can also put the inference query in a materialized view, which will continuously process data in the background.


 

Time Series Forecast - Statistical Method


Time series forecasting is a machine learning technique used to predict future values based on previously observed data points collected over time. It is widely used in finance, weather prediction, supply chain management, and many other domains where trends and patterns evolve over time.


There are some popular time series forecasting libraries you can use.


  • Prophet is an open-source time series forecasting library developed by Facebook. It is particularly well-suited for handling missing data, detecting seasonal patterns, and incorporating external variables. Prophet is designed to be robust and easy to use, making it an excellent choice for business forecasting applications.

  • StatsForecast is a Python library developed by Nixtla for time series forecasting using statistical models such as ARIMA, ETS, and exponential smoothing. It is optimized for speed and scalability, making it a great alternative to traditional forecasting methods.

  • Darts is an open-source, unified framework for time series forecasting. It provides a consistent interface for working with statistical models, deep learning models, and ensemble approaches. Darts simplifies the process of model selection and experimentation.

  • Statsmodels is a Python library for statistical modeling and econometrics, including time series analysis. It provides implementations of well-known statistical models such as ARIMA, SARIMA, and VAR.


These tools provide time series forecasting based on statistical methods. 



Time series forecasting using Statsmodels - Air Passenger Forecast


In this example, we create a time series forecasting function and apply it to the Air Passenger dataset, which is commonly used to demonstrate time series prediction.


First let's prepare the dataset.


CREATE STREAM passengers
(
  `Date` string,
  `Number` uint32
);

INSERT INTO passengers (`Date`, `Number`) VALUES
('1949-01', 112),
('1949-02', 118),
('1949-03', 132),
... ...
('1960-11', 390),
('1960-12', 432);


You can visualize the data as a line chart in Timeplus using the following query:

SELECT
   to_time(Date) AS time, Number
FROM
   table(passengers)

On a streaming platform, users want to forecast continuously in real time, so we create a long-running view that replays the above dataset repeatedly, using the following query.


CREATE RANDOM STREAM counter(i int default rand()%5) SETTINGS eps=1;

CREATE VIEW v_passenger_replay
(
  `time` datetime,
  `id` uint8,
  `date` string,
  `number` uint32
) AS
WITH selector AS
  (
    SELECT
      to_unix_timestamp(_tp_time) % 144 AS id
    FROM
      default.counter
  ), data AS
  (
    SELECT
      row_number_in_block() AS id, Date AS date, Number AS number
    FROM
      table(default.passengers)
  )
SELECT
  now() AS time, id, data.date, data.number
FROM
  selector
INNER JOIN data ON selector.id = data.id

  • A random stream is created with eps = 1, which generates one random event per second and keeps generating data until the user drops it.

  • A view is created by joining the random stream with the passenger stream.

  • For the random stream, we use to_unix_timestamp(_tp_time) % 144 to continuously generate an id from 0 to 143. The passenger dataset gets ids from 0 to 143 from the row_number_in_block function, so joining the two queries yields a continuous replay of the air passenger dataset. When it reaches the end, it replays from the beginning.
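The wrap-around behaviour of the modulo expression can be checked with a few lines of Python. This is a sketch of the arithmetic only; the function name replay_id and the sample timestamps are made up for illustration.

```python
DATASET_SIZE = 144  # 12 years x 12 months, 1949-01 through 1960-12


def replay_id(unix_seconds, size=DATASET_SIZE):
    """Map a Unix timestamp in seconds to a row id in [0, size)."""
    return unix_seconds % size


# With one event per second, the ids advance by one and wrap after 143.
print([replay_id(t) for t in range(142, 146)])  # [142, 143, 0, 1]
```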


We can show this replay in Timeplus as real-time visualizations.



Now we create the forecast Python UDF based on ARIMA from statsmodels.



CREATE OR REPLACE aggregate FUNCTION forecast_agg(timestamp string, value float64) RETURNS float64 LANGUAGE PYTHON AS 
$$

import pandas as pd
import pickle
from statsmodels.tsa.arima.model import ARIMA

class forecast_agg:
    def __init__(self):
        self.model = None  # ARIMA model will be created dynamically
        self.ts = []  # Store raw time series data
        self.forecast = None
        self.order = (1, 1, 1)  # Default ARIMA order (p, d, q)

    def serialize(self):
        """Serialize only the time series data, since ARIMA models are not picklable."""
        data = {"ts": self.ts, "order": self.order}
        return pickle.dumps(data)

    def deserialize(self, data):
        """Deserialize time series data and recreate the model."""
        data = pickle.loads(data)
        self.ts = data["ts"]
        self.order = data["order"]
        self.model = None  # The model will be refitted in process()

    def merge(self, other):
        pass

    def process(self, timestamp, value):
        """Process new data points and generate forecast."""
        try:
            for t, v in zip(timestamp, value):
                self.ts.append((t, v))

            df = pd.DataFrame(self.ts, columns=["ds", "y"])
            df["ds"] = pd.to_datetime(df["ds"])
            df = df.tail(12)  # Keep only the last 12 data points
            df.set_index("ds", inplace=True)

            if len(df) < 5:  # ARIMA needs enough data points to estimate parameters
                self.forecast = None
                return

            # Fit the ARIMA model
            self.model = ARIMA(df["y"], order=self.order)
            fitted_model = self.model.fit()

            # Forecast the next time step
            forecast = fitted_model.forecast(steps=1)
            self.forecast = forecast.iloc[0]

        except Exception as e:
            self.forecast = None

    def finalize(self):
        """Return the final forecasted value."""
        return [self.forecast] if self.forecast is not None else [0.0]

$$;

In this model, we use the last 12 data points to predict the next one.
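The UDF above keeps every data point it has seen and then takes the last 12. One alternative, sketched below in plain Python, is to bound the state with collections.deque so that only the most recent points are kept. The class name RollingWindow is hypothetical, and the mean-based forecast is a deliberately naive stand-in for illustration, not ARIMA.

```python
from collections import deque


class RollingWindow:
    """Keep only the most recent `size` values of a series."""

    def __init__(self, size=12):
        # A deque with maxlen drops the oldest value automatically.
        self.points = deque(maxlen=size)

    def add(self, values):
        self.points.extend(values)

    def naive_forecast(self):
        """Stand-in forecast: the mean of the window (NOT ARIMA)."""
        if not self.points:
            return None
        return sum(self.points) / len(self.points)


# Made-up values, with a window of 3 to keep the example short.
window = RollingWindow(size=3)
window.add([10, 20, 30, 40])
print(list(window.points))      # [20, 30, 40]
print(window.naive_forecast())  # 30.0
```

Bounding the window keeps both memory use and the serialized state small for a long-running streaming query.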


By running the following query, users get a real-time prediction on the test replay of the air passenger data stream.

WITH forecast AS
 (
   SELECT
     latest(date) AS lt, latest(number) AS lv, forecast_agg(date, to_float64(number)) AS forecast
   FROM
     v_passenger_replay
   EMIT STREAM PERIODIC 500ms
 )
SELECT
 now() AS time,
 to_time(lt) AS t,
 lv,
 array_join([(lv, 'truth'), (forecast, 'forecast')]) AS reshape,
 reshape.1 AS value,
 reshape.2 AS label
FROM
 forecast

The above query provides a real-time prediction and reshapes the query result so that it can be shown as a multi-series line chart in data visualizations.
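The reshaping done by array_join turns one wide row (latest value plus forecast) into two long rows, one per series. A plain-Python sketch of the same idea follows; the function name reshape mirrors the alias in the query, and the sample forecast value is made up.

```python
def reshape(t, truth, forecast):
    """Turn one wide row into two long rows, one per chart series."""
    return [
        {"t": t, "value": truth, "label": "truth"},
        {"t": t, "value": forecast, "label": "forecast"},
    ]


# 112 is the first value of the dataset; 115.5 is a made-up forecast.
for row in reshape("1949-01", 112, 115.5):
    print(row)
```

Each output row carries a label, which a charting tool can use to split the data into separate lines.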


 

Deep Learning: From simple neural network to natural language processing


Deep Learning is a subset of machine learning that uses artificial neural networks with multiple layers (hence "deep") to learn from large amounts of data. It is inspired by the structure and function of the human brain and excels in tasks like image recognition, natural language processing, and time series forecasting.  Deep Neural Networks (DNNs) are also the foundation of Large Language Models (LLMs) like GPT (ChatGPT), BERT, and LLaMA.


There are some popular deep learning frameworks and tools.

  • PyTorch is Facebook’s deep learning framework, widely adopted in both research and production environments. Known for its dynamic computation graph, PyTorch provides an intuitive interface that allows for faster experimentation and debugging.

  • TensorFlow is Google’s deep learning framework, designed to handle large-scale machine learning workloads with ease. It supports distributed computing and provides extensive tools for production deployment.

  • Hugging Face is the go-to platform for natural language processing (NLP) and AI model sharing. It offers pretrained models, APIs, and development tools that simplify AI implementation. The Hugging Face Transformers library is an open-source Python library that provides easy access to state-of-the-art deep learning models for natural language processing (NLP), vision, and speech tasks. It allows developers to use and fine-tune powerful pretrained models such as BERT, GPT, T5, and LLaMA with minimal effort.


Let’s take a closer look at how to leverage these libraries with Timeplus Python UDF.


Neural Network with PyTorch - Iris Classification


As in the previous sample, which used sklearn for classification, we can use PyTorch to create a neural network that does classification as well.


I am not going to show the details here, but the code is available in this GitHub gist.


There are pros and cons to using neural networks for classification compared to traditional machine learning models.


Pros:

  1. Better for Complex Patterns: One of the key strengths of neural networks is their ability to learn hierarchical and nonlinear patterns. Unlike traditional machine learning models, which often require feature engineering and manual transformations, deep learning models can automatically capture intricate relationships within the data. This makes them highly effective for tasks involving images, speech, and other complex datasets.

  2. Handles Large and High-Dimensional Data: Neural networks excel at processing high-dimensional data. Whether you're working with images (CNNs), text (RNNs, Transformers), or time-series data (LSTMs), deep learning models can efficiently extract meaningful features and make accurate predictions.

  3. Automatic Feature Engineering: Feature engineering is often a tedious task in traditional machine learning. With neural networks, this process is largely automated. The layers of a neural network learn to extract relevant features from raw data, reducing the need for manual intervention. This is particularly beneficial in domains like computer vision and natural language processing.


Cons:

  1. Needs More Data: Deep learning models thrive on large amounts of labeled data. Without sufficient data, models struggle to generalize, leading to poor performance on unseen examples. In contrast, traditional machine learning algorithms like decision trees or logistic regression can perform well even with smaller datasets.

  2. Longer Training Time: Training deep neural networks is computationally expensive. Large models require powerful GPUs or TPUs to process data efficiently, and even then, training can take hours or even days depending on the dataset and model complexity.

  3. Harder to Tune: Neural networks come with numerous hyperparameters, including the number of layers, neurons per layer, activation functions, and learning rates. Finding the optimal combination often requires extensive experimentation and fine-tuning, which can be time-consuming.

  4. Less Interpretability: A significant drawback of deep learning models is their black-box nature. Unlike simpler models like decision trees, which provide clear decision paths, neural networks make predictions based on complex weight matrices that are difficult to interpret. This lack of transparency can be a concern in high-stakes applications like healthcare and finance.

  5. More Prone to Overfitting: With millions of parameters, deep neural networks have a high capacity to memorize training data. This increases the risk of overfitting, especially when training on small datasets. Techniques like dropout regularization and data augmentation can help mitigate this issue, but they add to the overall complexity.


Users can choose which method to use based on their requirements and use cases.

Neural networks can do more than classification. Another example is using LSTM to perform time series forecasting; I have put the sample code here for your reference: https://gist.github.com/gangtao/6ca203df9bebefbe8f6dcd1265639373



Sentiment Analysis using Hugging Face Transformers


One of the most important use cases for deep learning techniques is natural language processing, which is also the foundation of today’s LLMs. In the following example, we show how to build a sentiment analysis function using a Python UDF based on the Hugging Face Transformers library.



CREATE OR REPLACE FUNCTION transformer_sentiment_analyzer(input string) RETURNS string LANGUAGE PYTHON AS
$$
import traceback
from transformers import pipeline


def transformer_sentiment_analyzer(input):
   model_dir = "/timeplus/tmp/models"
   model_name = "distilbert-base-uncased-finetuned-sst-2-english"

   try:
       # Build the pipeline once per batch instead of once per input row
       sentiment_analyzer = pipeline(
           "sentiment-analysis",
           model=model_name,
           tokenizer=model_name,
           model_kwargs={"cache_dir": model_dir}
       )
   except Exception as e:
       # Report the load error for every row so the output length matches the input
       trace = traceback.format_exc()
       return [trace for _ in input]

   results = []
   for input_string in input:
       try:
           result = sentiment_analyzer(input_string)
           results.append(str(result))
       except Exception as e:
           trace = traceback.format_exc()
           results.append(trace)

   return results

$$;

This scalar function is simple and straightforward: it gives each input string a sentiment label and a confidence score.


Here is a sample that runs this analysis in SQL:

select transformer_sentiment_analyzer('hi, nice to meet you')

[{'label': 'POSITIVE', 'score': 0.9996515512466431}]

The model gives the input 'hi, nice to meet you' a POSITIVE label with a confidence score of about 0.9997.
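Because the UDF returns the result as a string, downstream code that needs the label and score separately has to parse it. One way to do that in Python is with the standard-library ast.literal_eval, sketched below; the helper name parse_sentiment is hypothetical, and the input is the sample output shown above.

```python
import ast


def parse_sentiment(raw):
    """Parse the stringified pipeline output into (label, score)."""
    parsed = ast.literal_eval(raw)  # safely evaluates the Python literal
    first = parsed[0]
    return first["label"], float(first["score"])


raw = "[{'label': 'POSITIVE', 'score': 0.9996515512466431}]"
label, score = parse_sentiment(raw)
print(label, round(score, 4))  # POSITIVE 0.9997
```

If structured output is needed often, returning the label or the score directly from the UDF would avoid this parsing step.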



 

AI/LLM - LLM and Agentic Tasks


LLMs are transforming the software industry, making machine learning accessible to non-data scientists. Many proprietary model providers are available to help you build applications, such as OpenAI, Google Gemini, and Claude, alongside open source models like DeepSeek. Users can also run open source models locally with Ollama or use models hosted in the cloud, such as on Groq.


It is easy to integrate an LLM into your streaming SQL data processing with Timeplus Python UDFs.



Build a Data Pipeline with OpenAI as a Function


We build a scalar function called chat, which sends the input string to an OpenAI GPT model.


CREATE OR REPLACE FUNCTION chat(value string) RETURNS string LANGUAGE PYTHON AS 
$$

import os        
from openai import OpenAI

client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

def chat(value):
    res = []

    for v in value:
        try:
            chat_completion = client.chat.completions.create(
                messages=[{
                    "role": "user",
                    "content": v
                }],
                model="gpt-3.5-turbo")
            res.append(chat_completion.choices[0].message.content)
        except Exception as e:
            res.append(str(e))
    return res

$$;

With this LLM call, users can build real-time transformations that leverage an LLM. Here is a simple example:

SELECT
 chat( concat('what is the airport code of ', city , ' return in json format') )
FROM
 (
   SELECT
     'Vancouver' AS city
 )

The above query returns a result such as { "airport_code": "YVR" }; the exact text can vary between calls because the model output is not deterministic.
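Since an LLM reply is free text, a pipeline that depends on the JSON should parse it defensively. The sketch below is one simple approach in plain Python, assuming the reply contains at most one JSON object, possibly wrapped in extra text or a code fence; the helper name extract_json is hypothetical.

```python
import json

FENCE = "`" * 3  # a Markdown code fence, which some models wrap JSON in


def extract_json(reply):
    """Return the JSON object embedded in an LLM reply, or None."""
    start = reply.find("{")
    end = reply.rfind("}")
    if start == -1 or end <= start:
        return None
    try:
        return json.loads(reply[start:end + 1])
    except json.JSONDecodeError:
        return None


plain = '{ "airport_code": "YVR" }'
fenced = FENCE + 'json\n{ "airport_code": "YVR" }\n' + FENCE
print(extract_json(plain))           # {'airport_code': 'YVR'}
print(extract_json(fenced))          # {'airport_code': 'YVR'}
print(extract_json("I don't know"))  # None
```

Returning None for unparseable replies lets the calling code decide whether to retry, skip the row, or log the raw text.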


With just 20 lines of code, we have built a real-time data processing pipeline that leverages an LLM. That is cool!


With Python UDFs, users can do more along these lines. By leveraging agentic frameworks such as LangChain, LlamaIndex, and AutoGen, users can build more advanced real-time AI applications on top of Timeplus.



 

Summary


Timeplus is a high-performance streaming processing platform that uses SQL as its primary interface. By integrating Python UDFs (User-Defined Functions), Timeplus empowers data scientists, data engineers, and software developers with a seamless way to process and analyze real-time data.


With Python UDFs, Timeplus makes it easy to implement a variety of machine learning and AI-driven workflows, including:


  • Traditional Machine Learning – Apply predictive models directly on streaming data.

  • Time Series Forecasting – Leverage statistical and deep learning models for real-time forecasting.

  • Deep Learning & NLP – Process text data and build AI-driven insights.

  • AI, LLM & Agentic Workflows – Integrate large language models (LLMs) and intelligent agents for automation.


By combining real-time data processing with advanced machine learning capabilities, Timeplus enables businesses to detect trends, automate decisions, and build AI-powered applications with ease. 


Try it yourself! See all Python UDF examples in our 'examples' repo on GitHub.
