Oct 01, 2024
Many developers mistakenly view RAG applications as simple semantic searches over vector databases that augment large language models (LLMs). However, modern AI apps need a more layered approach to address embedding model limitations.
For example, answering the question "What were the top five most discussed GitHub issues last month related to performance optimization?" requires more than just similarity search: it needs time-based filtering, aggregation over discussion activity, and ranking on top of semantic relevance. Embedding search alone won't cut it for a good RAG (retrieval-augmented generation) system.
The good news is that with PostgreSQL and Timescale, we get vector search, time-series capabilities, and all of the flexibility of SQL in a single database. By blending Timescale with LLMs, we can combine semantic search, time-series analysis, and arbitrary SQL behind a single question-answering interface.
In this post, we'll do four things: build a data-processing pipeline for GitHub issues, use evaluation-driven development to test tool selection before implementation, implement embedding search with pgvector and pgvectorscale, and add a text-to-SQL tool as a catch-all for more complex queries.
To do this, we'll ground our conversation in an example application that allows us to answer questions about GitHub issues. To develop the GitHub issues Q&A app, we'll leverage PostgreSQL and Timescale and implement parallel tool calling and text-to-SQL capabilities to illustrate just how far beyond vector search RAG can go.
And if you'd like to skip to the code, you can find all code snippets used in the blog in the companion GitHub repo.
The most impactful AI applications aren't necessarily the most complex and agentic—they're the ones built on a deep understanding of user requirements. When building new AI applications, our goal should be to explore simple workflows and specialized tooling before diving into complex reasoning and planning systems.
Often, the push for complex systems in product development stems from a reluctance to deeply understand user needs. By truly grasping these needs, you can create simple, effective solutions rather than relying on unreliable complex agents. This approach prevents the risk of ending up with an impressive demo but disappointed customers.
We should always ask ourselves two questions when approaching a new dataset: what questions will our users want answered, and what additional structure or metadata do we need to answer them well?
Note: before proceeding, make sure you've set the DB_URL environment variable inside your shell. If you don't have a Timescale instance configured, you can create one in our Getting Started docs.
In the case of GitHub issues, we might recognize that we want additional functionality beyond what exists in the raw dataset. We might care about things like how long issues take to be resolved, whether they are currently open or closed, and concise summaries of what each issue is about.
These can be user-centric features and indices that we can use to significantly improve the system's ability to answer the questions we care about.
Data processing adds functionality beyond what's available in the standard dataset. Ultimately, this improves our ability to answer more complex questions that users might pose. Let's walk through building a custom data pipeline for GitHub issues.
Here's an overview of what we'll be working on during this section:
- Defining Pydantic models to structure our raw and processed GitHub issue data.
- Using the pgvectorscale extension with pgvector to efficiently store our processed data, setting up appropriate indexes for fast querying and analysis.
As a reminder, you can find all the code for this work on our dedicated GitHub repo.
First, let's install the necessary dependencies:
pip install instructor openai tqdm pydantic datasets pgvector asyncpg Jinja2 fuzzywuzzy python-Levenshtein
We'll use Pydantic models to ensure type safety. To do so, we'll define two Pydantic classes: ProcessedIssue, which represents a generated summary, and GithubIssue, which represents the raw data we extract from the dataset.
from pydantic import BaseModel
from typing import Literal, Any, Optional
from datetime import datetime
class ClassifiedSummary(BaseModel):
chain_of_thought: str
label: Literal["OPEN", "CLOSED"]
summary: str
class ProcessedIssue(BaseModel):
issue_id: int
text: str
label: Literal["OPEN", "CLOSED"]
repo_name: str
embedding: Optional[list[float]]
class GithubIssue(BaseModel):
issue_id: int
metadata: dict[str, Any]
text: str
repo_name: str
start_ts: datetime
end_ts: Optional[datetime]
embedding: Optional[list[float]]
Let's grab some GitHub issues to work with. We'll use the bigcode/the-stack-github-issues dataset and the datasets library to make our lives easier.
Here's what we're doing:
- Streaming the dataset and filtering it down to just the repositories we care about.
- Walking each issue's event list to record when it was opened and closed.
- Using the take function to snag a subset of issues. This lets us work with a significantly smaller slice of the dataset, allowing us to iterate faster and do more experiments.
from datasets import load_dataset
def get_issues(n: int, repos: list[str]):
dataset = (
load_dataset("bigcode/the-stack-github-issues", split="train", streaming=True)
.filter(lambda x: x["repo"] in repos)
.take(n)
)
for row in dataset:
start_time = None
end_time = None
for event in row["events"]:
event_type = event["action"]
timestamp = event["datetime"]
timestamp = timestamp.replace("Z", "+00:00")
if event_type == "opened":
start_time = datetime.fromisoformat(timestamp)
elif event_type == "closed":
end_time = datetime.fromisoformat(timestamp)
            # Fallback: some issues have no explicit 'opened' event
elif event_type == "created" and not start_time:
start_time = datetime.fromisoformat(timestamp)
elif event_type == "reopened" and not start_time:
start_time = datetime.fromisoformat(timestamp)
yield GithubIssue(
issue_id=row["issue_id"],
metadata={},
text=row["content"],
repo_name=row["repo"],
start_ts=start_time,
end_ts=end_time,
embedding=None,
)
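As a quick sanity check, we can pull a small sample of issues and inspect one of them (the repo here is just an illustrative choice):
sample_issues = list(get_issues(10, ["rust-lang/rust"]))
print(len(sample_issues), sample_issues[0].repo_name, sample_issues[0].start_ts)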
We can use Python's async functionality and the instructor library to quickly process issues in parallel. Instead of waiting for each task to finish, we can work on multiple issues simultaneously.
Better yet, to ensure we stay within a reasonable rate limit, we can also use a Semaphore to control the number of concurrent tasks being executed.
from asyncio import run, Semaphore
from tqdm.asyncio import tqdm_asyncio as asyncio
from textwrap import dedent
from instructor import from_openai
from openai import AsyncOpenAI
from jinja2 import Template
async def batch_classify_issue(
batch: list[GithubIssue], max_concurrent_requests: int = 20
) -> list[ProcessedIssue]:
async def classify_issue(issue: GithubIssue, semaphore: Semaphore):
client = from_openai(AsyncOpenAI())
async with semaphore:
classification = await client.chat.completions.create(
response_model=ClassifiedSummary,
messages=[
{
"role": "system",
"content": "You are a helpful assistant that classifies and summarizes GitHub issues. When summarizing the issues, make sure to expand on specific accronyms and add additional explanation where necessary.",
},
{
"role": "user",
"content": Template(
dedent(
"""
Repo Name: {{ repo_name }}
Issue Text: {{ issue_text}}
"""
)
).render(repo_name=issue.repo_name, issue_text=issue.text),
},
],
model="gpt-4o-mini",
)
return ProcessedIssue(
issue_id=issue.issue_id,
repo_name=issue.repo_name,
text=classification.summary,
label=classification.label,
embedding=None,
)
semaphore = Semaphore(max_concurrent_requests)
coros = [classify_issue(item, semaphore) for item in batch]
results = await asyncio.gather(*coros)
return results
We'll also define a function to process our embeddings simultaneously. These will be useful for performing similarity search across our different issues using pgvector and pgvectorscale in a later section.
from openai import AsyncOpenAI
async def batch_embeddings(
data: list[ProcessedIssue],
max_concurrent_calls: int = 20,
) -> list[ProcessedIssue]:
oai = AsyncOpenAI()
async def embed_row(
item: ProcessedIssue,
semaphore: Semaphore,
):
async with semaphore:
input_text = item.text if len(item.text) < 8000 else item.text[:6000]
embedding = (
(
await oai.embeddings.create(
input=input_text, model="text-embedding-3-small"
)
)
.data[0]
.embedding
)
item.embedding = embedding
return item
semaphore = Semaphore(max_concurrent_calls)
coros = [embed_row(item, semaphore) for item in data]
results = await asyncio.gather(*coros)
return results
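Chaining the two helpers together is then a matter of two awaits. Here's a minimal sketch, assuming issues is a list of GithubIssue objects produced by get_issues (the ingestion functions below do exactly this):
# Summarize a batch of issues, then attach an embedding to each summary
summaries = await batch_classify_issue(issues)
embedded_summaries = await batch_embeddings(summaries)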
Now that we've figured out how to process and embed our summaries at scale, we can work on loading the data into Timescale. We're using asyncpg, which lets us batch our insertions with its executemany function.
We can manage all our embeddings with pgvectorscale and pgvector natively within Timescale. This allows us to enjoy up to 28x lower p95 latency, 16x higher query throughput, and a 75% cost reduction compared to Pinecone in our latest benchmark.
All we need to do is enable the pgvectorscale extension, which sets up both pgvector and pgvectorscale in our Timescale project. Once we've done so, we can create tables for our embeddings and index them for optimal performance.
Let's see how we can do so in the following section.
import os
from pgvector.asyncpg import register_vector
import asyncpg
init_sql = """
CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;
DROP TABLE IF EXISTS github_issue_summaries CASCADE;
DROP TABLE IF EXISTS github_issues CASCADE;
DROP TYPE IF EXISTS issue_label CASCADE;
-- The summaries table labels each issue as OPEN or CLOSED, so create the enum type it needs
CREATE TYPE issue_label AS ENUM ('OPEN', 'CLOSED');
CREATE TABLE IF NOT EXISTS github_issues (
issue_id INTEGER,
metadata JSONB,
text TEXT,
repo_name TEXT,
start_ts TIMESTAMPTZ NOT NULL,
end_ts TIMESTAMPTZ,
embedding VECTOR(1536) NOT NULL
);
CREATE INDEX github_issue_embedding_idx
ON github_issues
USING diskann (embedding);
-- Create a hypertable partitioned into 1-month chunks on start_ts
SELECT create_hypertable('github_issues', 'start_ts', chunk_time_interval => INTERVAL '1 month');
CREATE UNIQUE INDEX ON github_issues (issue_id, start_ts);
CREATE TABLE github_issue_summaries (
issue_id INTEGER,
text TEXT,
label issue_label NOT NULL,
repo_name TEXT,
embedding VECTOR(1536) NOT NULL
);
CREATE INDEX github_issue_summaries_embedding_idx
ON github_issue_summaries
USING diskann (embedding);
"""
async def get_conn():
conn = await asyncpg.connect(os.getenv("DB_URL"))
await register_vector(conn)
return conn
conn = await get_conn()
await conn.execute(init_sql)
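Because github_issues is a hypertable partitioned on start_ts, time-based questions become plain SQL once data is loaded. For instance, here's a sketch that counts issues opened per month for a single repo using TimescaleDB's time_bucket function:
# Count issues opened per month for one repo (the repo name is just an example)
monthly_counts = await conn.fetch(
    """
    SELECT time_bucket('1 month', start_ts) AS month, COUNT(*) AS issues_opened
    FROM github_issues
    WHERE repo_name = $1
    GROUP BY month
    ORDER BY month
    """,
    "kubernetes/kubernetes",
)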
With our GitHub Issue and Issue Summary tables in place, let's create two functions to populate our database with the relevant information.
import json
async def insert_github_issue_summaries(conn, issues: list[GithubIssue]):
insert_query = """
INSERT INTO github_issue_summaries (issue_id, text, label, embedding,repo_name)
VALUES ($1, $2, $3, $4, $5)
"""
summarized_issues = await batch_classify_issue(issues)
embedded_summaries = await batch_embeddings(summarized_issues)
await conn.executemany(
insert_query,
[
(item.issue_id, item.text, item.label, item.embedding, item.repo_name)
for item in embedded_summaries
],
)
print("GitHub issue summaries inserted successfully.")
async def insert_github_issues(conn, issues: list[GithubIssue]):
insert_query = """
INSERT INTO github_issues (issue_id, metadata, text, repo_name, start_ts, end_ts, embedding)
VALUES ($1, $2, $3, $4, $5, $6, $7)
"""
embedded_issues = await batch_embeddings(issues)
await conn.executemany(
insert_query,
[
(
item.issue_id,
json.dumps(item.metadata),
item.text,
item.repo_name,
item.start_ts,
item.end_ts,
item.embedding,
)
for item in embedded_issues
],
)
print("GitHub issues inserted successfully.")
We can combine our previous functions into a single process_issues function to ingest GitHub issue data into our database:
async def process_issues():
repos = [
"rust-lang/rust",
"kubernetes/kubernetes",
"apache/spark",
]
conn = await get_conn()
issues = list(get_issues(100,repos))
await insert_github_issues(conn,issues)
await insert_github_issue_summaries(conn,issues)
await process_issues()
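A quick way to confirm the ingest worked is to count the rows in both tables:
# Sanity check: both tables should now contain rows
conn = await get_conn()
print(await conn.fetchval("SELECT COUNT(*) FROM github_issues"))
print(await conn.fetchval("SELECT COUNT(*) FROM github_issue_summaries"))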
We've now created a powerful pipeline that can process our GitHub issue data to extract valuable insights. With that in mind, let’s shift our focus to developing specialized tooling for customer needs using evaluation-driven development.
While we develop the tables and indices we might use to build out the RAG application, we can also engage in eval-driven development and test our language model's ability to choose the right tools before implementing the specific tools.
Here, we can be very creative in expressing the tools we want to give to the language model.
In Python, Pydantic schemas are great for prototyping agent tools because they create a clear contract for your agent's actions. This contract makes it easy to evaluate the performance and impact of more complex tooling before moving on to implementation.
Let's explore how we can implement this using instructor, where we have an agent with three tools, as seen below.
Here's how to define these tools using Pydantic models:
from pydantic import BaseModel, Field
class SearchIssues(BaseModel):
"""
Use this when the user wants to get original issue information from the database
"""
query: Optional[str]
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
class RunSQLReturnPandas(BaseModel):
"""
Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
"""
query: str = Field(description="Description of user's query")
repos: list[str] = Field(
description="the repos to run the query on, should be in the format of 'owner/repo'"
)
class SearchSummaries(BaseModel):
"""
    This function retrieves summarized information about GitHub issues that match or are similar to a specific query. It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
"""
query: Optional[str] = Field(description="Relevant user query if any")
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
We can test the model's ability to choose the appropriate tool(s) for a given query using the implementation below with instructor.
from typing import Iterable, Union
def one_step_agent(question: str):
import instructor
import openai
client = instructor.from_openai(
openai.OpenAI(), mode=instructor.Mode.PARALLEL_TOOLS
)
return client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are an AI assistant that helps users query and analyze GitHub issues stored in a PostgreSQL database. Search for summaries when the user wants to understand the trends or patterns within a project. Otherwise just get the issues and return them. Only resort to SQL queries if the other tools are not able to answer the user's query.",
},
{"role": "user", "content": question},
],
response_model=Iterable[
Union[
RunSQLReturnPandas,
SearchIssues,
SearchSummaries,
]
],
)
Testing this agent is simple with the following loop.
Since we expect the agent to call only a single tool for these simple queries, we can verify that it correctly identifies and chooses the appropriate tool for each task. As our test suite expands, we'll likely need to transition to the async client for improved efficiency.
tests = [
[
"What is the average time to first response for issues in the azure repository over the last 6 months? Has this metric improved or worsened?",
[RunSQLReturnPandas],
],
[
"How many issues mentioned issues with Cohere in the 'vercel/next.js' repository in the last 6 months?",
[SearchIssues],
],
[
"What were some of the big features that were implemented in the last 4 months for the scipy repo that addressed some previously open issues?",
[SearchSummaries],
],
]
for query, expected_result in tests:
response = one_step_agent(query)
for expected_call, agent_call in zip(expected_result, response):
assert isinstance(agent_call, expected_call)
By doing so, we can build unit-test-like cases for tool selection, drawing a direct parallel between eval-based testing and unit tests, all without coupling our evaluations to the tool implementations.
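If we want these checks to live in a proper test suite, the same loop translates directly into a parametrized test. Here's a sketch assuming pytest is installed, with the queries and expected tools copied from the list above:
import pytest

@pytest.mark.parametrize(
    "query, expected_tools",
    [
        (
            "How many issues mentioned issues with Cohere in the 'vercel/next.js' repository in the last 6 months?",
            [SearchIssues],
        ),
        (
            "What were some of the big features that were implemented in the last 4 months for the scipy repo that addressed some previously open issues?",
            [SearchSummaries],
        ),
    ],
)
def test_tool_selection(query, expected_tools):
    # The agent should pick exactly the tool(s) we expect for each query
    response = one_step_agent(query)
    for expected_tool, agent_call in zip(expected_tools, response):
        assert isinstance(agent_call, expected_tool)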
Now, let’s shift gears and see how we can perform embedding search on the GitHub issues we’ve ingested into our database.
It's as simple as writing a single SQL query once we've got our embeddings on hand, thanks to the pgvector and pgvectorscale extensions in PostgreSQL.
We'll do so by implementing an execute method on each of the search tools; it uses the asyncpg library and returns a list of relevant entries when provided with a user query.
from pydantic import BaseModel, Field
from typing import Optional
from openai import OpenAI
from jinja2 import Template
from asyncpg import Connection
class SearchIssues(BaseModel):
"""
Use this when the user wants to get original issue information from the database
"""
query: Optional[str]
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issues", embedding=embedding)
return await conn.fetch(sql_query, *args)
class RunSQLReturnPandas(BaseModel):
"""
Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
"""
query: str = Field(description="Description of user's query")
repos: list[str] = Field(
description="the repos to run the query on, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
pass
class SearchSummaries(BaseModel):
"""
    This function retrieves summarized information about GitHub issues that match or are similar to a specific query. It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
"""
query: Optional[str] = Field(description="Relevant user query if any")
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issue_summaries", embedding=embedding)
return await conn.fetch(sql_query, *args)
We can then verify that our embedding search is working by running the following snippet of code.
query = "What are the main problems people are facing with installation with Kubernetes"
conn = await get_conn()
limit = 10
resp = await SearchSummaries(query=query, repo="kubernetes/kubernetes").execute(
conn, limit
)
for row in resp[:3]:
print(row["text"])
This results in the following output.
Discussion on the need for better release processes and documentation within the Kubernetes project, with a strong emphasis on improving the user experience for setting up the software on Ubuntu. Users express frustrations over a confusing setup process, the necessity for synchronized documentation updates with code changes, and propose structured solutions for improving documentation efficacy.
The issue involved failures in creating a Kubernetes pod sandbox due to the Calico Network Plugin not functioning correctly. The user described encountering multiple error messages regarding pod creation and provided environmental details, including Kubernetes version and CentOS configuration. Suggestions from other users included downgrading Calico or updating the network plugin. The issue was marked as resolved after the user confirmed that using Calico version 2.6 was successful.
User reported an issue with the 'kubectl top' command failing due to an unmarshalling error after creating a Kubernetes cluster with kubeadm and deploying Heapster. The error was resolved by removing the HTTP proxy settings from the kube-apiserver configuration, leading to questions about the command's requirements.
Just like that, we've filtered our results to a specific repository while still leveraging the power of embedding search.
But what happens if users make spelling mistakes, typing kuberntes instead of kubernetes/kubernetes, or fail to spell out the entire repository name in their prompt?
We can get around this by using the fuzzywuzzy library to do fuzzy string matching with the following function.
from fuzzywuzzy import process
def find_closest_repo(query: str, repos: list[str]) -> str | None:
if not query:
return None
best_match = process.extractOne(query, repos)
return best_match[0] if best_match[1] >= 80 else None
We can verify that this works with a few unit tests below.
repos = [
"rust-lang/rust",
"kubernetes/kubernetes",
"apache/spark",
"golang/go",
"tensorflow/tensorflow",
"MicrosoftDocs/azure-docs",
"pytorch/pytorch",
"Microsoft/TypeScript",
"python/cpython",
"facebook/react",
"django/django",
"rails/rails",
"bitcoin/bitcoin",
"nodejs/node",
"ocaml/opam-repository",
"apache/airflow",
"scipy/scipy",
"vercel/next.js",
]
test = [
["kuberntes", "kubernetes/kubernetes"],
["next.js", "vercel/next.js"],
["scipy", "scipy/scipy"],
["", None],
["fakerepo", None],
]
for query, expected in test:
assert find_closest_repo(query, repos) == expected
We can then modify our original tools to use this new find_closest_repo function.
from pydantic import BaseModel, Field, field_validator, ValidationInfo
from typing import Optional
from openai import OpenAI
from jinja2 import Template
from asyncpg import Connection
class SearchIssues(BaseModel):
"""
Use this when the user wants to get original issue information from the database
"""
query: Optional[str]
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
@field_validator("repo")
def validate_repo(cls, v: str, info: ValidationInfo):
matched_repo = find_closest_repo(v, info.context["repos"])
if matched_repo is None:
raise ValueError(
f"Unable to match repo {v} to a list of known repos of {info.context['repos']}"
)
return matched_repo
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issues", embedding=embedding)
return await conn.fetch(sql_query, *args)
class RunSQLReturnPandas(BaseModel):
"""
Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
"""
query: str = Field(description="Description of user's query")
repos: list[str] = Field(
description="the repos to run the query on, should be in the format of 'owner/repo'"
)
async def execute(self, conn: Connection, limit: int):
pass
class SearchSummaries(BaseModel):
"""
    This function retrieves summarized information about GitHub issues that match or are similar to a specific query. It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
"""
query: Optional[str] = Field(description="Relevant user query if any")
repo: str = Field(
description="the repo to search for issues in, should be in the format of 'owner/repo'"
)
@field_validator("repo")
def validate_repo(cls, v: str, info: ValidationInfo):
matched_repo = find_closest_repo(v, info.context["repos"])
if matched_repo is None:
raise ValueError(
f"Unable to match repo {v} to a list of known repos of {info.context['repos']}"
)
return matched_repo
async def execute(self, conn: Connection, limit: int):
if self.query:
embedding = (
OpenAI()
.embeddings.create(input=self.query, model="text-embedding-3-small")
.data[0]
.embedding
)
args = [self.repo, limit, embedding]
else:
args = [self.repo, limit]
embedding = None
sql_query = Template(
"""
SELECT *
FROM {{ table_name }}
WHERE repo_name = $1
{%- if embedding is not none %}
ORDER BY embedding <=> $3
{%- endif %}
LIMIT $2
"""
).render(table_name="github_issue_summaries", embedding=embedding)
return await conn.fetch(sql_query, *args)
We can then validate that this works by running the execute method on a SearchSummaries call, as seen below, where we pass in the misspelled repository name kuberntes.
repos = [
"rust-lang/rust",
"kubernetes/kubernetes",
"apache/spark",
"golang/go",
"tensorflow/tensorflow",
]
query = (
"What are the main problems people are facing with installation with Kubernetes"
)
conn = await get_conn()
limit = 10
resp = await SearchSummaries.model_validate_json(
json.dumps({"query": query, "repo": "kuberntes"}),
context={"repos": repos},
).execute(conn, limit)
for row in resp[:3]:
print(row["text"])
await conn.close()
In short, by leveraging SQL for embedding search, you can easily combine issue queries with metadata filters and complex joins, dramatically improving search relevance and speed.
This approach lets you quickly extract meaningful insights from vast amounts of GitHub data, streamlining your issue management and decision-making processes.
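To make that concrete, here's a sketch of the kind of query this unlocks, using the schema defined earlier: vector search restricted to a single repo and to issues opened after a given date, joined against the summaries table so we can also filter on issue status. The example query text and cutoff date are arbitrary choices.
from datetime import datetime, timezone
from openai import OpenAI

# Embed the user's question, exactly as the search tools above do
query_embedding = (
    OpenAI()
    .embeddings.create(
        input="problems installing kubernetes", model="text-embedding-3-small"
    )
    .data[0]
    .embedding
)

conn = await get_conn()
rows = await conn.fetch(
    """
    SELECT gi.issue_id, gis.text AS summary, gi.start_ts
    FROM github_issues gi
    JOIN github_issue_summaries gis ON gis.issue_id = gi.issue_id
    WHERE gi.repo_name = $1
      AND gi.start_ts >= $2
      AND gis.label = 'OPEN'
    ORDER BY gi.embedding <=> $3
    LIMIT 10
    """,
    "kubernetes/kubernetes",
    datetime(2015, 1, 1, tzinfo=timezone.utc),
    query_embedding,
)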
The final step of our application also involves building a text-to-SQL (Text2SQL) tool as a catch-all for more complex queries.
Developing an effective Text2SQL agent is crucial for translating natural language queries into precise database operations.
In this section, we’ll review some tips for developing these agents by looking at a prompt we developed for TimescaleDB-specific query generation.
While concise prompts seem appealing, we've found that verbose prompts are key to unlocking accurate SQL generation: they spell out which tables and columns to use, which database-specific functions to reach for, and which fields to avoid. Consider this excerpt from our text-to-SQL prompt:
You are a SQL expert tasked with writing queries including a time attribute for the relevant table. The user wants to execute a query for the following repos: {self.repos} to answer the query of {self.user_query_summary}.
Key guidelines:
- Use the `repo_name` column for repository filtering.
- Employ the `time_bucket` function for time-based partitioning when specified.
- The `metadata` field is currently empty, so do not use it.
- Use the `issue_label` column in `github_issue_summaries` to determine issue status.
-- Database Information Below--
These guidelines help prevent a few failure modes that we saw when testing our agent with this prompt:
- By pointing the model at the github_issue_summaries table to determine an issue's status, we prevented cases where it hallucinated an issue_label field on the github_issues table. This often happened when we asked it to calculate things like the number of closed or open issues.
- TimescaleDB's time_bucket feature is very useful for obtaining arbitrary time periods and should be used over a custom hand-rolled PostgreSQL function. Explicitly instructing the model to use the time_bucket function for partitioning when an interval is specified helps prevent potentially faulty implementations.
Without a detailed and concise summary of what each column is supposed to represent, a language model may struggle to intuit how each column or table is meant to be used.
With a rich schema description, the model can make more informed decisions when constructing queries. Take the following bullet point from our prompt above: knowing that the github_issue_summaries table contains a label column of type issue_label allows the model to use it for status-based queries:
Use the `issue_label` column in `github_issue_summaries` to determine issue status.
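We left RunSQLReturnPandas.execute as a stub earlier. A minimal sketch of the catch-all tool could look like the following; the response model, the function name run_text_to_sql, the prompt wording beyond the excerpt above, and the pandas conversion are our own illustrative choices rather than a fixed implementation:
import instructor
import pandas as pd
from openai import AsyncOpenAI
from pydantic import BaseModel, Field


class GeneratedSQLQuery(BaseModel):
    chain_of_thought: str = Field(description="Reasoning about how to answer the question")
    sql: str = Field(description="A single PostgreSQL SELECT statement")


async def run_text_to_sql(conn, repos: list[str], user_query: str) -> pd.DataFrame:
    client = instructor.from_openai(AsyncOpenAI())
    generated = await client.chat.completions.create(
        model="gpt-4o-mini",
        response_model=GeneratedSQLQuery,
        messages=[
            {
                "role": "system",
                "content": f"""You are a SQL expert tasked with writing queries including a time attribute for the relevant table. The user wants to execute a query for the following repos: {repos} to answer the query of {user_query}.

Key guidelines:
- Use the `repo_name` column for repository filtering.
- Employ the `time_bucket` function for time-based partitioning when specified.
- The `metadata` field is currently empty, so do not use it.
- Use the `issue_label` column in `github_issue_summaries` to determine issue status.
""",
                # In the real prompt, a description of both tables and their columns follows here.
            },
        ],
    )
    # Run the generated SQL and hand back a DataFrame we can keep analyzing
    rows = await conn.fetch(generated.sql)
    return pd.DataFrame([dict(row) for row in rows])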
Now that everything is wired up, we have plenty of options to choose from.
An easy first step is to generate a SQL query, execute it, and then use the data returned from our database to generate a response to the user's query. And since the Pydantic objects are just code, testing individual functions becomes very easy; the same primitives can be called by the LLM or directly by another developer.
We might also want to use these tools as part of a data analyst's workflow: call them from Jupyter notebooks, return pandas objects, and continue the analysis from there (a sketch of what that could look like follows this paragraph). We could even build a caching layer that resolves a query once and reuses the result later. These are now tools that both humans and AI systems can use interchangeably.
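For example, because the tools are plain Pydantic objects, an analyst can skip the agent entirely and call one directly in a notebook, turning the result into a DataFrame. This sketch assumes the repos list defined earlier; the query text is just an example:
import pandas as pd

conn = await get_conn()
# Construct the tool directly, supplying the validation context the repo validator expects
tool = SearchSummaries.model_validate(
    {"query": "flaky networking between pods", "repo": "kubernetes/kubernetes"},
    context={"repos": repos},
)
rows = await tool.execute(conn, 5)
df = pd.DataFrame([dict(row) for row in rows])
df.head()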
And there you have it. By using OpenAI's parallel tool calling API, we can expose a suite of text-to-SQL and semantic search tools to build a question-answering system over GitHub issues.
And because of the clean separation of concerns, we can evaluate tool selection separately from implementation as we prototype.
Let's see how this might work in practice by getting our model to generate a quick summary of the challenges users reported in kubernetes/kubernetes issues. To do so, we'll define a function that summarizes the retrieved results once we've executed the model's chosen tool.
We'll do this using instructor, feeding in the text chunks from the relevant issues we retrieved from our database.
import instructor
from pydantic import BaseModel
from asyncpg import Record
from typing import Optional
from jinja2 import Template
from openai import OpenAI
class Summary(BaseModel):
chain_of_thought: str
summary: str
def summarize_content(issues: list[Record], query: Optional[str]):
client = instructor.from_openai(OpenAI())
return client.chat.completions.create(
messages=[
{
"role": "system",
"content": """You're a helpful assistant that summarizes information about issues from a github repository. Be sure to output your response in a single paragraph that is concise and to the point.""",
},
{
"role": "user",
"content": Template(
"""
Here are the relevant issues:
{% for issue in issues %}
- {{ issue['text'] }}
{% endfor %}
{% if query %}
My specific query is: {{ query }}
{% else %}
Please provide a broad summary and key insights from the issues above.
{% endif %}
"""
).render(issues=issues, query=query),
},
],
response_model=Summary,
model="gpt-4o-mini",
)
We’ll also need to modify our one-step agent so that it has access to a validation context to validate the extracted repositories.
def one_step_agent(question: str, repos: list[str]):
client = instructor.from_openai(OpenAI(), mode=instructor.Mode.PARALLEL_TOOLS)
return client.chat.completions.create(
model="gpt-4o-mini",
messages=[
{
"role": "system",
"content": "You are an AI assistant that helps users query and analyze GitHub issues stored in a PostgreSQL database. Search for summaries when the user wants to understand the trends or patterns within a project. Otherwise just get the issues and return them. Only resort to SQL queries if the other tools are not able to answer the user's query.",
},
{
"role": "user",
"content": Template(
"""
Here is the user's question: {{ question }}
Here is a list of repos that we have stored in our database. Choose the one that is most relevant to the user's query:
{% for repo in repos %}
- {{ repo }}
{% endfor %}
"""
).render(question=question, repos=repos),
},
],
validation_context={"repos": repos},
response_model=Iterable[
Union[
RunSQLReturnPandas,
SearchIssues,
SearchSummaries,
]
],
)
Now let's see this in action by summarizing what our database says about pod connectivity problems in Kubernetes.
When you run this code, you'll get a summary of the challenges people faced in the kubernetes/kubernetes repo when working with connectivity between different pods.
Here's an example of what the output might look like:
query = "What are the main issues people face with endpoint connectivity between different pods in kubernetes?"
repos = [
"rust-lang/rust",
"kubernetes/kubernetes",
"apache/spark",
"golang/go",
"tensorflow/tensorflow",
"MicrosoftDocs/azure-docs",
"pytorch/pytorch",
"Microsoft/TypeScript",
"python/cpython",
"facebook/react",
"django/django",
"rails/rails",
"bitcoin/bitcoin",
"nodejs/node",
"ocaml/opam-repository",
"apache/airflow",
"scipy/scipy",
"vercel/next.js",
]
resp = one_step_agent(query, repos)
conn = await get_conn()
limit = 10
tools = [tool for tool in resp]
print(tools)
#> [SearchSummaries(query='endpoint connectivity pods kubernetes', repo='kubernetes/kubernetes')]
result = await tools[0].execute(conn, limit)
summary = summarize_content(result, query)
print(summary.summary)
#> Users face endpoint connectivity issues in Kubernetes potentially due to networking setup errors with plugins like Calico, misconfigured network interfaces, and lack of clear documentation that hinders proper setup and troubleshooting. Proper handling of these aspects is essential for ensuring connectivity between different pods.
The main issues faced with endpoint connectivity between different pods include network latency, communication failures, misconfigurations in network policies, DNS resolution problems, challenges in service discovery, resource limits causing throttling, and complexities in multi-cluster communication.
This was possible because we focused on the development of our entire application from a bottom-up approach, starting with a strong evaluation suite to verify tool usage before moving to implementation.
In this article, we combined PostgreSQL with pgvectorscale and pgvector to create a RAG system that can answer complex user queries. We then demonstrated how to test function calling capabilities using instructor before walking through some best practices for text-to-SQL generation.
At Timescale, we're working to make PostgreSQL a better database for AI builders with all the capabilities you need to build and improve your RAG systems. Subscribe to our newsletter to be the first to hear about new educational content like this and new features to help you build AI applications with PostgreSQL. And if the mission sounds interesting to you, we're hiring.
One thing you’ll notice about our code is the boilerplate around extraction and embedding creation. The Timescale AI Engineering team is actively working on making this easier, so look out for an exciting announcement from us in the coming weeks.
In upcoming articles, we’ll cover how to utilize advanced techniques, such as synthetic data generation and automated metadata generation. Using pgvectorscale to enhance pgvector and PostgreSQL for these use cases will enable you to build faster and more scalable AI applications.
Finally, if you're building a RAG application, take a look at the tools we've built to help you (GitHub ⭐s welcome!).