AI

Nov 08, 2024

Building a Reverse Video Search System With Mixpeek & PostgreSQL

In case you missed the pgai Vectorizer launch, learn why vector databases are the wrong abstraction and how you can create AI embeddings right within your PostgreSQL database.

Building a Reverse Video Search System With Mixpeek & PostgreSQL

Until recently, searching through unstructured data—like text, images, and videos—was a nontrivial problem. Traditional search methods struggled with interpreting the meaning behind this content, making it difficult to extract relevant information efficiently. Enter embeddings: a way of representing data in n-dimensional space that enables computers to recognize patterns and context within this unstructured data.

In this blog post, you’ll learn how to build a reverse video search system using Mixpeek for video processing & embedding generation, combined with PostgreSQL as a vector database (powered by pgvector and pgvectorscale) hosted on Timescale Cloud. This system will allow you to query video data using both video and text queries to retrieve relevant video segments based on semantic similarity.

🔖
Learn more about semantic similarity.

Reverse Video System Overview

Before discussing implementation details, let’s explore the overall architecture of the reverse video system we’re building.

Reverse video search system architecture

Video ingestion process

The ingestion process involves inputting the source video data into a vector database. Here is how it works:

  1. Source video: The process starts with uploading a source video.
  2. Video chunks: The video is split into chunks for optimized embedding creation using Mixpeek’s video indexing tool.
  3. Embeddings: Mixpeek’s indexing tool extracts video features from each video chunk to generate vector embeddings using one of the embedding models integrated with it. 

This indexing tool can also pull other useful features, such as text and emotions, extracted as structured JSON. Therefore, Mixpeek enables both semantic and hybrid search

  1. Vector database: The generated vector embeddings are stored in a database that supports vector similarity search—in this case, PostgreSQL with pgvector and pgvectorscale and hosted on Timescale Cloud. This database not only handles vector similarity searches but also enables you to store metadata, such as start and end times, alongside their embeddings

Video retrieval process

  1. Query: the user submits a text or video query.
  2. Query embeddings: Mixpeek converts the query into embeddings to capture its semantic meaning.
  3. Vector search: these embeddings are then compared against the stored embeddings in the vector database to retrieve the closest matches using vector similarity search.

The Timescale and Mixpeek tech stacks complement each other. Mixpeek generates the vector embeddings, while Timescale’s PostgreSQL database—powered by pgvector and pgvectorscale—ensures optimized storage, management, and retrieval of the video data and its embeddings.

Implementation

Now that we have covered the architecture of the reverse video search system, let's move on to the implementation. To make this tutorial more digestible, I divided the implementation into several sections. You can also follow along with this notebook.

Imports and setup

Setting up environment variables

In this section, we first create a .env file to store our environment variables. Let’s first get these variables:

After creating your database service, get the connection string provided in the dashboard.

Database connection dashboard in the Timescale UI
Database connection dashboard in the Timescale UI

Store these variables in the .env file as follows:

MIXPEEK_API_KEY='...'
DATABASE_CONNECTION_STRING='...'

Installing libraries

To get started, let’s install the required libraries:

%pip install psycopg2-binary python-dotenv requests

psycopg2 enables the connection to PostgreSQL. python-dotenv lets you read the values stored in your environment while requests allows you to send HTTP requests easily.

Then, you can import the libraries and load the environment variables as follows:

import json
import os
import psycopg2
import requests
import time
from dotenv import load_dotenv

load_dotenv()

MIXPEEK_API_KEY= os.environ["MIXPEEK_API_KEY"]
DATABASE_CONNECTION_STRING= os.environ["DATABASE_CONNECTION_STRING"]

Mixpeek workflow

In this section, we define different functions related to video indexing, feature extraction, and retrieving video chunks & their embeddings using Mixpeek’s API. Then, we demonstrate how to get embeddings using a video.

Video indexing and feature extraction

