How We Designed a Resilient Vector Embedding Creation System for PostgreSQL Data

How We Designed a Resilient Vector Embedding Creation System for PostgreSQL Data

Embedding data stored in a PostgreSQL table is undoubtedly useful—with applications ranging from semantic search and recommendation systems to generative AI applications and retrieval augmented generation. But creating and managing embeddings for data in PostgreSQL tables can be tricky, with many considerations and edge cases to take into account, such as keeping embeddings up to date with table updates and deletes, ensuring resilience against failures, and impact to existing systems dependent on the table.

In a previous blog post, we detailed a step-by-step guide on the process of creating and managing embeddings for data residing in PostgreSQL using PgVectorizer—our simple and resilient embedding creation system for data residing in PostgreSQL. Using a blog application with data stored in a PostgreSQL database as an example, we covered how to create and keep up-to-date vector embeddings using Python, LangChain, and pgai on Timescale.

In this blog post, we’ll discuss the technical design decisions and the trade-offs we made while building PgVectorizer to ensure simplicity, resilience, and high performance. We’ll also discuss alternative designs if you want to roll your own.

Let’s jump into it.

Design of a High-Performance Vectorizer for PostgreSQL Data (PgVectorizer)

First, let’s describe how the system we are building will work. Feel free to skip this section if you already read the PgVectorizer post.

System overview

As an illustrative example, we’ll use a simple blog application storing data in PostgreSQL using a table defined as follows:

CREATE TABLE blog (
  id              SERIAL PRIMARY KEY NOT NULL,
  title           TEXT NOT NULL, 
  author          TEXT NOT NULL,
  contents        TEXT NOT NULL,
  category        TEXT NOT NULL,
  published_time  TIMESTAMPTZ NULL --NULL if not yet published
);

We want to create embeddings on the contents of the blog post so we can later use it for semantic search and power retrieval augmented generation. Embeddings should only exist and be searchable for blogs that have been published (where the published_time is NOT NULL). 

While building this embeddings system, we were able to identify a number of goals that any straightforward and resilient system that creates embeddings should have:

  • No modifications to the original table. This allows systems and applications that already use this table not to be impacted by changes to the embedding system. This is especially important for legacy systems.
  • No modification to the applications that interact with the table. Having to modify the code that alters the table may not be possible for legacy systems. It’s also poor software design because it couples systems that don’t use embeddings with code that generates the embedding.
  • Automatically update embeddings when rows in the source table change (in this case, the blog table). This lessens the maintenance burden and contributes to worry-free software. At the same time, this update need not be instantaneous or within the same commit. For most systems, “eventual consistency” is just fine.
  • Ensure resilience against network and service failures: Most systems generate embeddings via a call to an external system, such as the OpenAI API. In scenarios where the external system is down, or a network malfunction occurs, it's imperative that the remainder of your database system continues working.

These guidelines were the basis of a robust architecture that we implemented using the Python Vector library, a library for working with vector data using PostgreSQL. To complete the job successfully, added new functionality to this library—PgVectorizer—to make embedding PostgreSQL data as simple as possible.

Here’s the architecture we settled on:

Reference architecture for a simple and resilient system for embedding data in an existing PostgreSQL table. We use the example use case of a blogging application, hence the table names above.
Reference architecture for a simple and resilient system for embedding data in an existing PostgreSQL table. We use the example use case of a blogging application, hence the table names above.

In this design, we first add a trigger to the blog table that monitors for changes and, upon seeing a modification, inserts a job into the blog_work_queue table that indicates that a row in the blog table is out-of-date with its embedding.

On a fixed schedule, an embeddings creator job will poll the blog_work_queue table, and if it finds work to do, will do the following in a loop:

  1. Read and lock a row in the blog_work_queue table 
  2. Read the corresponding row in the blog table
  3. Create an embedding for the data in the blog row
  4. Write the embedding to the blog_embedding table
  5. Delete the locked row in the  blog_work_queue table

