RAG Is More Than Just Vector Search

Many developers mistakenly view RAG applications as simple semantic searches over vector databases that augment large language models (LLMs). However, modern AI apps need a more layered approach to address embedding model limitations.

For example, answering the question "What were the top five most discussed GitHub issues last month related to performance optimization?" requires more than similarity search. It also needs:

  • Time-based filtering (last month)
  • Text search (performance optimization)
  • Aggregations and filters (top five most discussed issues)

Embedding search alone won’t cut it for a good RAG (retrieval-augmented generation) system.
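
To make this concrete, here's a rough sketch of the kind of SQL such a question calls for once the data lives in PostgreSQL. The comment_count column here is hypothetical and purely illustrative; the actual schema we build later in this post looks a little different:

-- Hypothetical query: a time filter, a text filter, and a ranking in one statement
SELECT issue_id, comment_count
FROM github_issues
WHERE start_ts >= NOW() - INTERVAL '1 month'
  AND text ILIKE '%performance optimization%'
ORDER BY comment_count DESC
LIMIT 5;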

The good news is that with PostgreSQL and Timescale, we get vector search, time-series capabilities, and all of the flexibility of SQL in a single database. By combining Timescale with LLMs, we can:

  • Perform vector similarity searches over embeddings.
  • Use language models to extract and augment our existing datasets.
  • Write efficient queries that can join across many attributes.
  • Leverage Timescale's existing functionality with time-series data.

In this post, we'll do four things:

  1. Go beyond embeddings: we'll discuss the need for various capabilities within AI applications and why Timescale is an ideal blend of embeddings and SQL for many use cases that require data retrieval.
  2. Structured extraction: we'll explore using language models to directly extract data into Timescale.
  3. Evals-driven development: we’ll highlight how easy it is to start with test-driven development early in the process of creating AI tools and emphasize the importance of focusing on specific use cases.
  4. Putting it all together: we’ll demonstrate how to implement embedding search and text-to-SQL, showing how simple it is to leverage Timescale's embedding and SQL tools to accomplish your tasks.

To do this, we'll ground our conversation in an example application that answers questions about GitHub issues. To build this GitHub issues Q&A app, we'll leverage PostgreSQL and Timescale and implement parallel tool calling and text-to-SQL capabilities to illustrate just how far beyond vector search RAG can go.

👋
New to Timescale? Here's a quick intro.

We're making PostgreSQL better for AI builders.

We've built pgvectorscale to speed up and scale pgvector, and pgai to bring LLMs closer to your data. If you'd prefer to spend more time improving your RAG app and less time managing data infrastructure, check out Timescale Cloud.

And if you'd like to skip to the code, you can find all code snippets used in this blog post in the companion GitHub repo.

Before we start building: Think about user needs and work backwards

The most impactful AI applications aren't necessarily the most complex and agentic—they're the ones built on a deep understanding of user requirements. When building new AI applications, our goal should be to explore simple workflows and specialized tooling before diving into complex reasoning and planning systems.

Often, the push for complex systems in product development stems from a reluctance to deeply understand user needs. By truly grasping these needs, you can create simple, effective solutions instead of relying on unreliable, complex agents, and avoid ending up with an impressive demo but disappointed customers.

We should always ask ourselves these two questions when approaching a new dataset:

  1. What additional filters and indices can we leverage?
  2. Can we extract any additional metadata to simplify the task at hand?

Extraction and Ingestion

🐘
Before continuing with this section, make sure that you've configured a Timescale instance and set a DB_URL environment variable inside your shell. If you don’t have a Timescale instance configured, you can create one in our Getting Started docs.
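
For example, with a placeholder connection string (swap in your own Timescale service's credentials):

export DB_URL="postgres://tsdbadmin:<password>@<host>:<port>/tsdb?sslmode=require"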

In the case of GitHub issues, we might recognize that we want to have additional functionality beyond what exists in the dataset. We might care about:

  • Tagging different issues
  • Seeing if issues have been resolved
  • Distinguishing between feature requests and bugs

These are user-centric features and indices that can significantly improve the system's ability to answer the questions we care about.

Data processing adds functionality beyond what's available in the standard dataset. Ultimately, this improves our ability to answer more complex questions that users might pose. Let's walk through building a custom data pipeline for GitHub issues.

Here’s an overview of what we’ll be working on during this section:

  1. Data models: we'll start by creating Pydantic models to structure our raw and processed GitHub issue data.
  2. Using generators: we’ll then showcase how to use generators to reduce the time taken to iterate through the entire dataset.
  3. Data processing: we’ll asynchronously classify and summarize these issues before embedding them for future reference.
  4. Storing and indexing enhanced data: finally, we'll use Timescale's newly released pgvectorscale extension with pgvector to efficiently store our processed data, setting up appropriate indexes for fast querying and analysis.

As a reminder, you can find all the code for this work on our dedicated GitHub repo.

Data models

First, let's install the necessary dependencies:

pip install instructor openai tqdm pydantic datasets pgvector asyncpg Jinja2 fuzzywuzzy python-Levenshtein

We'll use Pydantic models to ensure type safety. To do so, we'll define three classes: ClassifiedSummary, which holds the label and summary a language model generates; ProcessedIssue, which represents a processed summary ready for embedding; and GithubIssue, which represents the raw data we extract from the dataset.

from pydantic import BaseModel
from typing import Literal, Any, Optional
from datetime import datetime

class ClassifiedSummary(BaseModel):
    chain_of_thought: str
    label: Literal["OPEN", "CLOSED"]
    summary: str

class ProcessedIssue(BaseModel):
    issue_id: int
    text: str
    label: Literal["OPEN", "CLOSED"]
    repo_name: str
    embedding: Optional[list[float]]

class GithubIssue(BaseModel):
    issue_id: int
    metadata: dict[str, Any]
    text: str
    repo_name: str
    start_ts: datetime
    end_ts: Optional[datetime]
    embedding: Optional[list[float]]

Using generators

Let's grab some GitHub issues to work with. We'll use the bigcode/the-stack-github-issues dataset and the datasets library to make our lives easier.

Here's what we're doing:

  1. Cherry-pick repos: We'll filter issues to focus only on the repositories we care about. This will allow us to run more targeted data analysis on the final dataset.
  2. Grab a manageable chunk: We'll use the take function to snag a subset of issues. This lets us work with a significantly smaller slice of the dataset, allowing us to iterate faster and do more experiments.

from datasets import load_dataset

def get_issues(n: int, repos: list[str]):
    dataset = (
        load_dataset("bigcode/the-stack-github-issues", split="train", streaming=True)
        .filter(lambda x: x["repo"] in repos)
        .take(n)
    )

    for row in dataset:
        start_time = None
        end_time = None
        for event in row["events"]:
            event_type = event["action"]
            timestamp = event["datetime"]
            timestamp = timestamp.replace("Z", "+00:00")

            if event_type == "opened":
                start_time = datetime.fromisoformat(timestamp)

            elif event_type == "closed":
                end_time = datetime.fromisoformat(timestamp)

            # Fallback: some issues have no 'opened' event, so use 'created' or 'reopened'
            elif event_type == "created" and not start_time:
                start_time = datetime.fromisoformat(timestamp)

            elif event_type == "reopened" and not start_time:
                start_time = datetime.fromisoformat(timestamp)

        yield GithubIssue(
            issue_id=row["issue_id"],
            metadata={},
            text=row["content"],
            repo_name=row["repo"],
            start_ts=start_time,
            end_ts=end_time,
            embedding=None,
        )

Data processing

We can use Python’s async functionality and the instructor library to quickly process issues in parallel. Instead of waiting for each task to finish, we can work on multiple issues simultaneously.

Better yet, to ensure we stay within a reasonable rate limit, we can also use a Semaphore to control the number of concurrent tasks being executed.

from asyncio import run, Semaphore
from tqdm.asyncio import tqdm_asyncio as asyncio
from textwrap import dedent
from instructor import from_openai
from openai import AsyncOpenAI
from jinja2 import Template

async def batch_classify_issue(
    batch: list[GithubIssue], max_concurrent_requests: int = 20
) -> list[ProcessedIssue]:
    async def classify_issue(issue: GithubIssue, semaphore: Semaphore):
        client = from_openai(AsyncOpenAI())
        async with semaphore:
            classification = await client.chat.completions.create(
                response_model=ClassifiedSummary,
                messages=[
                    {
                        "role": "system",
                        "content": "You are a helpful assistant that classifies and summarizes GitHub issues. When summarizing the issues, make sure to expand on specific accronyms and add additional explanation where necessary.",
                    },
                    {
                        "role": "user",
                        "content": Template(
                            dedent(
                            """
                            Repo Name: {{ repo_name }}
                            Issue Text: {{ issue_text }}
                            """
                            )
                        ).render(repo_name=issue.repo_name, issue_text=issue.text),
                    },
                ],
                model="gpt-4o-mini",
            )
            return ProcessedIssue(
                issue_id=issue.issue_id,
                repo_name=issue.repo_name,
                text=classification.summary,
                label=classification.label,
                embedding=None,
            )

    semaphore = Semaphore(max_concurrent_requests)
    coros = [classify_issue(item, semaphore) for item in batch]
    results = await asyncio.gather(*coros)
    return results

We’ll also define a function to process our embeddings simultaneously. These will be useful for performing similarity search across our different issues using pgvector and pgvectorscale in a later section.

from openai import AsyncOpenAI

async def batch_embeddings(
    data: list[ProcessedIssue],
    max_concurrent_calls: int = 20,
) -> list[ProcessedIssue]:
    oai = AsyncOpenAI()

    async def embed_row(
        item: ProcessedIssue,
        semaphore: Semaphore,
    ):
        async with semaphore:
            input_text = item.text if len(item.text) < 8000 else item.text[:6000]
            embedding = (
                (
                    await oai.embeddings.create(
                        input=input_text, model="text-embedding-3-small"
                    )
                )
                .data[0]
                .embedding
            )
            item.embedding = embedding
            return item

    semaphore = Semaphore(max_concurrent_calls)
    coros = [embed_row(item, semaphore) for item in data]
    results = await asyncio.gather(*coros)
    return results

Now that we’ve figured out how to process and embed our summaries at scale, we can work on loading them into Timescale. We'll use asyncpg, whose executemany method lets us batch our insertions.

Storing and indexing enhanced data

We can manage all our embeddings with pgvectorscale and pgvector natively within Timescale. This allows us to enjoy up to 28x lower p95 latency, 16x higher query throughput, and a 75% cost reduction compared to Pinecone in our latest benchmark.

All we need to do is enable the pgvectorscale extension, which also sets up pgvector in our Timescale project. Once we've done so, we can create tables for our embeddings and index them for optimal performance.

Let's see how to do this below.

import os
from pgvector.asyncpg import register_vector
import asyncpg

init_sql = """
CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;

DROP TABLE IF EXISTS github_issue_summaries CASCADE;
DROP TABLE IF EXISTS github_issues CASCADE;

-- The summaries table stores an OPEN/CLOSED status, so we need an enum type for it
DROP TYPE IF EXISTS issue_label CASCADE;
CREATE TYPE issue_label AS ENUM ('OPEN', 'CLOSED');

CREATE TABLE IF NOT EXISTS github_issues (
    issue_id INTEGER,
    metadata JSONB,
    text TEXT,
    repo_name TEXT,
    start_ts TIMESTAMPTZ NOT NULL,
    end_ts TIMESTAMPTZ,
    embedding VECTOR(1536) NOT NULL
);

CREATE INDEX github_issue_embedding_idx
ON github_issues
USING diskann (embedding);

-- Create a Hypertable that breaks it down by 1 month intervals
SELECT create_hypertable('github_issues', 'start_ts', chunk_time_interval => INTERVAL '1 month');

CREATE UNIQUE INDEX ON github_issues (issue_id, start_ts);

CREATE TABLE github_issue_summaries (
    issue_id INTEGER,
    text TEXT,
    label issue_label NOT NULL,
    repo_name TEXT,
    embedding VECTOR(1536) NOT NULL
);

CREATE INDEX github_issue_summaries_embedding_idx
ON github_issue_summaries
USING diskann (embedding);
"""

async def get_conn():
    conn = await asyncpg.connect(os.getenv("DB_URL"))
    await register_vector(conn)
    return conn

conn = await get_conn()
await conn.execute(init_sql)

With our GitHub Issue and Issue Summary tables in place, let's create two functions to populate our database with the relevant information.

import json

async def insert_github_issue_summaries(conn, issues: list[GithubIssue]):
    insert_query = """
    INSERT INTO github_issue_summaries (issue_id, text, label, embedding, repo_name)
    VALUES ($1, $2, $3, $4, $5)
    """
    summarized_issues = await batch_classify_issue(issues)
    embedded_summaries = await batch_embeddings(summarized_issues)

    await conn.executemany(
        insert_query,
        [
            (item.issue_id, item.text, item.label, item.embedding, item.repo_name)
            for item in embedded_summaries
        ],
    )

    print("GitHub issue summaries inserted successfully.")

async def insert_github_issues(conn, issues: list[GithubIssue]):
    insert_query = """
    INSERT INTO github_issues (issue_id, metadata, text, repo_name, start_ts, end_ts, embedding)
    VALUES ($1, $2, $3, $4, $5, $6, $7)
    """
    embedded_issues = await batch_embeddings(issues)

    await conn.executemany(
        insert_query,
        [
            (
                item.issue_id,
                json.dumps(item.metadata),
                item.text,
                item.repo_name,
                item.start_ts,
                item.end_ts,
                item.embedding,
            )
            for item in embedded_issues
        ],
    )
    print("GitHub issues inserted successfully.")

We can combine our previous functions into a single process_issues function to ingest GitHub issue data into our database:

async def process_issues():
    repos = [
        "rust-lang/rust",
        "kubernetes/kubernetes",
        "apache/spark",
    ]
    conn = await get_conn()
    issues = list(get_issues(100, repos))
    await insert_github_issues(conn, issues)
    await insert_github_issue_summaries(conn, issues)

await process_issues()

We've now created a powerful pipeline that can process our GitHub issue data to extract valuable insights. With that in mind, let’s shift our focus to developing specialized tooling for customer needs using evaluation-driven development.

Evals-Driven Development

While we develop the tables and indices we might use to build out the RAG application, we can also engage in eval-driven development: testing our language model's ability to choose the right tools before we implement them.

Here, we can be very creative in expressing the tools we want to give to the language model.

In Python, Pydantic schemas are great for prototyping agent tools because they create a clear contract for your agent's actions. This contract makes evaluating the performance and the impact of more complex tooling easy before moving on to implementation.

Let’s explore how we can implement this using instructor, where we have an agent with three tools, as seen below.

  • Raw SQL query generation
  • Fetching the raw ingested issues
  • Fetching issue summaries

Here's how to define these tools using Pydantic models:

from pydantic import BaseModel, Field

class SearchIssues(BaseModel):
    """
    Use this when the user wants to get original issue information from the database 
    """

    query: Optional[str]
    repo: str = Field(
        description="the repo to search for issues in, should be in the format of 'owner/repo'"
    )
 
class RunSQLReturnPandas(BaseModel):
    """
    Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
    """

    query: str = Field(description="Description of user's query")
    repos: list[str] = Field(
        description="the repos to run the query on, should be in the format of 'owner/repo'"
    )

class SearchSummaries(BaseModel):
    """
    This function retrieves summarized information about GitHub issues that match or are similar to a specific query. It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
    """

    query: Optional[str] = Field(description="Relevant user query if any")
    repo: str = Field(
        description="the repo to search for issues in, should be in the format of 'owner/repo'"
    )

We can test the model’s ability to choose the appropriate tool(s) for the query using the implementation below with instructor.

from typing import Iterable, Union

def one_step_agent(question: str):
    import instructor
    import openai

    client = instructor.from_openai(
        openai.OpenAI(), mode=instructor.Mode.PARALLEL_TOOLS
    )

    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are an AI assistant that helps users query and analyze GitHub issues stored in a PostgreSQL database. Search for summaries when the user wants to understand the trends or patterns within a project. Otherwise just get the issues and return them. Only resort to SQL queries if the other tools are not able to answer the user's query.",
            },
            {"role": "user", "content": question},
        ],
        response_model=Iterable[
            Union[
                RunSQLReturnPandas,
                SearchIssues,
                SearchSummaries,
            ]
        ],
    )

Testing our agent

Testing this agent is simple using the following loop.

Since we expect the agent to call only a single tool for each of these simple queries, we can verify its ability to identify and choose the appropriate tool for the task. As our test suite expands, we'll likely need to transition to the async client for improved efficiency (a sketch of what that might look like follows the test loop below).

tests = [
    [
        "What is the average time to first response for issues in the azure repository over the last 6 months? Has this metric improved or worsened?",
        [RunSQLReturnPandas],
    ],
    [
        "How many issues mentioned issues with Cohere in the 'vercel/next.js' repository in the last 6 months?",
        [SearchIssues],
    ],
    [
        "What were some of the big features that were implemented in the last 4 months for the scipy repo that addressed some previously open issues?",
        [SearchSummaries],
    ],
]

for query, expected_result in tests:
    response = one_step_agent(query)
    for expected_call, agent_call in zip(expected_result, response):
        assert isinstance(agent_call, expected_call)
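
If the test suite grows beyond a handful of cases, a rough sketch of an async variant might look like the following. It assumes the same tool models and system prompt as one_step_agent above, and that instructor's parallel-tools mode behaves the same way with AsyncOpenAI; the helper names are hypothetical:

import asyncio

async def one_step_agent_async(question: str):
    import instructor
    import openai

    # Same prompt and response model as one_step_agent, but with the async client
    # so that several test queries can run concurrently.
    client = instructor.from_openai(
        openai.AsyncOpenAI(), mode=instructor.Mode.PARALLEL_TOOLS
    )
    return await client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are an AI assistant that helps users query and analyze GitHub issues stored in a PostgreSQL database. Search for summaries when the user wants to understand the trends or patterns within a project. Otherwise just get the issues and return them. Only resort to SQL queries if the other tools are not able to answer the user's query.",
            },
            {"role": "user", "content": question},
        ],
        response_model=Iterable[
            Union[
                RunSQLReturnPandas,
                SearchIssues,
                SearchSummaries,
            ]
        ],
    )

async def run_tool_selection_tests():
    responses = await asyncio.gather(*[one_step_agent_async(q) for q, _ in tests])
    for (query, expected_calls), response in zip(tests, responses):
        for expected_call, agent_call in zip(expected_calls, response):
            assert isinstance(agent_call, expected_call)

await run_tool_selection_tests()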

By doing so, we can build unit-like test cases for tool selection, drawing a direct parallel between eval-based testing and unit tests, all without having to test the implementation at the same time.

Now, let’s shift gears and see how we can perform embedding search on the GitHub issues we’ve ingested into our database.

Once we've got our embeddings on hand, it's as simple as writing a single SQL query with the pgvector and pgvectorscale extensions in PostgreSQL.

We'll do so by implementing an execute method on each of the search tools; it uses the asyncpg library and returns a list of relevant entries when given a user query.

from pydantic import BaseModel, Field
from typing import Optional
from openai import OpenAI
from jinja2 import Template
from asyncpg import Connection

class SearchIssues(BaseModel):
    """
    Use this when the user wants to get original issue information from the database
    """

    query: Optional[str]
    repo: str = Field(
        description="the repo to search for issues in, should be in the format of 'owner/repo'"
    )

    async def execute(self, conn: Connection, limit: int):
        if self.query:
            embedding = (
                OpenAI()
                .embeddings.create(input=self.query, model="text-embedding-3-small")
                .data[0]
                .embedding
            )
            args = [self.repo, limit, embedding]
        else:
            args = [self.repo, limit]
            embedding = None

        sql_query = Template(
            """
            SELECT *
            FROM {{ table_name }}
            WHERE repo_name = $1
            {%- if embedding is not none %}
            ORDER BY embedding <=> $3
            {%- endif %}
            LIMIT $2
            """
        ).render(table_name="github_issues", embedding=embedding)

        return await conn.fetch(sql_query, *args)

class RunSQLReturnPandas(BaseModel):
    """
    Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
    """

    query: str = Field(description="Description of user's query")
    repos: list[str] = Field(
        description="the repos to run the query on, should be in the format of 'owner/repo'"
    )

    async def execute(self, conn: Connection, limit: int):
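        # Left unimplemented for now. We discuss how to generate SQL for this tool in
        # the "Implementing Text-to-SQL" section below.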
        pass

class SearchSummaries(BaseModel):
    """
    This function retrieves summarized information about GitHub issues that match or are similar to a specific query. It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
    """

    query: Optional[str] = Field(description="Relevant user query if any")
    repo: str = Field(
        description="the repo to search for issues in, should be in the format of 'owner/repo'"
    )

    async def execute(self, conn: Connection, limit: int):
        if self.query:
            embedding = (
                OpenAI()
                .embeddings.create(input=self.query, model="text-embedding-3-small")
                .data[0]
                .embedding
            )
            args = [self.repo, limit, embedding]
        else:
            args = [self.repo, limit]
            embedding = None

        sql_query = Template(
            """
            SELECT *
            FROM {{ table_name }}
            WHERE repo_name = $1
            {%- if embedding is not none %}
            ORDER BY embedding <=> $3
            {%- endif %}
            LIMIT $2
            """
        ).render(table_name="github_issue_summaries", embedding=embedding)

        return await conn.fetch(sql_query, *args)

We can then verify that our embedding search is working by running the following snippet of code.


query = "What are the main problems people are facing with installation with Kubernetes"

conn = await get_conn()
limit = 10
resp = await SearchSummaries(query=query, repo="kubernetes/kubernetes").execute(
    conn, limit
)

for row in resp[:3]:
    print(row["text"])

This results in the following output.

Discussion on the need for better release processes and documentation within the Kubernetes project, with a strong emphasis on improving the user experience for setting up the software on Ubuntu. Users express frustrations over a confusing setup process, the necessity for synchronized documentation updates with code changes, and propose structured solutions for improving documentation efficacy.
The issue involved failures in creating a Kubernetes pod sandbox due to the Calico Network Plugin not functioning correctly. The user described encountering multiple error messages regarding pod creation and provided environmental details, including Kubernetes version and CentOS configuration. Suggestions from other users included downgrading Calico or updating the network plugin. The issue was marked as resolved after the user confirmed that using Calico version 2.6 was successful.
User reported an issue with the 'kubectl top' command failing due to an unmarshalling error after creating a Kubernetes cluster with kubeadm and deploying Heapster. The error was resolved by removing the HTTP proxy settings from the kube-apiserver configuration, leading to questions about the command's requirements.

Just like that, we've filtered our results to a specific repository while still leveraging the power of embedding search.

Dealing with misspellings

But what happens if users make spelling mistakes, such as typing kuberntes instead of kubernetes/kubernetes, or fail to spell out the full repository name in their prompt?

We can get around this by using the fuzzywuzzy library to do string matching with the following function.

from fuzzywuzzy import process

def find_closest_repo(query: str, repos: list[str]) -> str | None:
    if not query:
        return None

    best_match = process.extractOne(query, repos)
    return best_match[0] if best_match[1] >= 80 else None

We can verify that this works with a few unit tests below.

repos = [
    "rust-lang/rust",
    "kubernetes/kubernetes",
    "apache/spark",
    "golang/go",
    "tensorflow/tensorflow",
    "MicrosoftDocs/azure-docs",
    "pytorch/pytorch",
    "Microsoft/TypeScript",
    "python/cpython",
    "facebook/react",
    "django/django",
    "rails/rails",
    "bitcoin/bitcoin",
    "nodejs/node",
    "ocaml/opam-repository",
    "apache/airflow",
    "scipy/scipy",
    "vercel/next.js",
]

test = [
    ["kuberntes", "kubernetes/kubernetes"],
    ["next.js", "vercel/next.js"],
    ["scipy", "scipy/scipy"],
    ["", None],
    ["fakerepo", None],
]

for query, expected in test:
    assert find_closest_repo(query, repos) == expected

We can then modify our original tools to use this new find_closest_repo function.

from pydantic import BaseModel, Field, field_validator, ValidationInfo
from typing import Optional
from openai import OpenAI
from jinja2 import Template
from asyncpg import Connection

class SearchIssues(BaseModel):
    """
    Use this when the user wants to get original issue information from the database
    """

    query: Optional[str]
    repo: str = Field(
        description="the repo to search for issues in, should be in the format of 'owner/repo'"
    )

    @field_validator("repo")
    def validate_repo(cls, v: str, info: ValidationInfo):
        matched_repo = find_closest_repo(v, info.context["repos"])
        if matched_repo is None:
            raise ValueError(
                f"Unable to match repo {v} to a list of known repos of {info.context['repos']}"
            )
        return matched_repo

    async def execute(self, conn: Connection, limit: int):
        if self.query:
            embedding = (
                OpenAI()
                .embeddings.create(input=self.query, model="text-embedding-3-small")
                .data[0]
                .embedding
            )
            args = [self.repo, limit, embedding]
        else:
            args = [self.repo, limit]
            embedding = None

        sql_query = Template(
            """
            SELECT *
            FROM {{ table_name }}
            WHERE repo_name = $1
            {%- if embedding is not none %}
            ORDER BY embedding <=> $3
            {%- endif %}
            LIMIT $2
            """
        ).render(table_name="github_issues", embedding=embedding)

        return await conn.fetch(sql_query, *args)

class RunSQLReturnPandas(BaseModel):
    """
    Use this function when the user wants to do time-series analysis or data analysis and we don't have a tool that can supply the necessary information
    """

    query: str = Field(description="Description of user's query")
    repos: list[str] = Field(
        description="the repos to run the query on, should be in the format of 'owner/repo'"
    )

    async def execute(self, conn: Connection, limit: int):
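        # Left unimplemented; see the "Implementing Text-to-SQL" section below.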
        pass

class SearchSummaries(BaseModel):
    """
    This function retrieves summarized information about GitHub issues that match or are similar to a specific query. It's particularly useful for obtaining a quick snapshot of issue trends or patterns within a project.
    """

    query: Optional[str] = Field(description="Relevant user query if any")
    repo: str = Field(
        description="the repo to search for issues in, should be in the format of 'owner/repo'"
    )

    @field_validator("repo")
    def validate_repo(cls, v: str, info: ValidationInfo):
        matched_repo = find_closest_repo(v, info.context["repos"])
        if matched_repo is None:
            raise ValueError(
                f"Unable to match repo {v} to a list of known repos of {info.context['repos']}"
            )
        return matched_repo

    async def execute(self, conn: Connection, limit: int):
        if self.query:
            embedding = (
                OpenAI()
                .embeddings.create(input=self.query, model="text-embedding-3-small")
                .data[0]
                .embedding
            )
            args = [self.repo, limit, embedding]
        else:
            args = [self.repo, limit]
            embedding = None

        sql_query = Template(
            """
            SELECT *
            FROM {{ table_name }}
            WHERE repo_name = $1
            {%- if embedding is not none %}
            ORDER BY embedding <=> $3
            {%- endif %}
            LIMIT $2
            """
        ).render(table_name="github_issue_summaries", embedding=embedding)

        return await conn.fetch(sql_query, *args)

We can then validate that this works by running our execute method on a SearchSummaries call, as seen below, where we pass in the misspelled repository name kuberntes.

repos = [
    "rust-lang/rust",
    "kubernetes/kubernetes",
    "apache/spark",
    "golang/go",
    "tensorflow/tensorflow",
]

query = (
    "What are the main problems people are facing with installation with Kubernetes"
)

conn = await get_conn()
limit = 10
resp = await SearchSummaries.model_validate_json(
    json.dumps({"query": query, "repo": "kuberntes"}),
    context={"repos": repos},
).execute(conn, limit)

for row in resp[:3]:
    print(row["text"])

await conn.close()

In short, by leveraging SQL for embedding search, you can easily combine issue queries with metadata filters and complex joins, dramatically improving search relevance and speed.

This approach lets you quickly extract meaningful insights from vast amounts of GitHub data, streamlining your issue management and decision-making processes.

Implementing Text-to-SQL

The final step of our application involves building a text-to-SQL (Text2SQL) tool as a catch-all for more complex queries.

Developing an effective Text2SQL agent is crucial for translating natural language queries into precise database operations.

In this section, we’ll review some tips for developing these agents by looking at a prompt we developed for TimescaleDB-specific query generation.

Use verbose prompts

While concise prompts seem appealing, we've found that verbose prompts are key to unlocking accurate SQL generation. Here's why:

  1. Rich context: verbose prompts provide your AI model with comprehensive context, ensuring it grasps the nuances of your specific database schema and requirements.
  2. Domain-specific knowledge: by including detailed information about your database structure, table relationships, and TimescaleDB-specific functions like time_bucket, you equip the model with essential domain knowledge.
  3. Clear boundaries: explicit instructions and constraints create a framework for the model, preventing common pitfalls and ensuring adherence to best practices.

Consider this excerpt from our text-to-SQL prompt:

You are a SQL expert tasked with writing queries including a time attribute for the relevant table. The user wants to execute a query for the following repos: {self.repos} to answer the query of {self.user_query_summary}.

Key guidelines:
- Use the `repo_name` column for repository filtering.
- Employ the `time_bucket` function for time-based partitioning when specified.
- The `metadata` field is currently empty, so do not use it.
- Use the `issue_label` column in `github_issue_summaries` to determine issue status.

-- Database Information Below--

These guidelines help to prevent a few failure modes that we saw when we tested our agent with this prompt.

  1. Non-existent metadata fields: If you have helpful metadata information, you should indicate it. Otherwise, make sure to explicitly tell the model not to use the metadata for filtering.
  2. Hallucinating fields: By prompting the model to look at the github_issue_summaries table to determine an issue's status, we helped prevent cases where the model hallucinated an issue_label field on the github_issues table. This often happened when we asked it to calculate things like the number of closed or open issues.
  3. Creating custom functions: TimescaleDB’s time_bucket feature is very useful for obtaining arbitrary periods and should be used over a custom hand-rolled PostgreSQL function. Explicitly providing an instruction to use the time_bucket function for partitioning when an interval is specified helps prevent potentially faulty implementations.
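
For example, the kind of query we want the model to produce for "How many issues were opened each month in the last six months?" might look like this. It's a hand-written sketch against the github_issues hypertable we created earlier, not actual model output:

-- Count issues opened per month for one repo over the last six months,
-- using time_bucket() instead of a hand-rolled date-truncation function
SELECT
    time_bucket(INTERVAL '1 month', start_ts) AS month,
    COUNT(*) AS issues_opened
FROM github_issues
WHERE repo_name = 'kubernetes/kubernetes'
  AND start_ts >= NOW() - INTERVAL '6 months'
GROUP BY month
ORDER BY month;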

Provide detailed schemas

Without a detailed and concise summary of what each column is supposed to represent, a language model may struggle to intuit how each column or table is meant to be used.

With a rich schema description, the model can make more informed decisions when constructing queries. Let’s take the following bullet point from our prompt above; knowing that the github_issue_summaries table contains a label column of type issue_label allows the model to use this for status-based queries:

Use the `issue_label` column in `github_issue_summaries` to determine issue status.

Now that everything is wired up, we have tons of options to choose from.

An easy first step is to generate a SQL query, execute it, and then use the data returned from our database to generate a response to the user's query. And since the Pydantic objects are just code, testing individual functions becomes very easy; we can use these primitives either as LLM tools or directly as developers.

We might want to use these AI tools as part of a data analyst's workflow: keep them in Jupyter notebooks, return pandas objects, and continue our data analysis. We could even build a caching layer that resolves a query once and reuses it later. These are now tools that both humans and AI systems can use interchangeably!
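
To make that concrete, here's a minimal sketch of what an execute method for RunSQLReturnPandas could look like, combining the prompt guidelines above with instructor and pandas. The GeneratedSQL model and the TABLE_SCHEMAS string are hypothetical names, the prose above refers to self.user_query_summary while this sketch reuses the model's query field, and you'd want guardrails (a read-only database role, statement timeouts) before executing model-generated SQL:

import instructor
import pandas as pd
from openai import AsyncOpenAI
from pydantic import BaseModel, Field

# Hypothetical: a text description of the github_issues and github_issue_summaries
# tables that we append to the prompt, as discussed above.
TABLE_SCHEMAS = "..."

class GeneratedSQL(BaseModel):
    chain_of_thought: str
    sql: str = Field(description="A single PostgreSQL/TimescaleDB SELECT statement")

class RunSQLReturnPandas(BaseModel):
    query: str = Field(description="Description of user's query")
    repos: list[str] = Field(
        description="the repos to run the query on, should be in the format of 'owner/repo'"
    )

    async def execute(self, conn, limit: int):
        client = instructor.from_openai(AsyncOpenAI())
        generated = await client.chat.completions.create(
            model="gpt-4o-mini",
            response_model=GeneratedSQL,
            messages=[
                {
                    "role": "system",
                    "content": f"""You are a SQL expert tasked with writing queries including a time attribute for the relevant table. The user wants to execute a query for the following repos: {self.repos} to answer the query of {self.query}.

Key guidelines:
- Use the `repo_name` column for repository filtering.
- Employ the `time_bucket` function for time-based partitioning when specified.
- The `metadata` field is currently empty, so do not use it.
- Use the `issue_label` column in `github_issue_summaries` to determine issue status.

-- Database Information Below--
{TABLE_SCHEMAS}""",
                },
            ],
        )
        # Run the generated SQL and hand the result back as a DataFrame for analysis.
        rows = await conn.fetch(generated.sql)
        return pd.DataFrame([dict(row) for row in rows])

In practice, you'd likely also retry with the database error message fed back to the model whenever the generated SQL fails to execute.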

Putting It All Together

And there you have it. By using OpenAI's parallel tool calling, we can expose a suite of text-to-SQL and semantic search tools to build a question-answering system over GitHub issues.

By also having a nice separation of concerns, we can evaluate tool selection separately from implementation as we prototype.

Let's see how this might work in practice by getting our model to generate a quick summary of the challenges that users faced with pod connectivity in the kubernetes/kubernetes issues. To do so, we'll define a function that summarizes the retrieved results after we execute our model's chosen tool.

We’ll do so using instructor and feed in the text chunks from the relevant issues that we retrieved from our database.

import instructor
from pydantic import BaseModel
from asyncpg import Record
from typing import Optional
from jinja2 import Template
from openai import OpenAI

class Summary(BaseModel):
    chain_of_thought: str
    summary: str

def summarize_content(issues: list[Record], query: Optional[str]):
    client = instructor.from_openai(OpenAI())
    return client.chat.completions.create(
        messages=[
            {
                "role": "system",
                "content": """You're a helpful assistant that summarizes information about issues from a github repository. Be sure to output your response in a single paragraph that is concise and to the point.""",
            },
            {
                "role": "user",
                "content": Template(
                    """
                    Here are the relevant issues:
                    {% for issue in issues %}
                    - {{ issue['text'] }}
                    {% endfor %}
                    {% if query %}
                    My specific query is: {{ query }}
                    {% else %}
                    Please provide a broad summary and key insights from the issues above.
                    {% endif %}
                    """
                ).render(issues=issues, query=query),
            },
        ],
        response_model=Summary,
        model="gpt-4o-mini",
    )

We’ll also need to modify our one-step agent so that it has access to a validation context to validate the extracted repositories.

def one_step_agent(question: str, repos: list[str]):
    client = instructor.from_openai(OpenAI(), mode=instructor.Mode.PARALLEL_TOOLS)

    return client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {
                "role": "system",
                "content": "You are an AI assistant that helps users query and analyze GitHub issues stored in a PostgreSQL database. Search for summaries when the user wants to understand the trends or patterns within a project. Otherwise just get the issues and return them. Only resort to SQL queries if the other tools are not able to answer the user's query.",
            },
            {
                "role": "user",
                "content": Template(
                    """
                    Here is the user's question: {{ question }}
                    Here is a list of repos that we have stored in our database. Choose the one that is most relevant to the user's query:
                    {% for repo in repos %}
                    - {{ repo }}
                    {% endfor %}
                    """
                ).render(question=question, repos=repos),
            },
        ],
        validation_context={"repos": repos},
        response_model=Iterable[
            Union[
                RunSQLReturnPandas,
                SearchIssues,
                SearchSummaries,
            ]
        ],
    )

Now let's see this agent in action by summarizing what users report about pod connectivity in the Kubernetes issues stored in our database.

When you run the code below, you'll get a summary of the challenges people faced in the kubernetes/kubernetes repo when working with different pods.

Here's an example of what the output might look like:

query = "What are the main issues people face with endpoint connectivity between different pods in kubernetes?"
repos = [
    "rust-lang/rust",
    "kubernetes/kubernetes",
    "apache/spark",
    "golang/go",
    "tensorflow/tensorflow",
    "MicrosoftDocs/azure-docs",
    "pytorch/pytorch",
    "Microsoft/TypeScript",
    "python/cpython",
    "facebook/react",
    "django/django",
    "rails/rails",
    "bitcoin/bitcoin",
    "nodejs/node",
    "ocaml/opam-repository",
    "apache/airflow",
    "scipy/scipy",
    "vercel/next.js",
]

resp = one_step_agent(query, repos)

conn = await get_conn()
limit = 10

tools = [tool for tool in resp]
print(tools)
#> [SearchSummaries(query='endpoint connectivity pods kubernetes', repo='kubernetes/kubernetes')]

result = await tools[0].execute(conn, limit)

summary = summarize_content(result, query)
print(summary.summary)
#> Users face endpoint connectivity issues in Kubernetes potentially due to networking setup errors with plugins like Calico, misconfigured network interfaces, and lack of clear documentation that hinders proper setup and troubleshooting. Proper handling of these aspects is essential for ensuring connectivity between different pods.

This was possible because we focused on the development of our entire application from a bottom-up approach, starting with a strong evaluation suite to verify tool usage before moving to implementation.

The Future of Your RAG Applications

In this article, we combined PostgreSQL with pgvectorscale and pgvector to create a RAG system that can answer complex user queries. We then demonstrated how to test function calling capabilities using instructor before walking through some best practices for text-to-SQL generation.

At Timescale, we're working to make PostgreSQL a better database for AI builders with all the capabilities you need to build and improve your RAG systems. Subscribe to our newsletter to be the first to hear about new educational content like this and new features to help you build AI applications with PostgreSQL. And if the mission sounds interesting to you, we're hiring.

One thing you’ll notice about our code is the boilerplate around extraction and embedding creation. The Timescale AI Engineering team is actively working on making this easier, so look out for an exciting announcement from us in the coming weeks.

In upcoming articles, we’ll cover how to utilize advanced techniques, such as synthetic data generation and automated metadata generation. Using pgvectorscale to enhance pgvector and PostgreSQL for these use cases will enable you to build faster and more scalable AI applications. 

Finally, if you're building a RAG application, here are some things we've built to help you (GitHub ⭐s welcome!):

  • Pgvectorscale brings high-performance search and scalability to pgvector. It's open-source under the PostgreSQL license.
  • Pgai brings LLMs closer to your data, enabling embedding creation and LLM reasoning right in PostgreSQL. (Also open-source under the PostgreSQL License.)
  • If you want to spend more time improving your RAG app and less time managing a database, try Timescale Cloud. Every database comes with pgvector, pgvectorscale, and pgai and supports all the RAG improvement approaches we discussed in this article.