In the index_video_file function, we use Mixpeek’s Index Video Url endpoint to process the source video and divide it into chunks.  For each video chunk, this tool does the following:

  • Reads on-screen text using the video-descriptor-v1 model.
  • Generates a 1408-dimensional vector embedding with the multimodal-v1 model
  • Transcribes the audio in the video chunk using the polyglot-v1 model
  • Creates a comprehensive description of the chunk, including the screenplay and sound details
BASE_URL = "https://api.mixpeek.com"

headers = {
    'Authorization': f'Bearer {MIXPEEK_API_KEY}',
    'Content-Type': 'application/json'
}

def index_video_file(video_url, video_name, chunking_interval):
    payload = json.dumps({
        "url": video_url,
        "collection_id": "mixpeek_timescaledb",
        "metadata": {
            "name": video_name
        },
        "video_settings": [
            {
                "interval_sec": chunking_interval,
                "read": {"model_id": "video-descriptor-v1"},
                "embed": {"model_id": "multimodal-v1"},
                "transcribe": {"model_id": "polyglot-v1"},
                "describe": {
                    "model_id": "video-descriptor-v1",
                    "prompt": "Create a holistic description of the video, include sounds and screenplay"
                },
            }
        ]
    })

    indexing_response = requests.post(url=f"{BASE_URL}/index/videos/url", 
                                      headers=headers, 
                                      data=payload)

    task_id = indexing_response.json()["task_id"]
    print(f"Indexing started. Task ID: {task_id}")
    return task_id

Let's use the task_id associated with the indexing process to check its status through the Get Task endpoint. 

def check_task_status(task_id):
    response = requests.get(f"{BASE_URL}/tasks/{task_id}", headers=headers)
    return response.json()["status"]

def get_asset_id(task_id):
    # poll task status every 5 seconds until video processing is done.
    while True:
        status = check_task_status(task_id)
        print(f"Current task status: {status}")
        if status == "DONE":
            break
        time.sleep(5)

    get_task_response = requests.get(url=f"{BASE_URL}/tasks/{task_id}", 
                                     headers=headers)

    asset_id = get_task_response.json()["asset_id"]
    print(f"Task Done. Asset ID: {asset_id}")
    return asset_id

Retrieving video chunks and embeddings

In this part, we access the metadata (start_time and end_time) and feature_ids of the video chunks created using the Get Asset With Features endpoint and the asset_id from the response from the Get Task endpoint.

def retrieve_video_chunks(asset_id):
    get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features", 
                                      headers=headers)
    return get_asset_response.json()["features"]["video"]

Then using the feature_id, let's access the generated vector embedding from each video chunk through the Get Feature endpoint.

def retrieve_video_chunks_embeddings(video_chunks):
    chunks_embeddings = []
    for chunk in video_chunks:
        get_feature_response = requests.get(url=f"{BASE_URL}/features/{chunk['feature_id']}", 
                                            headers=headers, 
                                            params={"include_vectors":True})

        chunks_embeddings.append({
            "start_time": chunk["start_time"],
            "end_time": chunk["end_time"],
            "embedding": get_feature_response.json()["vectors"]["multimodal-v1"]
        })
    return chunks_embeddings

Example

Let's now combine all these parts into one function, get_mixpeek_embeddings, and demonstrate using a video file.

def get_mixpeek_embeddings(video_url, video_name, chunking_interval):
    task_id = index_video_file(video_url, video_name, chunking_interval)
    asset_id = get_asset_id(task_id)
    video_chunks = retrieve_video_chunks(asset_id)
    return retrieve_video_chunks_embeddings(video_chunks)

source_video = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_park_trailer.mp4"
source_video_embeddings = get_mixpeek_embeddings(video_url=source_video, 
                                                 video_name="source_video", 
                                                 chunking_interval=10)

PostgreSQL workflow

Connecting to PostgreSQL using Timescale Cloud

In this tutorial, we'll use PostgreSQL with the pgvector and pgvectorscale extensions as our vector database. This database instance is hosted on Timescale Cloud. 