To see this system in action, see an example of usage to create and maintain embeddings in a PostgreSQL table using OpenAI, LangChain, and pgai on Timescale in this blog post.

Going back to the example of our blog application table, on a high level, PgVectorizer has to do two things:

  1. Track changes to the blog rows to know which rows have changed.
  2. Provide a method to process the changes to create embeddings.

Both of these have to be highly concurrent and performant. Let’s see how it works.

Track change to the blog table with the blog_work_queue table

You can create a simple work queue table with the following:

CREATE TABLE blog_embedding_work_queue (
  id  INT 
);

CREATE INDEX ON blog_embedding_work_queue(id);

This is a very simple table, but there is one item of note: this table has no unique key. This was done to avoid locking issues when processing the queue, but it does mean that we may have duplicates. We discuss the trade-off later in Alternative 1 below.

Then you create a trigger to track any changes made to blog:

CREATE OR REPLACE FUNCTION blog_wq_for_embedding() RETURNS TRIGGER LANGUAGE PLPGSQL AS $$ 
BEGIN 
  IF (TG_OP = 'DELETE') THEN
    INSERT INTO blog_embedding_work_queue 
      VALUES (OLD.id);
  ELSE
    INSERT INTO blog_embedding_work_queue 
      VALUES (NEW.id);
  END IF;
  RETURN NULL;
END; 
$$;

CREATE TRIGGER track_changes_for_embedding 
AFTER INSERT OR UPDATE OR DELETE
ON blog 
FOR EACH ROW EXECUTE PROCEDURE blog_wq_for_embedding();

INSERT INTO blog_embedding_work_queue 
  SELECT id FROM blog WHERE published_time is NOT NULL;

The trigger inserts the ID of the blog that has changed into blog_work_queue. We install the trigger and then insert any existing blogs into the work_queue. This ordering is important to make sure that no IDs get dropped.

Now, let’s describe some alternative designs and why we rejected them.

Alternative 1: Implement a primary or unique key for the blog_work_queue table.

Introducing this key would eliminate the problem of duplicate entries. However, it's not without its challenges, particularly because such a key would force us to use the INSERT…ON CONFLICT DO NOTHING clause to insert new IDs into the table, and that clause takes a lock on the ID in the B-tree. 

Here's the dilemma: during the processing phase, it's necessary to delete the rows being worked on to prevent simultaneous processing. Yet, committing this deletion can only be done after the corresponding embedding has been placed into blog_embeddings. This ensures no IDs are lost if there's a disruption midway—say, if the embedding creation crashes post-deletion but before the embedding is written.

Now, if we create a unique or primary key, the transaction overseeing the deletion stays open. Consequently, this acts as a lock on those specific IDs, preventing their insertion back into the blog_work_queue for the entire duration of the embedding creation job. Given that it takes longer to create embeddings than your typical database transaction, this spells trouble. The lock would stall the trigger for the main 'blog' table, leading to a dip in the primary application's performance. Making things worse, if processing multiple rows in a batch, deadlocks become a potential problem as well.

However, the potential issues arising from occasional duplicate entries can be managed during the processing stage, as illustrated later. A sporadic duplicate here and there isn't a problem as it only marginally increases the amount of work the embedding job performs. This is certainly more palatable than grappling with the above-mentioned locking challenges.

Alternative 2: Track the work that needs to be done by adding a column to the blog table to track whether an up-to-date embedding exists.

For example, we could add an embedded boolean column set to false on modification and flipped to true when the embedding is created. There are three reasons to reject this design: 

  1. We don’t want to modify the blog table for the reasons we already mentioned above.
  2. Efficiently getting a list of non-embedded blogs would require an additional index (or partial index) on the blog table. This would slow down other operations.
  3. This increases churn on the table because every modification would now be written twice (once with embedding=false and once with embedding=true) due to the MVCC nature of PostgreSQL.

A separate work_queue_table solves these issues.

Alternative 3: Create the embeddings directly in the trigger.

