
Real-Time AI with Timeplus, Kafka, Milvus, and Other OSS Tools

Our Cofounder and Head of Product, Jove Zhong, shares how to retrieve real-time data from external systems to ground large language models (LLMs) on up-to-date information, providing users with real-time insights.



This is the talk I gave at the Unstructured Data and LLM Seattle Meetup on April 18, 2024, transcribed as a blog post. The presentation recording is available below and on Zilliz’s YouTube channel.



 

Real-Time AI Use Case


One of the best examples of why we need real-time data for LLM/GAI comes from Confluent. In this scenario, a customer is interacting with a chatbot, looking for options to rebook a flight to avoid delays.



As AI evolves rapidly, we now have bigger and better models. On the same day I presented this talk, Meta open-sourced Llama 3, in 8B and 70B parameter sizes, supporting a broad range of use cases. However, LLMs can be inconsistent – sometimes they regurgitate random facts from their training data. And no matter how big the training data set is, it won’t include the latest information available on the internet at the time you ask your question.


 

RAG


Retrieval-augmented generation (RAG) is an AI framework for improving the quality of LLM-generated responses by grounding the model on external sources of knowledge to supplement the LLM’s internal representation of information. 


In the rest of this blog, I will demonstrate how to use a set of open-source tools to retrieve the latest stories and comments from Hacker News, turn the text into embeddings, and send them to a vector database. Along the way, you can use Apache Kafka and streaming SQL to pre-process or filter the data. This is a simplified version of real-time RAG.


All code shared in this tutorial is available here. When you run the Streamlit web application, you can search for certain topics, and it will show related discussions taking place on Hacker News in real time.


Here’s a screenshot of the demo application:



 

Overall data flow



  1. Input: Stream stories and comments from Hacker News API.

  2. Pre-process: Retrieve updates and filter for stories/comments via Bytewax.

  3. Retrieve Content: Download the HTML and parse it into usable text, thanks to the awesome Unstructured.io.

  4. Vectorize: Create an embedding or list of embeddings for text using Hugging Face Transformers.

  5. Output: Write the JSON documents with embeddings and other fields to data streams in a local Timeplus Proton server.

  6. Filter/Route: Use streaming SQL to filter/transform JSON data and send output to a local Kafka server.

  7. Forward: Use the Milvus plugin for Kafka Connect to forward the data to Zilliz Cloud.

  8. Query: Build a Streamlit web application to query the live data using the same embedding model.


Check out all the code here.


 

Step-By-Step Walkthrough


Prerequisites: Set up the development environment


Python 3.11 is recommended. Set up a virtual environment and install all dependencies:

python3.11 -m venv py311
source py311/bin/activate
pip install -r requirements.txt

Please check the full requirements.txt for details. The key dependencies are:

  • torch==2.2.0

  • transformers>=4.38.0

  • unstructured==0.12.4

  • bytewax==0.18.2

  • proton-driver

  • pymilvus

  • streamlit


You also need to install Timeplus locally, instead of as a Docker container, so that it can connect to the local Kafka server without extra configuration.


Create a new folder and run the following commands:

curl https://install.timeplus.com | sh
proton server

The real-time Hacker News feed will arrive in Timeplus first. We will set up Apache Kafka and Kafka Connect later on (in Step 7) to forward the data to Zilliz Cloud, the fully managed cloud vector database.
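Before moving on, you can optionally confirm that the local Proton server is reachable from Python. Here is a minimal sketch using the proton-driver package, assuming the default native port 8463:

from proton_driver import client

# Connect to the local Proton server and run a trivial query to confirm it responds
c = client.Client(host="127.0.0.1", port=8463)
print(c.execute("SELECT version()"))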



Step 1: Get data from Hacker News API

You can use Bytewax’s SimplePollingSource to create a new source that loads data from the Hacker News HTTP API.


First, get the latest item ID from the maxitem.json endpoint:

import requests
from bytewax.inputs import SimplePollingSource

class HNSource(SimplePollingSource):
    def next_item(self):
        # Poll the Hacker News API for the current maximum item ID
        return (
            "GLOBAL_ID",
            requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json(),
        )

Next, fetch the last 150 items on the first run, then pick up new IDs every 15 seconds from then on.

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from datetime import timedelta
from typing import Tuple
from icecream import ic  # ic() logs its arguments and returns them

def get_id_stream(old_max_id, new_max_id) -> Tuple[str, list]:
    if old_max_id is None:
        # Get the last 150 items on the first run.
        old_max_id = new_max_id - 150
    return ic(new_max_id, range(old_max_id, new_max_id))

# main entry
def run_hn_flow(polling_interval=15):
    flow = Dataflow("hn_stream")
    max_id = op.input("in", flow, HNSource(timedelta(seconds=polling_interval)))
    id_stream = (
        op.stateful_map("range", max_id, lambda: None, get_id_stream)
        .then(op.flat_map, "strip_key_flatten", lambda key_ids: key_ids[1])
        .then(op.redistribute, "scaling")
    )


Step 2: Get the metadata


Once you’ve got the Hacker News item id, you can download the text content and other metadata.

import bytewax.operators as op
import requests
from typing import Optional

def download_metadata(hn_id) -> Optional[dict]:
    # Fetch the full item (story or comment) as a JSON document
    return requests.get(
        f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
    ).json()

def run_hn_flow(polling_interval=15):
    ...
    # filter_map drops any item for which download_metadata returns None
    enriched = op.filter_map("enrich", id_stream, download_metadata)


Step 3: Retrieve the text and parse HTML content


A Hacker News story can contain a URL link, while comments are plain text. You can use Python’s built-in libraries to remove HTML tags, and Unstructured also provides handy text cleanup functions.

import html
import re
from unstructured.cleaners.core import (
    clean,
    replace_unicode_quotes,
    clean_non_ascii_chars,
)
from unstructured.staging.huggingface import chunk_by_attention_window


pattern = re.compile("<.*?>")

def prep_text(metadata_content, tokenizer):
    metadata_content["text"] = html.unescape(metadata_content["text"])
    metadata_content["text"] = re.sub(pattern, "", metadata_content["text"])
    metadata_content["text"] = clean_non_ascii_chars(
        replace_unicode_quotes(metadata_content["text"])
    )
    metadata_content["text"] = chunk_by_attention_window(
        metadata_content["text"], tokenizer
    )
    return metadata_content

def run_hn_flow(polling_interval=15):
    ...
    # `comments` is the branch of `enriched` that contains comment items
    # (the story/comment branching step is elided here)
    comments = op.map(
        "clean_text", comments, lambda document: prep_text(document, tokenizer)
    )

You can also leverage Unstructured to parse the HTML content of the story link and extract and clean up the web content. Please check utils.py for details.
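For reference, here is a minimal sketch (not the exact utils.py code) of fetching and cleaning the page behind a story’s URL with Unstructured; the helper name parse_story_url is hypothetical:

from unstructured.partition.html import partition_html
from unstructured.cleaners.core import (
    clean,
    replace_unicode_quotes,
    clean_non_ascii_chars,
)

def parse_story_url(url: str) -> str:
    # Download the page and split it into structured elements (titles, paragraphs, ...)
    elements = partition_html(url=url)
    text = " ".join(el.text for el in elements if el.text)
    # Apply the same cleaners used for comments above
    return clean_non_ascii_chars(
        replace_unicode_quotes(clean(text, extra_whitespace=True))
    )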



Step 4: Vectorize - Create text embeddings


So, we’ve got the text content for Hacker News stories and comments. The next step is to vectorize the content and create embeddings. Embedding is an almost magical process that “compresses” a piece of text into an array of floating-point numbers, no matter how long the text is. For individual words, a good embedding model creates vectors that are close for similar words. For example, in a 2-dimensional model, the x and y coordinates for “football” would be significantly closer to those for “soccer” than to the coordinates for “footwear”.
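To make that intuition concrete, here is a quick, standalone sketch (not part of the demo pipeline) comparing cosine similarities with the same MiniLM model used later in this step. The exact scores will vary, but “football” and “soccer” should land noticeably closer together than “football” and “footwear”:

from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

def embed(word: str) -> torch.Tensor:
    # Use the [CLS] token output as a simple single-vector embedding
    inputs = tokenizer(word, return_tensors="pt")
    with torch.no_grad():
        return model(**inputs).last_hidden_state[:, 0]

football, soccer, footwear = embed("football"), embed("soccer"), embed("footwear")
print(F.cosine_similarity(football, soccer).item())    # expected: relatively high
print(F.cosine_similarity(football, footwear).item())  # expected: relatively low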



In our example, we will create high-dimensional embeddings: a FloatVector with 384 dimensions. If you don’t have one already, create a free account on Zilliz Cloud and create a new collection called “comments” with the following schema:



You can use OpenAI’s embedding API to create high-quality embeddings, but it’s not the only option. In this tutorial, we will use Hugging Face Transformers: state-of-the-art machine learning for PyTorch, TensorFlow, and JAX. Its Auto Classes can automatically retrieve the relevant model given the name/path of the pretrained weights/config/vocabulary.

from transformers import AutoTokenizer, AutoModel
import torch

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

def hf_document_embed(chunk, tokenizer, model, torch, length=384):
    inputs = tokenizer(
        chunk, padding=True, truncation=True, return_tensors="pt", max_length=length
    )
    with torch.no_grad():
        embed = model(**inputs).last_hidden_state[:, 0].cpu().detach().numpy()
    return embed.flatten()

def run_hn_flow(polling_interval=15):
    ...
    comments = op.map(
        "comment_embeddings",
        comments,
        lambda document: hf_document_embed(
            document, tokenizer, model, torch, length=VECTOR_DIMENSIONS
        ),
    )
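You can also sanity-check the embedding function on its own, outside the Bytewax flow. A small sketch (the sample sentence is just an illustration):

# The result should be a flat NumPy array with 384 float values,
# matching the doc_embedding dimension configured in Zilliz Cloud.
sample_vec = hf_document_embed(
    "The new console lineup looks promising.", tokenizer, model, torch, length=384
)
print(sample_vec.shape)  # expected: (384,)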


Step 5: Write JSON documents with embeddings and other fields to Timeplus Proton


As the last step of the Bytewax data flow, create an output to send the streaming results to a data stream in Timeplus Proton:

def run_hn_flow(polling_interval=15):
    ...
    op.output(
        "comments_out",
        comments,
        ProtonSink("hn_comments_raw", os.environ.get("PROTON_HOST","127.0.0.1"))
    )

The ProtonSink extends Bytewax’s DynamicSink. It runs a CREATE STREAM IF NOT EXISTS DDL statement to create a stream with a single string column if the specified stream doesn’t exist, then runs INSERT INTO statements to send the JSON strings into the stream. Please note that the embedding results are NumPy arrays, which are not serializable with Python’s default JSONEncoder, so we built our own encoder:

from bytewax.outputs import DynamicSink, StatelessSinkPartition
from proton_driver import client
import json
import numpy as np

class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return "bytes"
        return json.JSONEncoder.default(self, obj)

class _ProtonSinkPartition(StatelessSinkPartition):
    def __init__(self, stream: str, host: str):
        self.client = client.Client(host=host, port=8463)
        self.stream = stream
        # Create the target stream with a single string column if it doesn't exist
        sql = f"CREATE STREAM IF NOT EXISTS `{stream}` (raw string)"
        self.client.execute(sql)

    def write_batch(self, items):
        rows = []
        for item in items:
            # Serialize with the custom encoder (avoid shadowing the built-in `str`)
            payload = json.dumps(item[0], cls=NumpyEncoder)
            rows.append([payload])  # single column in each row
        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
        self.client.execute(sql, rows)

class ProtonSink(DynamicSink):
    def __init__(self, stream: str, host: str):
        self.stream = stream
        self.host = host if host is not None and host != "" else "127.0.0.1"

    def build(self, worker_index, worker_count):
        return _ProtonSinkPartition(self.stream, self.host)

Start the Python script via:

python -m bytewax.run "pipeline:run_hn_flow()"

Then, check the data in Timeplus by launching the Proton client:

proton client -h 127.0.0.1

Run this SQL to check the sample data:

select raw from table(hn_comments_raw) limit 1

The sample output is:

{"key_id": "39997057_0", "by": "jdsully", "id": 39997057, "parent": 39997048, "time": 1712794288, "type": "comment", "root_id": 39996504, "text": "They asked for it and at that time you didnt really need to justify it.", "doc_embedding": [-0.05919811129570007, 0.35973143577575684, -0.049948230385780334, .., -0.14836473762989044]}


Step 6: Filter/Route - Use streaming SQL to filter/transform JSON data and send output to a local Kafka server


The following SQL statements will set up a pipeline to read all Hacker News comments and send them to a Kafka topic. In the next step, we will set up zilliz-kafka-connect-milvus to forward the live data to Zilliz Cloud.

CREATE EXTERNAL STREAM comments_topic(
  key_id string, by string, id int64, parent int64, time int64, type string,
  root_id int64, text string, doc_embedding array(float32))
SETTINGS type='kafka', 
         brokers='localhost:9092',
         topic='comments', 
         data_format='JSONEachRow', 
         one_message_per_row=true;

CREATE MATERIALIZED VIEW mv INTO comments_topic AS
  SELECT
    raw:key_id AS key_id, raw:by AS by, raw:id::int64 AS id,
    raw:parent::int64 AS parent, raw:time::int64 AS time, raw:type AS type,
    raw:root_id::int64 AS root_id, raw:text AS text,
    cast(raw:doc_embedding, 'array(float32)') AS doc_embedding
  FROM hn_comments_raw;

You can even filter data with Timeplus Proton to only send relevant content (with its embedding) to Zilliz, for example by adding the following WHERE clause to the end of the MATERIALIZED VIEW SQL:

WHERE text ilike '%play%'
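If you prefer to script this setup rather than paste the SQL into the Proton client, the same two statements can be executed with the proton-driver package. A minimal sketch, assuming a local Proton server on the default port:

from proton_driver import client

c = client.Client(host="127.0.0.1", port=8463)

# External stream pointing at the local Kafka topic `comments`
c.execute("""
CREATE EXTERNAL STREAM comments_topic(
  key_id string, by string, id int64, parent int64, time int64, type string,
  root_id int64, text string, doc_embedding array(float32))
SETTINGS type='kafka', brokers='localhost:9092', topic='comments',
         data_format='JSONEachRow', one_message_per_row=true
""")

# Materialized view that extracts typed columns from the raw JSON and
# continuously writes them to the Kafka external stream
c.execute("""
CREATE MATERIALIZED VIEW mv INTO comments_topic AS
  SELECT raw:key_id AS key_id, raw:by AS by, raw:id::int64 AS id,
         raw:parent::int64 AS parent, raw:time::int64 AS time, raw:type AS type,
         raw:root_id::int64 AS root_id, raw:text AS text,
         cast(raw:doc_embedding, 'array(float32)') AS doc_embedding
  FROM hn_comments_raw
""")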


Step 7: Forward the data to Zilliz Cloud via Kafka Connect


In the original Bytewax demo app, Milvus Lite is used to run Milvus easily inside a Python application. In this example, however, we are integrating with the managed Milvus in the cloud, i.e. Zilliz Cloud.


Timeplus Proton will write transformed data to Kafka topics. The data is then forwarded to Zilliz Cloud via https://github.com/zilliztech/kafka-connect-milvus.


Please follow the kafka-connect-milvus docs for detailed installation steps. In short, you need to:


  • Set up JVM

  • Install Apache Kafka with Kafka Connect, such as kafka_2.13-3.6.1

  • Put the zilliz-kafka-connect-milvus-0.1.1 folder in kafka_2.13-3.6.1/libs/

  • Edit the milvus-sink-connector.properties file with the proper Milvus/Zilliz endpoint and token. Then, put it in kafka_2.13-3.6.1/config/:

name=zilliz-kafka-connect-milvus
connector.class=com.milvus.io.kafka.MilvusSinkConnector
public.endpoint=https://THE_ID.api.gcp-us-west1.zillizcloud.com
token=THE_TOKEN
collection.name=comments
topics=comments

Start the Kafka stack by opening a terminal window and changing the directory to kafka_2.13-3.6.1, then:


1. Start the ZooKeeper service via bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start the Kafka broker service via bin/kafka-server-start.sh config/server.properties

3. Create a topic via bin/kafka-topics.sh --create --topic comments --bootstrap-server localhost:9092

4. Start the Kafka Connect service via bin/connect-standalone.sh config/connect-standalone.properties config/milvus-sink-connector.properties


Keep the Python script, Timeplus server, and Kafka server all running. You will notice new data is added to the collection in Zilliz Cloud.
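One quick way to confirm the end-to-end flow with pymilvus (a sketch; the endpoint and token are the placeholders from the connector properties above):

import pymilvus

# Connect with the same endpoint/token configured in milvus-sink-connector.properties
pymilvus.connections.connect(
    uri="https://THE_ID.api.gcp-us-west1.zillizcloud.com", token="THE_TOKEN"
)
collection = pymilvus.Collection("comments")

# num_entities may lag slightly until data is flushed, but it should keep
# growing while the pipeline is running.
print(collection.num_entities)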




Step 8: Build a web application to query real-time data

In this final step, we will build a chatbot or search engine to validate the RAG pipeline. Make sure the question you ask is transformed into an embedding with the same model. In this example, I chose Streamlit, Snowflake’s Python framework, to build the web application.


The key code snippet is as follows:

import streamlit as st
import pymilvus
from datetime import datetime

def hf_document_embed(chunk, tokenizer, model, torch, length=384):
    ...

with st.form("my_form"):
    st.image("banner.png")
    text_val = st.text_area(
        "What's going on at Hacker News today?", value="What's new for Play Station?"
    )

    submitted = st.form_submit_button("Go!")
    if submitted:
        # Connect to Zilliz Cloud with the endpoint and API token
        pymilvus.connections.connect(
            uri=configs["public.endpoint"].data, token=configs["token"].data
        )
        collection = pymilvus.Collection(configs["collection.name"].data)

        # Turn the question into an embedding with the same Hugging Face model
        embedding = hf_document_embed(
            text_val, tokenizer, model, torch, length=VECTOR_DIMENSIONS
        )

        # Vector search on the doc_embedding field, restricted to comments
        results = collection.search(
            data=[embedding],
            anns_field="doc_embedding",
            param={"level": 3},
            limit=10,
            output_fields=["by", "time", "text"],
            expr='type == "comment"',
            consistency_level="Strong",
        )

        with st.container(height=500):
            for r in results[0]:
                matching = r.entity
                time_diff = datetime.now() - datetime.fromtimestamp(matching.get("time"))
                minutes_ago = divmod(time_diff.total_seconds(), 60)[0]
                time_display = datetime.fromtimestamp(matching.get("time"))
                if minutes_ago < 120:
                    time_display = f"{int(minutes_ago)} minutes ago"
                st.markdown(f"{time_display} - {matching.get('by')} 🗣️\n> {matching.get('text')}")
                st.slider(
                    "distance:", min_value=0.0, max_value=1.0, value=r.distance,
                    key=r.id, disabled=True, label_visibility="collapsed",
                )

Please note:

  • We use the pymilvus library to connect to Zilliz Cloud with an API token (loaded from the connector properties file; see the sketch after these notes), and call the collection.search function to ask questions.

  • The question is turned into an embedding with the same Hugging Face model.

  • We only show the top 10 most relevant pieces of content from Hacker News. Milvus/Zilliz allows you to specify an expression for metadata filtering (type == "comment" in this example) and return other metadata fields (by, time, and text in this example).
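The Streamlit snippet above reads its Zilliz connection settings from a configs object. Here is a minimal sketch of how it could be populated, assuming the jproperties package and the same milvus-sink-connector.properties file from Step 7:

from jproperties import Properties

configs = Properties()
with open("milvus-sink-connector.properties", "rb") as f:
    configs.load(f)

# Each value is a PropertyTuple, hence the .data accessor used above
endpoint = configs["public.endpoint"].data
token = configs["token"].data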


 

Congratulations! You just built a real-time RAG pipeline with top-notch open-source tools, including Bytewax, Unstructured, Hugging Face, Timeplus, Apache Kafka, and Zilliz.





For reference, the full source code is available in the examples folder of the Timeplus Proton repo.


By leveraging these open-source tools, you can take your AI application to the next level by connecting it to the real world with real-time data.


Got questions or comments? Share them in the Discussions on https://github.com/timeplus-io/proton or join our community Slack: https://timeplus.com/slack.