The pgvectorscale extension builds on top of pgvector, enabling PostgreSQL to efficiently store and query vector embeddings. You might wonder why you should upgrade from PostgreSQL with pgvector to Timescale Cloud’s AI stack (pgai, pgvectorscale, and pgai Vectorizer). Here’s why:

  • Faster and more cost-effective ANN search: PostgreSQL, powered by pgvector and pgvectorscale, is a faster, more accurate, and more affordable vector database. Compared to popular vector databases like Pinecone, PostgreSQL with pgvector and pgvectorscale achieved 28x lower p95 query latency and 16x higher query throughput for approximate nearest neighbor (ANN) queries at 99 % recall—at only 25 % of Pinecone’s monthly cost.
  • High-performance and scalability for your AI Applications: pgvectorscale boosts PostgreSQL’s ANN capabilities with StreamingDiskANN, a disk-based ANN algorithm that outperforms memory-based indexes like pgvector’s IVFFlat. With no ef_search cutoffs and its streaming model, it enhances query speed and accuracy,  continuously retrieving the “next closest” item, potentially even traversing the entire graph! 
  • A simplified AI stack: Timescale’s AI stack integrates vector embeddings, relational data, and time-series data in one place. This consolidation significantly reduces the complexity of infrastructure management and data synchronization, allowing you to focus on building AI applications.
  • Seamless PostgreSQL compatibility: Since Timescale inherits PostgreSQL’s syntax and robustness, developers with PostgreSQL experience can integrate AI capabilities in their application development without a steep learning curve. 

Use the code below to connect to your database service and confirm database access:

def connect_db():
    return psycopg2.connect(DATABASE_CONNECTION_STRING)


# Ensures database access
with connect_db() as conn:
       with conn.cursor() as curs:
              curs.execute("SELECT 'hello world'; ")
              print(curs.fetchone())

Creating a table for video chunks

Since we are working with embedding data, we need to ensure our PostgreSQL service can support it. Therefore, we install the pgvector and pgvectorscale extensions before creating the table, video_embeddings, that stores information about video segments (or chunks) and their embeddings.

with connect_db() as conn:
    with conn.cursor() as curs:
        # Installs both pgvector and pgvectorscale
        curs.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;")
    
    with conn.cursor() as curs:
        curs.execute("""
            CREATE TABLE IF NOT EXISTS video_embeddings(
                id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
                embedding VECTOR(1408),
                start_time NUMERIC,
                end_time NUMERIC
            ); 
        """)   

Here's a breakdown of the columns:

  • id: a unique identifier for each video chunk.
  • embedding: a 1408-dimensional vector embedding of the video chunk. 
  • start_time: the starting time of the video chunk. For example, if a video is split into segments, this could be each segment's start time (in seconds or another unit).
  • end_time: the ending time of the video chunk, indicating when the segment finishes.

Data insertion

Let’s ingest the video chunks and their embeddings into our database.

with connect_db() as conn:
    with conn.cursor() as curs:
        for chunk in source_video_embeddings:
            curs.execute('''
                INSERT INTO video_embeddings (embedding, start_time, end_time)
                VALUES (%(embedding)s, %(start_time)s, %(end_time)s);
            ''',chunk)
            
    with conn.cursor() as curs:
        curs.execute('''
            SELECT start_time, end_time, vector_dims(embedding) 
            FROM video_embeddings;
        ''')
        for row in curs.fetchall():
            print(f"start_time: {row[0]}, end_time: {row[1]}, embedding_dimensions: {row[2]}")

Creating the index on embeddings

Vector search queries will primarily target the embedding column, so we create an index on this column using StreamingDiskANN. It significantly speeds up vector similarity searches. 

with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute('''
                CREATE INDEX video_embeddings_idx 
                ON video_embeddings
                USING diskann (embedding);
        ''')

Search functions

In this section, we demonstrate two search functions for retrieving relevant video chunks: one based on video input and the other based on text query. The idea is to search for similar video chunks stored in the database by comparing embeddings, allowing us to match the content of the video query or find similar scenes based on descriptive text.

