Our Cofounder and Head of Product, Jove Zhong, shares how to retrieve real-time data from external systems to ground large language models (LLMs) on up-to-date information, providing users with real-time insights.
This is the talk I gave at the Unstructured Data and LLM Seattle Meetup on April 18, 2024, transcribed as a blog. The presentation recording is available below and on Zilliz’s YouTube channel.
Real-Time AI Use Case
One of the best examples of why we need real-time data for LLMs and generative AI comes from Confluent. In this scenario, a customer is interacting with a chatbot, looking for options to rebook a flight to avoid delays.
As AI evolves rapidly, we now have bigger and better models. On the same day I presented this talk, Meta open-sourced Llama 3, with 8B and 70B parameter models that support a broad range of use cases. However, LLMs can be inconsistent: sometimes they regurgitate random facts from their training data. And no matter how big the training data set is, it won't include the latest information from the internet at the moment you ask your question.
RAG
Retrieval-augmented generation (RAG) is an AI framework for improving the quality of LLM-generated responses by grounding the model on external sources of knowledge to supplement the LLM’s internal representation of information.
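At a high level, RAG boils down to three steps: embed the user's question, retrieve relevant documents from an external store, and let the LLM answer with that retrieved context in the prompt. Here is a minimal sketch of the idea; the names embed, vector_search, and llm are placeholders, not part of this tutorial's code:

# Minimal RAG sketch. `embed`, `vector_search`, and `llm` are hypothetical
# callables standing in for your embedding model, vector database, and LLM.
def answer(question, embed, vector_search, llm):
    query_vector = embed(question)                  # 1. turn the question into an embedding
    context = vector_search(query_vector, top_k=5)  # 2. retrieve fresh, relevant documents
    prompt = f"Answer using this context:\n{context}\n\nQuestion: {question}"
    return llm(prompt)                              # 3. ground the LLM's answer in that context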
In the rest of this blog, I will demonstrate how to use a set of open-source tools to retrieve the latest stories and comments on Hacker News, turn the text into embeddings, and send them to a vector database. Along the way, you can use Apache Kafka and streaming SQL to pre-process or filter the data. This is a simplified version of real-time RAG.
All code shared in this tutorial is available here. When you run the Streamlit web application, you can search for certain topics, and it will show related discussions taking place on Hacker News in real time.
Here’s a screenshot of the demo application:
Overall data flow
Input: Stream stories and comments from Hacker News API.
Pre-process: Retrieve updates and filter for stories/comments via Bytewax.
Retrieve Content: Download the HTML and parse it into usable text, thanks to the awesome Unstructured.io.
Vectorize: Create an embedding or list of embeddings for text using Hugging Face Transformers.
Output: Write the JSON documents with embeddings and other fields to data streams in a local Timeplus Proton server.
Filter/Route: Use streaming SQL to filter/transform JSON data and send output to a local Kafka server.
Forward: Use the Milvus plugin for Kafka Connect to forward the data to Zilliz Cloud.
Query: Build a Streamlit web application to query the live data with the same embedding model.
Check out all the code here.
Step-By-Step Walkthrough
Prerequisites: Set up the development environment
Python 3.11 is recommended. Set up a virtual environment and install all dependencies:
python3.11 -m venv py311
source py311/bin/activate
pip install -r requirements.txt
Please check the full requirements.txt for details. The key dependencies are:
torch==2.2.0
transformers>=4.38.0
unstructured==0.12.4
bytewax==0.18.2
proton-driver
pymilvus
streamlit
You also need to install Timeplus Proton locally, rather than as a Docker container, so that it can connect to the local Kafka server without extra configuration.
Create a new folder and run the following commands:
curl https://install.timeplus.com | sh
proton server
The real-time Hacker News feed will arrive in Timeplus Proton first. We will set up Apache Kafka and Kafka Connect later on (in Step 7) to forward the data to Zilliz Cloud, a fully managed cloud vector database.
Step 1: Get data from Hacker News API
You can use Bytewax's SimplePollingSource to create a new source that loads data from the Hacker News HTTP API.
First, get the latest story id, which is available from the maxitem.json endpoint shown below:
import requests
from bytewax.inputs import SimplePollingSource

class HNSource(SimplePollingSource):
    def next_item(self):
        return (
            "GLOBAL_ID",
            requests.get("https://hacker-news.firebaseio.com/v0/maxitem.json").json(),
        )
Next, get the last 150 items on the first run, and poll for new IDs every 15 seconds after that.
from datetime import timedelta
from typing import Tuple

import bytewax.operators as op
from bytewax.dataflow import Dataflow
from icecream import ic

def get_id_stream(old_max_id, new_max_id) -> Tuple[str, list]:
    if old_max_id is None:
        # Get the last 150 items on the first run.
        old_max_id = new_max_id - 150
    return ic(new_max_id, range(old_max_id, new_max_id))

# main entry
def run_hn_flow(polling_interval=15):
    flow = Dataflow("hn_stream")
    max_id = op.input("in", flow, HNSource(timedelta(seconds=polling_interval)))
    id_stream = op.stateful_map("range", max_id, lambda: None, get_id_stream).then(
        op.flat_map, "strip_key_flatten", lambda key_ids: key_ids[1]
    ).then(op.redistribute, "scaling")
Step 2: Get the metadata
Once you’ve got the Hacker News item id, you can download the text content and other metadata.
from typing import Optional

import bytewax.operators as op
import requests

def download_metadata(hn_id) -> Optional[dict]:
    return requests.get(
        f"https://hacker-news.firebaseio.com/v0/item/{hn_id}.json"
    ).json()

def run_hn_flow(polling_interval=15):
    ..
    enriched = op.filter_map("enrich", id_stream, download_metadata)
Step 3: Retrieve the text and parse HTML content
A Hacker News story can contain a URL link, while the comments are in plain text. You can use Python's built-in libraries to remove HTML tags, and Unstructured also provides handy text clean-up functions.
import html
import re

from unstructured.cleaners.core import (
    clean,
    replace_unicode_quotes,
    clean_non_ascii_chars,
)
from unstructured.staging.huggingface import chunk_by_attention_window

pattern = re.compile("<.*?>")

def prep_text(metadata_content, tokenizer):
    metadata_content["text"] = html.unescape(metadata_content["text"])
    metadata_content["text"] = re.sub(pattern, "", metadata_content["text"])
    metadata_content["text"] = clean_non_ascii_chars(
        replace_unicode_quotes(metadata_content["text"])
    )
    metadata_content["text"] = chunk_by_attention_window(
        metadata_content["text"], tokenizer
    )
    return metadata_content

def run_hn_flow(polling_interval=15):
    ..
    comments = op.map(
        "clean_text", comments, lambda document: prep_text(document, tokenizer)
    )
You can also leverage Unstructured to parse the HTML content of the story link, extracting and cleaning up the web content (see the sketch below). Please check utils.py for the full details.
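For reference, a rough sketch of that idea (not the exact utils.py implementation) could look like this, using Unstructured's partition_html and the same cleaning helpers as above:

# Sketch of parsing the story's linked page with Unstructured; utils.py in the
# repo is the authoritative version.
from unstructured.partition.html import partition_html
from unstructured.cleaners.core import clean_non_ascii_chars, replace_unicode_quotes

def parse_story_url(url: str) -> str:
    elements = partition_html(url=url)            # download and parse the HTML page
    text = "\n".join(str(el) for el in elements)  # keep only the extracted text
    return clean_non_ascii_chars(replace_unicode_quotes(text))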
Step 4: Vectorize - Create text embeddings
So, we've got the text content for Hacker News stories and comments. The next step is to vectorize the content and create embeddings. An embedding "compresses" a piece of text into a fixed-length array of floating-point numbers, no matter how long the text is. For individual words, a good embedding model creates vectors that sit close together for similar words. For example, in this 2-dimensional model, the x and y for "football" are significantly closer to the x and y for "soccer" than to the coordinates for "footwear".
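To make that concrete, here is a small illustrative sketch that embeds the three words with the same MiniLM model used later in this tutorial and compares them with cosine similarity (the exact scores will vary, but "football" and "soccer" should score higher than "football" and "footwear"):

# Illustrative only: similar words get nearby vectors.
from transformers import AutoTokenizer, AutoModel
import torch
import torch.nn.functional as F

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

def embed(word):
    inputs = tokenizer(word, return_tensors="pt")
    with torch.no_grad():
        return model(**inputs).last_hidden_state[:, 0]  # CLS-token vector

football, soccer, footwear = embed("football"), embed("soccer"), embed("footwear")
print(F.cosine_similarity(football, soccer).item())    # expected: relatively high
print(F.cosine_similarity(football, footwear).item())  # expected: lower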
In our example, we will create 384-dimensional embeddings stored as a FloatVector. If you don't have one already, create a free account on Zilliz Cloud and create a new collection called "comments" with the following schema:
You can use OpenAI's embedding API to create high-quality embeddings, but it's not the only option. In this tutorial, we will use Hugging Face Transformers, which provides state-of-the-art machine learning for PyTorch, TensorFlow, and JAX. Its Auto Classes can automatically retrieve the relevant model given the name or path of the pretrained weights/config/vocabulary.
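If you prefer to create the collection programmatically instead of through the Zilliz Cloud console, a sketch with pymilvus might look like the following. The field names mirror the Kafka schema used later in this tutorial; the primary key, max_length values, and index/metric choices are assumptions to adapt to your own setup:

# Sketch: create the "comments" collection with a 384-dimensional vector field.
# Endpoint/token are placeholders; field sizes and index settings are assumptions.
from pymilvus import connections, Collection, CollectionSchema, FieldSchema, DataType

connections.connect(uri="https://THE_ID.api.gcp-us-west1.zillizcloud.com", token="THE_TOKEN")

fields = [
    FieldSchema(name="key_id", dtype=DataType.VARCHAR, max_length=64, is_primary=True),
    FieldSchema(name="by", dtype=DataType.VARCHAR, max_length=256),
    FieldSchema(name="id", dtype=DataType.INT64),
    FieldSchema(name="parent", dtype=DataType.INT64),
    FieldSchema(name="time", dtype=DataType.INT64),
    FieldSchema(name="type", dtype=DataType.VARCHAR, max_length=32),
    FieldSchema(name="root_id", dtype=DataType.INT64),
    FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
    FieldSchema(name="doc_embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
]
collection = Collection("comments", CollectionSchema(fields, description="Hacker News comments"))
collection.create_index("doc_embedding", {"index_type": "AUTOINDEX", "metric_type": "L2"})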
from transformers import AutoTokenizer, AutoModel
import torch

VECTOR_DIMENSIONS = 384  # matches the dimension of the doc_embedding field in the collection

tokenizer = AutoTokenizer.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")
model = AutoModel.from_pretrained("sentence-transformers/all-MiniLM-L6-v2")

def hf_document_embed(chunk, tokenizer, model, torch, length=384):
    inputs = tokenizer(
        chunk, padding=True, truncation=True, return_tensors="pt", max_length=length
    )
    with torch.no_grad():
        embed = model(**inputs).last_hidden_state[:, 0].cpu().detach().numpy()
    return embed.flatten()

def run_hn_flow(polling_interval=15):
    ..
    comments = op.map(
        "comment_embeddings",
        comments,
        lambda document: hf_document_embed(
            document, tokenizer, model, torch, length=VECTOR_DIMENSIONS
        ),
    )
Step 5: Write JSON documents with embeddings and other fields to Timeplus Proton
As the last step of the Bytewax data flow, create an output to send the streaming results to a data stream in Timeplus Proton:
import os

def run_hn_flow(polling_interval=15):
    ..
    op.output(
        "comments_out",
        comments,
        ProtonSink("hn_comments_raw", os.environ.get("PROTON_HOST", "127.0.0.1"))
    )
The ProtonSink extends Bytewax's DynamicSink. It runs a CREATE STREAM IF NOT EXISTS DDL statement to create a stream with a single string column if the specified stream doesn't exist, then runs a set of INSERT INTO statements to send the JSON strings into the stream. Please note that the embedding results are NumPy arrays, which are not serializable with Python's default JSONEncoder, so we built our own encoder:
from bytewax.outputs import DynamicSink, StatelessSinkPartition
from proton_driver import client
import json
import numpy as np

class NumpyEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        elif isinstance(obj, bytes):
            return "bytes"
        return json.JSONEncoder.default(self, obj)

class _ProtonSinkPartition(StatelessSinkPartition):
    def __init__(self, stream: str, host: str):
        self.client = client.Client(host=host, port=8463)
        self.stream = stream
        sql = f"CREATE STREAM IF NOT EXISTS `{stream}` (raw string)"
        self.client.execute(sql)

    def write_batch(self, items):
        rows = []
        for item in items:
            json_str = json.dumps(item[0], cls=NumpyEncoder)
            rows.append([json_str])  # single column in each row
        sql = f"INSERT INTO `{self.stream}` (raw) VALUES"
        self.client.execute(sql, rows)

class ProtonSink(DynamicSink):
    def __init__(self, stream: str, host: str):
        self.stream = stream
        self.host = host if host is not None and host != "" else "127.0.0.1"

    def build(self, worker_index, worker_count):
        return _ProtonSinkPartition(self.stream, self.host)
Start the Python script via:
python -m bytewax.run "pipeline:run_hn_flow()"
Then, check the data in Timeplus by launching the Proton client:
proton client -h 127.0.0.1
Run this SQL to check the sample data:
select raw from table(hn_comments_raw) limit 1
The sample output is:
{"key_id": "39997057_0", "by": "jdsully", "id": 39997057, "parent": 39997048, "time": 1712794288, "type": "comment", "root_id": 39996504, "text": "They asked for it and at that time you didnt really need to justify it.", "doc_embedding": [-0.05919811129570007, 0.35973143577575684, -0.049948230385780334, .., -0.14836473762989044]}
Step 6: Filter/Route - Use streaming SQL to filter/transform JSON data and send output to a local Kafka server
The following SQL statements set up a pipeline that reads all Hacker News comments and sends them to a Kafka topic. In the next step, we will set up zilliz-kafka-connect-milvus to forward the live data to Zilliz Cloud.
CREATE EXTERNAL STREAM comments_topic(
    key_id string, by string, id int64, parent int64, time int64, type string,
    root_id int64, text string, doc_embedding array(float32))
SETTINGS type='kafka',
         brokers='localhost:9092',
         topic='comments',
         data_format='JSONEachRow',
         one_message_per_row=true;

CREATE MATERIALIZED VIEW mv INTO comments_topic AS
SELECT
    raw:key_id AS key_id,
    raw:by AS by,
    raw:id::int64 AS id,
    raw:parent::int64 AS parent,
    raw:time::int64 AS time,
    raw:type AS type,
    raw:root_id::int64 AS root_id,
    raw:text AS text,
    cast(raw:doc_embedding, 'array(float32)') AS doc_embedding
FROM hn_comments_raw;
You can even filter data with Timeplus Proton to send only relevant content with embeddings to Zilliz, for example by adding the following WHERE clause to the end of the MATERIALIZED VIEW SQL:
WHERE text ilike '%play%'
Step 7: Forward the data to Zilliz Cloud via Kafka Connect
In the original Bytewax demo app, Milvus Lite is used to run Milvus easily in Python applications. In this example, however, we are integrating with the managed Milvus in the cloud, i.e. Zilliz Cloud.
Timeplus Proton will write transformed data to Kafka topics. The data is then forwarded to Zilliz Cloud via kafka-connect-milvus (https://github.com/zilliztech/kafka-connect-milvus).
Please follow the kafka-connect-milvus docs for detailed installation steps. In short, you need to:
Set up a JVM
Install Apache Kafka with Kafka Connect, such as kafka_2.13-3.6.1
Put the zilliz-kafka-connect-milvus-0.1.1 folder in kafka_2.13-3.6.1/libs/
Edit the milvus-sink-connector.properties file with the proper Milvus/Zilliz endpoint and token, then put it in kafka_2.13-3.6.1/config/:
name=zilliz-kafka-connect-milvus
connector.class=com.milvus.io.kafka.MilvusSinkConnector
public.endpoint=https://THE_ID.api.gcp-us-west1.zillizcloud.com
token=THE_TOKEN
collection.name=comments
topics=comments
Start the Kafka stack by opening a terminal window and changing directory to kafka_2.13-3.6.1, then:
1. Start the ZooKeeper service via bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start the Kafka broker service via bin/kafka-server-start.sh config/server.properties
3. Create a topic via bin/kafka-topics.sh --create --topic comments --bootstrap-server localhost:9092
4. Start the Kafka Connect service via bin/connect-standalone.sh config/connect-standalone.properties config/milvus-sink-connector.properties
Keep the Python script, the Timeplus Proton server, and the Kafka stack all running, and you will see new data being added to the collection in Zilliz Cloud.
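If you want to verify the ingestion from Python rather than from the Zilliz Cloud console, one quick sketch (reusing the same endpoint and token placeholders as in milvus-sink-connector.properties) is to poll the collection's entity count:

# Sketch: watch the entity count of the "comments" collection grow as data arrives.
import time
from pymilvus import connections, Collection

connections.connect(uri="https://THE_ID.api.gcp-us-west1.zillizcloud.com", token="THE_TOKEN")
collection = Collection("comments")

for _ in range(10):
    print(f"entities in 'comments': {collection.num_entities}")
    time.sleep(10)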
Step 8: Build a web application to query real-time data
In this final step, we will build a chatbot or search engine to validate the RAG pipeline. Make sure the question you are going to ask is transformed into an embedding with the same model. In this example, I chose Streamlit, Snowflake's Python framework, to build the web application.
The key code snippet is as follows:
from datetime import datetime

import streamlit as st
import pymilvus

def hf_document_embed(chunk, tokenizer, model, torch, length=384):
    ..

with st.form("my_form"):
    st.image("banner.png")
    text_val = st.text_area("What's going on at Hacker News today?", value="What's new for Play Station?")
    submitted = st.form_submit_button("Go!")
    if submitted:
        pymilvus.connections.connect(uri=configs["public.endpoint"].data, token=configs["token"].data)
        collection = pymilvus.Collection(configs["collection.name"].data)
        embedding = hf_document_embed(
            text_val, tokenizer, model, torch, length=VECTOR_DIMENSIONS
        )
        results = collection.search(data=[embedding], anns_field="doc_embedding", param={'level': 3}, limit=10,
                                    output_fields=["by", "time", "text"], expr='type == "comment"',
                                    consistency_level="Strong")
        with st.container(height=500):
            for r in results[0]:
                matching = r.entity
                time_diff = datetime.now() - datetime.fromtimestamp(matching.get('time'))
                minutes_ago = divmod(time_diff.total_seconds(), 60)[0]
                time_display = datetime.fromtimestamp(matching.get('time'))
                if minutes_ago < 120:
                    time_display = f"{int(minutes_ago)} minutes ago"
                st.markdown(f"{time_display} - {matching.get('by')} 🗣️\n> {matching.get('text')}")
                st.slider("distance:", min_value=0.0, max_value=1.0, value=r.distance, key=r.id,
                          disabled=True, label_visibility="collapsed")
Please note:
We use the pymilvus library to connect to Zilliz Cloud with an API token, and call the collection.search function to ask questions.
The question is turned into an embedding with the same Hugging Face model.
We only show the top 10 most relevant pieces of content from Hacker News. Milvus/Zilliz allows you to specify an expression for metadata filtering ('type == "comment"' in this example) and to return other metadata fields (by, time, and text in this example).
Congratulations! You just built a real-time RAG with top-notch open-source tools including Bytewax, Unstructured, Hugging Face, Timeplus, Apache Kafka, and Zilliz.
For reference, the full source code is available in the examples folder of the Timeplus Proton repo.
By leveraging these open-source tools, you can take your AI application to the next level by connecting it with the real world through real-time data.
Got questions or comments? Share them in the Discussions on https://github.com/timeplus-io/proton or join our community Slack: https://timeplus.com/slack.