How to Build LLM Applications With Pgvector Vector Store in LangChain
LangChain and Pgvector: Up and Running
LangChain is one of the most popular frameworks for building applications with large language models (LLMs). This blog post is a guide to building LLM applications with the LangChain framework in Python. We will use PostgreSQL and pgvector as a vector database for OpenAI embeddings of data.
We'll use the example of creating a chatbot to answer questions about the blog posts from the Timescale blog. This example will illustrate the following concepts:
- How to prepare your documents for insertion into PostgreSQL and pgvector using LangChain document transformer TextSplitter.
- How to create embeddings from your data using the OpenAI embeddings model and insert them into PostgreSQL and pgvector.
- How to use embeddings retrieved from a vector database to augment LLM generation.
Along the way, you'll see the full workflow for building an LLM application: splitting and loading a CSV file, creating embeddings, storing them in a vector database, performing similarity searches, and using retrieval-augmented generation.
This is a great first step toward more advanced LangChain projects in Python, like creating a chatbot for your company documentation or an application that answers questions from uploaded PDFs.
Let's get started!
Setup and Configuration
- Sign up for an OpenAI Developer Account and create an API Key. See OpenAI's developer platform.
- Install Python.
- Install and configure a Python virtual environment. We recommend pyenv.
- Install the requirements for this notebook using the following command:
pip install -r requirements.txt
Or, if you already have LangChain installed, run pip install --upgrade langchain.
import os
# Run export OPENAI_API_KEY=sk-YOUR_OPENAI_API_KEY...
# Get the OpenAI API key by reading the local .env file
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())
OPENAI_API_KEY = os.environ['OPENAI_API_KEY']
Next, we need a way for LangChain to interact with PostgreSQL and pgvector. This is achieved by importing the PGVector class from the langchain.vectorstores
package as follows.
from langchain.vectorstores.pgvector import PGVector
Next, we'll construct our connection string for LangChain to connect to our PostgreSQL database.
Because LangChain uses SQLAlchemy to connect to SQL databases like PostgreSQL, we need to create our connection string programmatically, reading each of the string's components (host, database name, password, port, etc.) from our environment variables.
In this example, we'll use a PostgreSQL database with pgvector installed and hosted on Timescale. You can create your own cloud PostgreSQL database in minutes using this link to follow along.
If you're using a Timescale database, you can find all this information in the "Cheat Sheet" file you download when creating your new database service. Alternatively, you can also use a local PostgreSQL database if you prefer.
# Build the PGVector Connection String from params
# Found in the credential cheat-sheet or "Connection Info" in the Timescale console
# In terminal, run: export VAR_NAME=value for each of the values below
host = os.environ['TIMESCALE_HOST']
port = os.environ['TIMESCALE_PORT']
user = os.environ['TIMESCALE_USER']
password = os.environ['TIMESCALE_PASSWORD']
dbname = os.environ['TIMESCALE_DBNAME']
# We use postgresql rather than postgres in the conn string since LangChain uses sqlalchemy under the hood
# You can remove the ?sslmode=require if you have a local PostgreSQL instance running without SSL
CONNECTION_STRING = f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{dbname}?sslmode=require"
Ensure you have the pgvector extension installed in your database. You can install it by running the following code. See section 2.1 here for how to install with psycopg2 and Python if you prefer.
CREATE EXTENSION IF NOT EXISTS vector;
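If you'd rather enable the extension from Python than from a SQL client, here's a minimal sketch using psycopg2 and the connection parameters we read from the environment above (drop sslmode='require' for a local instance without SSL):
# Enable the pgvector extension from Python using psycopg2
import psycopg2

conn = psycopg2.connect(host=host, port=port, user=user,
                        password=password, dbname=dbname, sslmode='require')
with conn:  # commits the transaction on success
    with conn.cursor() as cur:
        cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
conn.close()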
Part 1: Use LangChain to split a CSV file into smaller chunks while preserving associated metadata
In this section, we will parse our CSV file into smaller chunks for similarity search and retrieval, with help from LangChain's TokenTextSplitter.
First, let's take a look at the CSV file we'll be working with:
import pandas as pd
import numpy as np
df = pd.read_csv('blog_posts_data.csv')
df.head()
| | title | content | url |
|---|---|---|---|
| 0 | How to Build a Weather Station With Elixir, Ne... | This is an installment of our “Community Membe... | https://www.timescale.com/blog/how-to-build-a-... |
| 1 | CloudQuery on Using PostgreSQL for Cloud Asset... | This is an installment of our “Community Membe... | https://www.timescale.com/blog/cloudquery-on-u... |
| 2 | How a Data Scientist Is Building a Time-Series... | This is an installment of our “Community Membe... | https://www.timescale.com/blog/how-a-data-scie... |
| 3 | How Conserv Safeguards History: Building an En... | This is an installment of our “Community Membe... | https://www.timescale.com/blog/how-conserv-saf... |
| 4 | How Messari Uses Data to Open the Cryptoeconom... | This is an installment of our “Community Membe... | https://www.timescale.com/blog/how-messari-use... |
As shown above, this is a CSV file of blog posts about Timescale use cases, in which the developers behind each project explain their data infrastructure goals, describe how they used Timescale to achieve them, and share tips for success.
Ordinarily, we would use the LangChain CSVLoader to load the contents of a CSV file. But in this case, we need to preprocess the content column of our CSV to prepare for creating embeddings of each blog post within the token limits of the OpenAI embeddings API.
We also need a way to split the text of the content column of the CSV while retaining the associated metadata with that text (i.e., the blog title and URL).
LangChain has several built-in document transformers that make it easy to split, combine, filter, and otherwise manipulate documents.
We'll use LangChain's Token Text Splitter to help us split up the content column of our CSV into chunks of a specified token amount. Alternatively, you can use the Recursive Character Text Splitter if you'd rather split text by the number of characters rather than tokens.
We will split the text into chunks of around 512 tokens, with a 20% overlap (103 tokens).
import tiktoken
from langchain.text_splitter import TokenTextSplitter
# Split text into chunks of 512 tokens, with 20% token overlap
text_splitter = TokenTextSplitter(chunk_size=512, chunk_overlap=103)
Here’s how we’ll split up the chunks:
# Helper func: calculate the number of tokens in a text string
def num_tokens_from_string(string: str, encoding_name: str = "cl100k_base") -> int:
    if not string:
        return 0
    # Returns the number of tokens in a text string
    encoding = tiktoken.get_encoding(encoding_name)
    num_tokens = len(encoding.encode(string))
    return num_tokens
# List for smaller chunked text and metadata
new_list = []
# Create a new list by splitting up text into token sizes of around 512 tokens
for i in range(len(df.index)):
    text = df['content'][i]
    token_len = num_tokens_from_string(text)
    if token_len <= 512:
        new_list.append([df['title'][i],
                         df['content'][i],
                         df['url'][i]])
    else:
        # Split the text into chunks using the text splitter
        split_text = text_splitter.split_text(text)
        for j in range(len(split_text)):
            new_list.append([df['title'][i],
                             split_text[j],
                             df['url'][i]])
Let's take a look at how the content looks after being split:
df_new = pd.DataFrame(new_list, columns=['title', 'content', 'url'])
df_new.head()
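As an optional sanity check, you can reuse the num_tokens_from_string helper to confirm the new chunks sit around the 512-token target. Note that the counts are approximate, since the splitter and the helper may use slightly different encodings:
# Optional: check the token counts of the chunked content
chunk_token_counts = [num_tokens_from_string(text) for text in df_new['content']]
print(f"Number of chunks: {len(chunk_token_counts)}")
print(f"Largest chunk (tokens): {max(chunk_token_counts)}")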
Part 2: Insert OpenAI embeddings into PostgreSQL and Pgvector
Now that our original CSV has been split into smaller chunks and the associated metadata preserved, we will use the LangChain Pandas DataFrame Loader to load data from our new Pandas data frame and insert it into our PostgreSQL database with pgvector installed.
Note that we must specify which column in the data frame contains the text we'll create embeddings for.
# Load documents from the Pandas DataFrame for insertion into the database
from langchain.document_loaders import DataFrameLoader
# page_content_column is the column name in the DataFrame to create embeddings for
loader = DataFrameLoader(df_new, page_content_column='content')
docs = loader.load()
We'll use the OpenAI embeddings model for our documents, so let's import the OpenAIEmbeddings
module from the langchain.embeddings
package and create an instance.
This instance can be used to generate embeddings for text data using the OpenAI API.
from langchain.embeddings import OpenAIEmbeddings
embeddings = OpenAIEmbeddings()
Before we create embeddings for all the data in our data frame, let's briefly overview how creating an embedding works.
Here's how we create an embedding for a string:
# Create OpenAI embedding using LangChain's OpenAIEmbeddings class
query_string = "PostgreSQL is my favorite database"
embed = embeddings.embed_query(query_string)
print(len(embed)) # Should be 1536, the dimensionality of OpenAI embeddings
print(embed[:5]) # Should be a list of floats
For the main event, we'll connect to our PostgreSQL database and store the documents we loaded along with their embeddings.
Thanks to LangChain, creating the embeddings and storing the data in our PostgreSQL database is a one-command operation!
We pass in the following arguments:
- documents: The documents we loaded from the Pandas DataFrame.
- embedding: Our instance of the OpenAI embeddings class, the model we'll use to create the embeddings.
- collection_name: The name of the table we want our embeddings and metadata to live in.
- distance_strategy: The distance strategy we want to use to calculate the distance between vectors; in our case, we'll use cosine distance.
- connection_string: The connection string to our PostgreSQL database, which we constructed in the setup section.
# Create a PGVector instance to house the documents and embeddings
from langchain.vectorstores.pgvector import DistanceStrategy
db = PGVector.from_documents(
    documents=docs,
    embedding=embeddings,
    collection_name="blog_posts",
    distance_strategy=DistanceStrategy.COSINE,
    connection_string=CONNECTION_STRING)
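If you later want to append more documents to the same collection without re-creating it, you can reuse the db instance. Here's a minimal sketch; more_docs is a hypothetical list of additional LangChain documents created purely for illustration:
# Append additional documents (and their embeddings) to the existing collection
from langchain.schema import Document

# Hypothetical extra document, for illustration only
more_docs = [Document(page_content="Example blog post text",
                      metadata={"title": "Example title", "url": "https://example.com"})]
db.add_documents(more_docs)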
Now that our data is in the database, let's perform a similarity search to fetch the documents most similar to a query:
from langchain.schema import Document
# Query for which we want to find semantically similar documents
query = "Tell me about how Edeva uses Timescale?"
# Fetch the k=3 most similar documents
docs = db.similarity_search(query, k=3)
The query on our database returns a list of LangChain documents, so let's learn how to interact with those:
# Interact with a document returned from the similarity search on pgvector
doc = docs[0]
# Access the document's content
doc_content = doc.page_content
# Access the document's metadata object
doc_metadata = doc.metadata
print("Content snippet:" + doc_content[:500])
print("Document title: " + doc_metadata['title'])
print("Document url: " + doc_metadata['url'])
Content snippet: map applications. If you are planning to store time-series data, Timescale is the way to go. It makes it easy to get started because it is “just” SQL, and at the same time, you get the important features needed to work with time-series data. I recommend you have a look, especially at continuous aggregations. Think about the whole lifecycle when you start. Will your use cases allow you to use features like compression, or do you need to think about how to store long-term data outside of TimescaleDB
Document title: How Edeva Uses Continuous Aggregations and IoT to Build Smarter Cities
Document url: https://www.timescale.com/blog/how-edeva-uses-continuous-aggregations-and-iot-to-build-smarter-cities/
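If you also want to see how close each match is, you can fetch the documents along with their distance scores. Here's a minimal sketch; with cosine distance, lower scores mean closer matches:
# Fetch the most similar documents along with their distance scores
docs_with_scores = db.similarity_search_with_score(query, k=3)
for doc, score in docs_with_scores:
    print(f"{score:.4f}  {doc.metadata['title']}")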
Part 3: Question answering with retrieval-augmented generation
Next, let's tie everything we've learned together and build a simple question-answering application using LangChain and an LLM from OpenAI. Our application will answer user questions, using the most relevant documents from our database as context.
This technique is called retrieval-augmented generation (RAG) and works as follows:
- Create an embedding vector for the user question.
- Use pgvector to perform a vector similarity search and retrieve the k nearest neighbors to the question embedding from our database of embedding vectors representing the blog content. In our example, we’ll use k=3, finding the three most similar embedding vectors and associated content.
- Supply the content retrieved from the database as additional context to the model and ask it to perform a completion task to answer the user question.
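To make these steps concrete, here's a minimal manual sketch of the same pattern, reusing the db vector store from Part 2. The prompt wording is illustrative only, and the retriever and RetrievalQA chain we build next wrap this exact logic for us:
# Manual RAG sketch: retrieve context from pgvector, then ask the LLM with that context
from langchain.chat_models import ChatOpenAI

question = "How does Edeva use continuous aggregates?"

# Steps 1 and 2: embed the question and fetch the three most similar chunks
# (similarity_search embeds the query under the hood)
context_docs = db.similarity_search(question, k=3)
context = "\n\n".join(doc.page_content for doc in context_docs)

# Step 3: supply the retrieved content as context and ask the model to answer
prompt = (
    "Answer the question using only the context below.\n\n"
    f"Context:\n{context}\n\n"
    f"Question: {question}"
)
llm = ChatOpenAI(temperature=0.0, model='gpt-3.5-turbo-16k')
print(llm.predict(prompt))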
To more easily retrieve documents from our PostgreSQL vector database, we'll use a LangChain retriever.
In LangChain, a retriever is an interface that returns documents given an unstructured query. A retriever's main purpose is only to return (or retrieve) documents.
We will use a vector store-backed retriever. It is a lightweight wrapper around the Vector Store class to make it conform to the retriever interface. It uses the search methods implemented by a vector store, like similarity search, to query the texts in the vector store.
# Create retriever from database
# We specify the number of results we want to retrieve (k=3)
retriever = db.as_retriever(
search_kwargs={"k": 3}
)
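You can also try the retriever on its own before wiring it into a chain; it simply returns LangChain documents for a query. A quick sketch:
# Quick check: the retriever returns the k=3 most relevant documents for a query
relevant_docs = retriever.get_relevant_documents("How does Edeva use continuous aggregates?")
for doc in relevant_docs:
    print(doc.metadata['title'])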
Next, we'll import the LLM we want to use to generate a response to our question. In this case, we'll use OpenAI's GPT-3.5 model with a 16k token context window so that we won't have any trouble fitting in retrieved documents as context in addition to the user question.
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(temperature = 0.0, model = 'gpt-3.5-turbo-16k')
Then, we'll use one of the most useful chains in LangChain, the Retrieval Q+A chain, which is used for question answering over a vector database (vector store or index, as it’s also known).
We'll combine it with a stuff chain, which takes a list of documents, inserts them all into a prompt (stuffs them in), and passes that prompt to an LLM.
from langchain.chains import RetrievalQA
qa_stuff = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
verbose=True,
)
And for the final ingredient, let's formulate a question we want the model to answer with the help of the documents in our database and pass it to our chain to process.
query = "How does Edeva use continuous aggregates?"
response = qa_stuff.run(query)
from IPython.display import Markdown, display
display(Markdown(response))
Edeva uses continuous aggregates in their smart city platform, EdevaLive. They collect large amounts of data from IoT devices, including traffic flow data from their dynamic speed bump called Actibump. Continuous aggregates allow them to roll up multiple resolutions of their sensor account data and people count data, making it available in a more efficient way. This helps them analyze and visualize the data faster, enabling them to provide valuable remote monitoring services and statistics to their customers. They also use continuous aggregates to roll up high-resolution data to lower resolutions, optimizing their data processing.
Bonus: Cite Your Sources With LangChain and Pgvector for RAG
For even more advanced functionality, you might want your answer to include the sources used to give users peace of mind. Here's how you can do that with the RetrievalQA chain using the return_source_documents
argument:
# New chain to return context and sources
qa_stuff_with_sources = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
verbose=True,
)
query = "How does Edeva use continuous aggregates?"
# To run the query, we use a different syntax since we're returning more than just the response text
responses = qa_stuff_with_sources({"query": query})
And finally, let's print out the result with the source document cited:
source_documents = responses["source_documents"]
source_content = [doc.page_content for doc in source_documents]
source_metadata = [doc.metadata for doc in source_documents]
# Construct a single string with the LLM output and the source titles and urls
def construct_result_with_sources():
    result = responses['result']
    result += "\n\n"
    result += "Sources used:"
    for i in range(len(source_content)):
        result += "\n\n"
        result += source_metadata[i]['title']
        result += "\n\n"
        result += source_metadata[i]['url']
    return result
display(Markdown(construct_result_with_sources()))
Edeva uses continuous aggregates in their smart city platform, EdevaLive. They collect large amounts of data from IoT devices, including traffic flow data from their dynamic speed bump called Actibump. Continuous aggregates allow them to roll up multiple resolutions of their sensor account data and people count data, making it available in a more efficient way. This helps them analyze and visualize the data faster, enabling them to provide valuable remote monitoring services and statistics to their customers. They also use continuous aggregates to roll up high-resolution data to lower resolutions, optimizing their data processing.

Sources used:

How Edeva Uses Continuous Aggregations and IoT to Build Smarter Cities

How Density Manages Large Real Estate Portfolios Using TimescaleDB

https://www.timescale.com/blog/density-measures-large-real-estate-portfolios-using-timescaledb/

How Edeva Uses Continuous Aggregations and IoT to Build Smarter Cities
The “cite your sources” functionality is helpful because it can help explain unexpected responses from the model should it retrieve irrelevant but similar documents.
Next Steps
- Check out Conversational Retrieval QA Chain for adding memory and using what you learned in a chatbot conversation setting.
- Learn how pgvector finds approximate nearest neighbors in this blog post: What Are ivfflat Indexes in pgvector and How Do They Work.
- Use Chainlit to build your own LLM chatbot in Python (Timescale tutorial coming soon!).
And if you’re looking for a production PostgreSQL database for your vector workloads, try Timescale Cloud. It’s free for 30 days, no credit card required.