For each query, we first generate vector embeddings and then use them to search for the closest video chunks through the source video embeddings, ranking results by cosine distance. Let’s first define a helper function for vector similarity search. 

# helper function for vector similarity search 
def retrieve_closest_video_chunks(query_embedding, limit):
  with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute('''
                    SELECT start_time, end_time
                    FROM video_embeddings
                    ORDER BY embedding <=> %s::vector
                    LIMIT %s
            ''', (query_embedding['embedding'], limit))

        print("CLOSEST VIDEO CHUNKS:")
        closest_video_chunks = []
        for row in curs.fetchall():
            print(f"start_time: {row[0]}, end_time: {row[1]}")
            closest_video_chunks.append({
               "start_time": row[0],
               "end_time": row[1]
            })

Video query search

video_query = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_bunny.mp4" 
video_query_embeddings = get_mixpeek_embeddings(video_url=video_query, 
                                                video_name="video_query", 
                                                chunking_interval=5)


retrieve_closest_video_chunks(video_query_embeddings[0], 2)

Here are the results of this query:

CLOSEST VIDEO CHUNKS:
start_time: 60.0, end_time: 70.0
start_time: 20.0, end_time: 30.0
One of the frames in the video_segment (start_time: 60.0, end_time: 70.0)
One of the frames in the video_segment (start_time: 60.0, end_time: 70.0)

In this part, let's use the Index Text endpoint to generate embeddings for the text query and then use them to perform a vector similarity search.

text_query = "two people in a car"

payload = json.dumps({
    "text": text_query,
    "collection_id": "mixpeek_timescale",
    "metadata": {
        "author": "user"
    },
    "text_settings": {
        "embed": {"model_id": "multimodal-v1"}
    }
})

index_text_response = requests.post(url=f"{BASE_URL}/index/text", 
                                    headers=headers, 
                                    data=payload)

task_id = index_text_response.json()["task_id"]
print(f"Indexing started. Task ID: {task_id}")

# retrieve feature extracted from the text query
asset_id = get_asset_id(index_text_response.json()["task_id"])
get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features", 
                                  headers=headers)
text_asset = get_asset_response.json()["features"]["text"]

# extract the generated text embedding
get_feature_response = requests.get(url=f"{BASE_URL}/features/{text_asset[0]['feature_id']}", 
                                    headers=headers, 
                                    params={"include_vectors":True})

text_query_embedding = {
    "embedding": get_feature_response.json()["vectors"]['multimodal-v1']
}


retrieve_closest_video_chunks(text_query_embedding, 2)

Here are the results of this query:

CLOSEST VIDEO CHUNKS:
start_time: 30.0, end_time: 40.0
start_time: 40.0, end_time: 50.0
One of the frames from video segments (start_time: 30.0, end_time: 40.0)
One of the frames from video segments (start_time: 30.0, end_time: 40.0)

This demo uses a single video. However, we can extend the same approach to handle a collection of videos. 

Conclusion

Unstructured data, particularly videos, can be challenging to manage, but Mixpeek’s video processing and embedding generation simplify the task, making the data searchable and more accessible. With content creation and generation increasingly in the hands of everyday users, data accumulation will only continue to grow. As a result, search engines need to evolve.

In this article, we covered how to build a reverse video search engine using Mixpeek and Timescale Cloud’s mature PostgreSQL cloud platform. This stack potentially paves the way for many enhancements in multi-modal video analysis and retrieval. We can deploy add-ons to the current system, for example, integrating AI-generated sentiment analysis or treating support queries in several languages.

AI is still in its early stages. Video search and understanding will continue to evolve. If you're interested in implementing these solutions, check out Mixpeek’s API documentation and Timescale’s AI stack to start building your own advanced video search engine.

Further reading:

Originally posted

Nov 08, 2024

Share

pgai

3.1k

pgvectorscale

1.5k

Subscribe to the Timescale Newsletter

By submitting you acknowledge Timescale's Privacy Policy.