A Step-By-Step Guide
Retrieval-Augmented Generation (RAG) is rapidly becoming a cornerstone in the world of artificial intelligence. According to the latest report from Menlo Ventures, RAG now leads with a 51% adoption rate—a remarkable leap from 31% just last year.
But what exactly is RAG, and why is it experiencing such explosive growth?
In this blog, I’ll explain the RAG concept and its immense popularity through a practical example: building an end-to-end question-answering system based on Timeplus knowledge using RAG. Whether you're an AI enthusiast or a developer looking to implement cutting-edge solutions, this walkthrough will help you understand how RAG bridges generative AI and real-time retrieval to deliver exceptional results.
What is Retrieval-Augmented Generation?
Retrieval-Augmented Generation (RAG) is an AI/LLM method that combines searching for relevant information (retrieval) with generating responses (generation). This makes LLM answers more accurate and up-to-date by using real-time data instead of relying only on pre-trained knowledge.
Why is RAG Useful?
Enhanced Accuracy: By grounding responses in retrieved information, RAG reduces the risk of "hallucinations", where the model generates plausible but incorrect answers.
Domain Specificity: It allows the integration of private or specialized knowledge bases, making it ideal for industries like healthcare, finance, and customer support.
Dynamic Knowledge: RAG can incorporate the latest data and updates, overcoming the limitations of static training datasets.
Efficiency: Instead of relying on a massive, all-encompassing model, RAG can use smaller, targeted knowledge sources, reducing computational overhead.
So, let's walk through a real-world example to see how RAG is used in practice.
Question-Answer System Architecture
Our goal is to build an LLM-based question-answering system to answer user questions about Timeplus. Here’s what we’ll build:
Workflow
User Input: A user asks a question about Timeplus.
Text Embedding: The application converts the input question into an embedding using Ollama.
Vector Search: The application uses the embedding to run a vector search on the knowledge database to retrieve relevant information.
Prompt Construction: The application constructs a prompt using the retrieved information.
Response Generation: The application sends the prompt to the LLM, which generates a response based on the retrieved information.
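Before building each piece, here is a toy, self-contained Python sketch of these four steps. A dummy bag-of-letters "embedding" and an in-memory list stand in for Ollama and Timeplus; the rest of this post replaces each stand-in with the real component.
```python
import math

def toy_embedding(text):
    # NOT a real embedding: a tiny bag-of-letters vector, only to illustrate the flow
    vec = [0.0] * 26
    for ch in text.lower():
        if "a" <= ch <= "z":
            vec[ord(ch) - ord("a")] += 1.0
    return vec

def l2_distance(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

# 1. Knowledge "store": (text, vector) pairs; later we use a Timeplus stream instead
docs = [
    "A streaming query runs continuously and emits new results as events arrive.",
    "Timeplus stores events in streams.",
    "A materialized view persists the results of a query.",
]
store = [(d, toy_embedding(d)) for d in docs]

# 2. Text embedding: turn the user question into a vector
question = "what is a streaming query"
q_vec = toy_embedding(question)

# 3. Vector search: pick the documents closest to the question
top_docs = sorted(store, key=lambda item: l2_distance(item[1], q_vec))[:2]

# 4. Prompt construction; response generation would send this prompt to an LLM
prompt = ("Based on following relevant information: "
          + " ".join(d for d, _ in top_docs)
          + " Answer following question : " + question)
print(prompt)
```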
Build a Vector Store of Knowledge using Timeplus
The key component of RAG is a vector database, which stores knowledge in vector format and enables semantic searches based on vector similarity. In this case, we’ll use Timeplus as our vector database.
Steps to Build the Vector Store
Convert all Timeplus knowledge documents from text to vector format.
Save the data into a Timeplus stream.
Here’s the schema for the Timeplus stream:
CREATE STREAM IF NOT EXISTS vector_store (
`name` string,
`id` string DEFAULT to_string(uuid()),
`text` string,
`vector` array(float64),
`metadata` map(string, string)
);
name: Name of the document collection.
id: A generated unique ID.
text: The original document text.
vector: Embedding of the text (an array of float64).
metadata: Additional metadata for filtering and search.
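If you prefer to create the stream from Python instead of a SQL console, here is a minimal sketch using the proton_driver client that the indexing script below also uses (host, port, and credentials are placeholders read from environment variables):
```python
import os
from proton_driver import client

# Connect to Timeplus; connection details come from environment variables
c = client.Client(
    host=os.getenv("TIMEPLUS_HOST", "localhost"),
    port=8463,
    user=os.getenv("TIMEPLUS_USER", "default"),
    password=os.getenv("TIMEPLUS_PASSWORD", ""),
)

# Create the vector store stream with the schema shown above
c.execute("""
CREATE STREAM IF NOT EXISTS vector_store (
    `name` string,
    `id` string DEFAULT to_string(uuid()),
    `text` string,
    `vector` array(float64),
    `metadata` map(string, string)
)
""")
```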
Generating Embeddings with Ollama
We’ll use Ollama to generate embeddings for the documents. Ollama supports running large language models locally on your machine. Here’s the Python code:
import os
from openai import OpenAI

# Point the OpenAI-compatible client at the local Ollama server
client = OpenAI(
    base_url=os.getenv("LLM_BASE_URL"),
    api_key="ollama"  # Ollama ignores the key, but the client requires one
)

def embedding(input):
    # Generate an embedding for the input text using a local Ollama model
    response = client.embeddings.create(
        input=input,
        model="mxbai-embed-large:latest"
    )
    return response.data[0].embedding
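A quick sanity check of the helper, assuming Ollama is running locally with the mxbai-embed-large model pulled and LLM_BASE_URL pointing at its OpenAI-compatible endpoint (for example http://localhost:11434/v1):
```python
# Embed a short text and inspect the result
vec = embedding("what is a streaming query")
print(type(vec), len(vec))  # a plain Python list; mxbai-embed-large returns 1024-dimensional vectors
print(vec[:5])              # first few components of the embedding
```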
Indexing Documents into the Vector Store
The following script reads documents from a local folder, generates embeddings, and inserts the data into the Timeplus stream:
import os
from pathlib import Path
from proton_driver import client

timeplus_host = os.getenv("TIMEPLUS_HOST")
timeplus_user = os.getenv("TIMEPLUS_USER")
timeplus_password = os.getenv("TIMEPLUS_PASSWORD")

# Connect to Timeplus
c = client.Client(
    host=timeplus_host, port=8463, user=timeplus_user, password=timeplus_password
)

def read_files_from_path(folder_path, suffix):
    # Return a list of (file content, file path) tuples for every matching file
    text_content = []
    for file_path in Path(folder_path).glob(f"*.{suffix}"):
        with file_path.open("r", encoding="utf-8") as file:
            text_content.append((file.read(), file_path))
    return text_content

class Indexer:
    def __init__(self, name, path):
        self._doc_path = path
        self._name = name

    def index(self):
        # Read every markdown document, embed it, and insert it into the vector store
        doc_texts = read_files_from_path(self._doc_path, "md")
        for content in doc_texts:
            text = content[0]
            filename = os.path.basename(content[1])
            embedding_vector = embedding(input=text)  # reuses the embedding() helper defined above
            metadata = {"filename": filename}
            c.execute(
                "INSERT INTO vector_store (name, text, vector, metadata) VALUES",
                [[self._name, text, embedding_vector, metadata]]
            )

indexer = Indexer("timeplus_doc", "./path/to/docs")
indexer.index()
In the Python code above, we read the documents one by one, convert each into its vector representation, and insert it into the vector_store stream. The file name is stored as metadata, and every document is grouped under the collection name timeplus_doc.
Now that all the relevant knowledge is indexed into a Timeplus stream, let's see how to run a vector search.
Run A Vector Search
With the vector database populated, we can run a vector search to find the most relevant documents. The similarity between vectors is measured using distance metrics such as:
L2 Distance (Euclidean): Measures the straight-line distance between points.
L1 Distance (Manhattan): Measures the grid-like distance between points.
Cosine Distance: Measures the angle between vectors, focusing on orientation over magnitude.
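To make these metrics concrete, here is a small, self-contained Python sketch that computes all three for a pair of toy vectors (Timeplus exposes the first one as the l2_distance SQL function used below):
```python
import math

def l2_distance(a, b):
    # Euclidean: straight-line distance
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def l1_distance(a, b):
    # Manhattan: sum of absolute component differences
    return sum(abs(x - y) for x, y in zip(a, b))

def cosine_distance(a, b):
    # 1 - cosine similarity: depends on orientation, not magnitude
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return 1.0 - dot / (norm_a * norm_b)

u = [1.0, 2.0, 3.0]
v = [2.0, 2.0, 1.0]
print(l2_distance(u, v), l1_distance(u, v), cosine_distance(u, v))
```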
A vector search simply returns the vectors closest to the input vector, ranked by the distance between the input vector and every other vector in the store that meets the filter conditions. This can be expressed in the following SQL:
SELECT * FROM table(stream)
ORDER BY DistanceFunction(vectors, reference_vector)
LIMIT N;
To run such a search, reference_vector must be the embedding of the text we want to search for, which means the embedding model has to be called while running the SQL.
Timeplus remote UDFs can call external services or tools. By wrapping the embedding model in a remote UDF named embedding, we can run the search SQL above without a separate preprocessing step to turn the question into an embedding.
Here is the sample search SQL:
SELECT
text, metadata, l2_distance(vector, embedding('what is a streaming query')) AS score
FROM
table(vector_store)
ORDER BY
score ASC
LIMIT 3;
In this sample query, the input question is ‘what is a streaming query’, and the three most relevant documents by L2 distance are functions_for_streaming.md, query-syntax.md, and working-with-streams.md.
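The same search can also be run from Python. Here is a minimal sketch that reuses the proton_driver client c and the embedding() helper from the earlier snippets, computing the question embedding client-side and passing it as a query parameter (this assumes proton_driver supports clickhouse-driver-style parameter substitution):
```python
question = "what is a streaming query"
q_vec = embedding(question)  # embed the question on the client instead of via the embedding() UDF

rows = c.execute(
    """
    SELECT text, metadata, l2_distance(vector, %(q_vec)s) AS score
    FROM table(vector_store)
    ORDER BY score ASC
    LIMIT 3
    """,
    {"q_vec": q_vec},
)
for text, metadata, score in rows:
    print(score, metadata)
```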
Construct RAG-Based Prompt
As mentioned earlier, the key idea of RAG is to put relevant context into the prompt. In our case, that means searching for the most relevant documents and including them in the prompt.
Here is the SQL to construct the prompt:
WITH 'what is a streaming query' AS question
SELECT
array_string_concat(array_reduce('group_array', group_array(text))) AS relevant_docs,
concat('Based on following relevant information: ', relevant_docs,' Answer following question : ', question) as prompt
FROM (
SELECT
text, l2_distance(vector, embedding(question)) AS score
FROM
table(vector_store)
ORDER BY
score ASC
LIMIT 3
)
The WITH clause defines the question we are going to ask the LLM.
The subquery in the FROM clause is the vector search that returns the top 3 most relevant documents.
array_string_concat together with array_reduce merges the three document texts into a single string, which serves as the RAG context for the prompt.
concat builds the final prompt, combining the relevant documents and the question.
Running this query, we get a prompt with the relevant context:
Based on following relevant information: # Streaming Processing
Relevant docs top1 …
Relevant docs top2 …
Relevant docs top3 …
Answer following question : what is a streaming query
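If the array functions feel opaque, the prompt construction above is equivalent to this short Python sketch, applied to the rows returned by the search in the previous section (array_string_concat concatenates the texts without a separator by default):
```python
# rows: list of (text, metadata, score) tuples from the vector search
relevant_docs = "".join(text for text, _, _ in rows)
prompt = ("Based on following relevant information: " + relevant_docs
          + " Answer following question : " + question)
print(prompt[:200])  # preview the beginning of the prompt
```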
Generating Responses Using LLMs
In the last step, we send this prompt to the LLM and return the result to the user. Similar to the remote UDF used for embedding, we use another remote UDF called chat, which sends the prompt text to the LLM.
Here is the underlying Python code of the remote chat function:
import os
from openai import OpenAI

# Reuse the local Ollama server through its OpenAI-compatible API
client = OpenAI(
    base_url=os.getenv("LLM_BASE_URL"),
    api_key="ollama"
)

def chat(messages):
    # Send the prompt to the local LLM and return its reply
    response = client.chat.completions.create(
        model="llama3.2:1b",
        messages=[
            {"role": "user", "content": messages},
        ],
        temperature=0
    )
    return response.choices[0].message.content
We are using local Ollama with the llama3.2:1b model (https://ollama.com/library/llama3.2:1b).
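As with embedding(), here is a quick local test of the helper (assuming the llama3.2:1b model has been pulled into Ollama):
```python
# Send a simple prompt to the local model and print the reply
print(chat("In one sentence, what is a streaming query?"))
```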
The final question answer SQL will be:
WITH 'what is a streaming query' AS question
SELECT
array_string_concat(array_reduce('group_array', group_array(text))) AS relevant_docs,
concat('Based on following relevant information: ', relevant_docs,' Answer following question : ', question) as prompt,
chat(prompt) as response
FROM (
SELECT
text, l2_distance(vector, embedding(question)) AS score
FROM
table(vector_store)
ORDER BY
score ASC
LIMIT 3
)
Here is the answer from the llama3.2:1b model:
A streaming query is a type of SQL query that runs continuously on a stream of data, rather than waiting for the entire dataset to be loaded into memory before executing the query.
In traditional relational databases, queries are typically executed against a fixed dataset, and the results are returned in a single pass. However, with Timeplus, you can create streams that continuously scan new events and output new results as they become available.
Streaming queries allow you to analyze data in real-time, without having to wait for the entire dataset to be loaded into memory. This is particularly useful when working with large datasets or streaming data from IoT devices, social media platforms, or other sources.
Some common use cases for streaming queries include:
* Real-time analytics: Analyzing data as it becomes available, such as tracking website traffic or monitoring stock prices.
* Event-driven processing: Processing events in real-time, such as sending notifications when a new record is inserted into a database.
* IoT data analysis: Analyzing sensor data from IoT devices to gain insights into the behavior of the device.
To create a streaming query with Timeplus, you can specify the stream name and the type of query you want to execute. For example:
```sql
SELECT count(*) FROM stream1 WHERE timestamp > '2022-01-01 00:00:00';
```
This would run a query on the `stream1` stream, counting the number of rows where the `timestamp` field is greater than January 1st, 2022.
Streaming queries can be used with various Timeplus functions, such as:
* `table`: Create or select from a stream.
* `ingest`: Ingest data into a stream.
* `query`: Run a query on a stream.
* `history`: Get the history of a stream.
* `purge`: Purge old records from a stream.
By using streaming queries, you can gain real-time insights into your data and make more informed decisions.
So far, we have successfully built a question-answering system, and running it takes just one SQL query.
Summary
This step-by-step guide demonstrates how to build a RAG-based question-answering system using Ollama and Timeplus. By combining retrieval and generation, you can create AI applications that are both accurate and dynamic, leveraging real-time data to deliver meaningful results. Explore the power of RAG in your own projects today!
The complete code can be found at https://github.com/timeplus-io/examples/tree/main/rag_question_and_answer_system
To see a demo, check out this video: https://youtu.be/IgSjFKW2o54