This approach has several issues:

  1. If the embedding service is down, either the trigger needs to fail (aborting your transaction), or you need to create a backup code path that … stores the IDs that couldn’t be embedded in a queue. The latter solution gets us back to our proposed design but with more complexity bolted on top.
  2. This trigger will probably be much slower than the rest of the database operations because of the latency required to contact an external service. This will slow down the rest of your database operations on the table.
  3. It forces the user to write the creation embedding code directly in the database. Given that the lingua franca of AI is Python and that embedding creation often requires many other libraries, this isn’t always easy or even possible (especially if running within a hosted PostgreSQL cloud environment). It’s much better to have a design where you have a choice to create embeddings inside or outside of the database.

Now we have a list of blogs that need to be embedded, let’s process the list!

Create the embeddings

There are many ways to create embeddings. We recommend using an external Python script. This script will scan the work queue and the related blog posts, invoke an external service to craft the embeddings, and then store these embeddings back into the database. Our reasoning for this strategy is as follows:

  • Choice of Python: We recommend Python because it offers a rich, unmatched ecosystem for AI data tasks, highlighted by powerful LLM development and data libraries like LangChain and LlamaIndex.
  • Opting for an external script instead of PL/Python: We wanted users to have control over how they embed their data. Yet, at the same time, many Postgres cloud providers don’t allow the execution of arbitrary Python code inside the database because of security concerns. So, to allow users to have flexibility in both their embedding scripts as well as where they host their database, we went with a design that used external Python scripts.

The jobs must be both performant and concurrency-safe. Concurrency guarantees that if jobs start running behind, the schedulers can start more jobs to help the system catch up and handle the load.

We’ll go through how to set up each of those methods later, but first, let’s see what the Python script would look like. Fundamentally, the script has three parts:

  1. Read the work queue and the blog post
  2. Create an embedding for the blog post
  3. Write the embedding to the blog_embedding table

Steps 2 and 3 are performed by an embed_and_write callback that we define in the PgVectorizer blog post. So, let’s look more deeply at how we process the work queue.

Process the work queue

We’ll first show you the code and then highlight the key elements at play:

def process_queue(embed_and_write_cb, batch_size:int=10):            
    with psycopg2.connect(TIMESCALE_SERVICE_URL) as conn:
        with conn.cursor(cursor_factory=psycopg2.extras.DictCursor) as cursor:
            cursor.execute(f"""
                SELECT to_regclass('blog_embedding_work_queue')::oid; 
                """)
            table_oid = cursor.fetchone()[0]
            
            cursor.execute(f"""
                WITH selected_rows AS (
                    SELECT id
                    FROM blog_embedding_work_queue                         
                    LIMIT {int(batch_size)}
                    FOR UPDATE SKIP LOCKED
                ), 
                locked_items AS (
                    SELECT id, 
                           pg_try_advisory_xact_lock(
                                {int(table_oid)}, id) AS locked
                    FROM (
                        SELECT DISTINCT id 
                        FROM selected_rows 
                        ORDER BY id
                     ) as ids
                ),
                deleted_rows AS (
                    DELETE FROM blog_embedding_work_queue
                    WHERE id IN (
                        SELECT id 
                        FROM locked_items 
                        WHERE locked = true ORDER BY id
                   )
                )
                SELECT locked_items.id as locked_id, {self.table_name}.*
                FROM locked_items
                LEFT JOIN blog ON blog.id = locked_items.id
                WHERE locked = true
                ORDER BY locked_items.id
            """)
            res = cursor.fetchall()
            if len(res) > 0:
                embed_and_write_cb(res)
            return len(res)

process_queue(embed_and_write)

The SQL code in the above snippet is subtle because it is designed to be both performant and concurrency-safe, so let’s go through it:

  • Getting items off the work queue: Initially, the system retrieves a specified number of entries from the work queue, determined by the batch queue size parameter. A FOR UPDATE lock is taken to ensure that concurrently executing scripts don’t try processing the same queue items. The SKIP LOCKED directive ensures that if any entry is currently being handled by another script, the system will skip it instead of waiting, avoiding unnecessary delays.
  • Locking blog IDs: Due to the possibility of duplicate entries for the same blog_id within the work-queue table, simply locking said table is insufficient. Concurrent processing of the same ID by different jobs would be detrimental. Consider the following potential race-condition:
  1. Job 1 initiates and accesses a blog, retrieving version 1.
  2. An external update to the blog occurs.
  3. Subsequently, Job 2 begins, obtaining version 2.
  4. Both jobs commence the embedding generation process.
  5. Job 2 concludes, storing the embedding corresponding to blog version 2.
  6. Job 1, upon conclusion, erroneously overwrites the version 2 embedding with the outdated version 1.

While one could counter this issue by introducing explicit version tracking, it introduces considerable complexity without performance benefit. The strategy we opted for not only mitigates this issue but also prevents redundant operations and wasted work by concurrently executing scripts.

A Postgres advisory lock, prefixed with the table identifier to avoid potential overlaps with other such locks, is employed. The try variant, analogous to the earlier application of SKIP LOCKED, ensures the system avoids waiting on locks. The inclusion of the ORDER BY blog_id clause helps prevent potential deadlocks. We’ll cover some alternatives below.

  • Cleaning up the work queue: The script then deletes all the work queue items for blogs we have successfully locked. If these queue items are visible via Multi-Version Concurrency Control (MVCC), their updates are manifested in the retrieved blog row. Note that we delete all items with the given blog ID, not only the items read when selecting the rows: this effectively handles duplicate entries for the same blog ID. It's crucial to note that this deletion only commits after invoking the embed_and_write() function and the subsequent storage of the updated embedding. This sequence ensures we don’t lose any updates even if the script fails during the embedding generation phase.
  • Getting the blogs to process: In the last step, we fetch the blogs to process. Note the use of the left join: that allows us to retrieve the blog IDs for deleted items that won’t have a blog row. We need to track those items to delete their embeddings. In the embed_and_write callback, we use published_time being NULL as a sentinel for the blog being deleted (or unpublished, in which case we also want to delete the embedding).

Alternative 4: Avoid using advisory locks by using another table. 

If the system already uses advisory locks and you are worried about collisions, it’s possible to use a table with a blog ID as the primary key and lock the rows. In fact, this can be the blog table itself if you are sure these locks won’t slow down any other system (remember, these locks have to be held throughout the embedding process, which can take a while).

Alternatively, you can have a blog_embedding_locks table just for this purpose. We didn’t suggest creating that table because we think it can get quite wasteful in terms of space, and using advisory locks avoids this overhead. 

Conclusion and Next Steps

We introduced PgVectorizer and outlined a system adept at generating vector embeddings from data stored in PostgreSQL and automatically keeping them up to date. This architecture ensures the embeddings remain synchronized with the perpetually evolving data, responding seamlessly to insertions, modifications, and deletions.

In this blog post, we gave you a behind-the-scenes look at how we created a system that boasts resilience, effectively handling potential downtimes of the embedding-generation service. Its design is adept at managing a high rate of data modifications and can seamlessly use concurrent embedding-generation processes to accommodate heightened loads.

Moreover, the paradigm of committing data to PostgreSQL and using the database to manage embedding generation in the background emerges as an easy mechanism to supervise embedding upkeep amidst data modifications. A myriad of demos and tutorials in the AI space focus singularly on the initial creation of data from documents, overlooking the intricate nuances associated with preserving data synchronization as it evolves. 

However, in real production environments, data invariably changes, and grappling with the complexities of tracking and synchronizing these shifts is no trivial endeavor. But that’s what a database is designed to do! Why not just use it?

Here are some resources to continue your learning journey:

This post was written by

Originally posted

Last updated

11 min read
PostgreSQL
Contributors

Related posts