Nov 08, 2024
Until recently, searching through unstructured data—like text, images, and videos—was a nontrivial problem. Traditional search methods struggled with interpreting the meaning behind this content, making it difficult to extract relevant information efficiently. Enter embeddings: a way of representing data in n-dimensional space that enables computers to recognize patterns and context within this unstructured data.
In this blog post, you’ll learn how to build a reverse video search system using Mixpeek for video processing & embedding generation, combined with PostgreSQL as a vector database (powered by pgvector and pgvectorscale) hosted on Timescale Cloud. This system will allow you to query video data using both video and text queries to retrieve relevant video segments based on semantic similarity.
Before discussing implementation details, let’s explore the overall architecture of the reverse video system we’re building.
The ingestion process loads the source video data into a vector database: Mixpeek splits the source video into chunks and generates an embedding for each chunk, and those chunks and their embeddings are then stored in PostgreSQL.
This indexing tool can also pull other useful features, such as text and emotions, extracted as structured JSON. Therefore, Mixpeek enables both semantic and hybrid search!
The Timescale and Mixpeek tech stacks complement each other. Mixpeek generates the vector embeddings, while Timescale’s PostgreSQL database—powered by pgvector and pgvectorscale—ensures optimized storage, management, and retrieval of the video data and its embeddings.
Now that we have covered the architecture of the reverse video search system, let's move on to the implementation. To make this tutorial more digestible, I divided the implementation into several sections. You can also follow along with this notebook.
In this section, we first create a .env file to store our environment variables. Let's first get these variables: the Mixpeek API key from your Mixpeek account and the database connection string from Timescale Cloud. After creating your database service, get the connection string provided in the dashboard.
Store these variables in the .env file as follows:
MIXPEEK_API_KEY='...'
DATABASE_CONNECTION_STRING='...'
To get started, let’s install the required libraries:
%pip install psycopg2-binary python-dotenv requests
psycopg2 enables the connection to PostgreSQL, python-dotenv lets you read the values stored in your environment, while requests allows you to send HTTP requests easily.
Then, you can import the libraries and load the environment variables as follows:
import json
import os
import psycopg2
import requests
import time
from dotenv import load_dotenv
load_dotenv()
MIXPEEK_API_KEY = os.environ["MIXPEEK_API_KEY"]
DATABASE_CONNECTION_STRING = os.environ["DATABASE_CONNECTION_STRING"]
In this section, we define different functions related to video indexing, feature extraction, and retrieving video chunks & their embeddings using Mixpeek’s API. Then, we demonstrate how to get embeddings using a video.
In the index_video_file function, we use Mixpeek's Index Video Url endpoint to process the source video and divide it into chunks. For each video chunk, this tool does the following:
- Reads and describes the visual content using the video-descriptor-v1 model.
- Generates a vector embedding using the multimodal-v1 model.
- Transcribes the audio using the polyglot-v1 model.

BASE_URL = "https://api.mixpeek.com"

headers = {
    'Authorization': f'Bearer {MIXPEEK_API_KEY}',
    'Content-Type': 'application/json'
}
def index_video_file(video_url, video_name, chunking_interval):
    # Split the video into chunks of `chunking_interval` seconds and
    # read, embed, transcribe, and describe each chunk.
    payload = json.dumps({
        "url": video_url,
        "collection_id": "mixpeek_timescaledb",
        "metadata": {
            "name": video_name
        },
        "video_settings": [
            {
                "interval_sec": chunking_interval,
                "read": {"model_id": "video-descriptor-v1"},
                "embed": {"model_id": "multimodal-v1"},
                "transcribe": {"model_id": "polyglot-v1"},
                "describe": {
                    "model_id": "video-descriptor-v1",
                    "prompt": "Create a holistic description of the video, include sounds and screenplay"
                },
            }
        ]
    })
    indexing_response = requests.post(url=f"{BASE_URL}/index/videos/url",
                                      headers=headers,
                                      data=payload)
    task_id = indexing_response.json()["task_id"]
    print(f"Indexing started. Task ID: {task_id}")
    return task_id
Let's use the task_id associated with the indexing process to check its status through the Get Task endpoint.
def check_task_status(task_id):
    response = requests.get(f"{BASE_URL}/tasks/{task_id}", headers=headers)
    return response.json()["status"]

def get_asset_id(task_id):
    # Poll task status every 5 seconds until video processing is done.
    while True:
        status = check_task_status(task_id)
        print(f"Current task status: {status}")
        if status == "DONE":
            break
        time.sleep(5)
    get_task_response = requests.get(url=f"{BASE_URL}/tasks/{task_id}",
                                     headers=headers)
    asset_id = get_task_response.json()["asset_id"]
    print(f"Task Done. Asset ID: {asset_id}")
    return asset_id
In this part, we access the metadata (start_time and end_time) and feature_ids of the created video chunks, using the Get Asset With Features endpoint and the asset_id returned by the Get Task endpoint.
def retrieve_video_chunks(asset_id):
    get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features",
                                      headers=headers)
    return get_asset_response.json()["features"]["video"]
Then, using the feature_id, let's access the generated vector embedding from each video chunk through the Get Feature endpoint.
def retrieve_video_chunks_embeddings(video_chunks):
    chunks_embeddings = []
    for chunk in video_chunks:
        get_feature_response = requests.get(url=f"{BASE_URL}/features/{chunk['feature_id']}",
                                            headers=headers,
                                            params={"include_vectors": True})
        chunks_embeddings.append({
            "start_time": chunk["start_time"],
            "end_time": chunk["end_time"],
            "embedding": get_feature_response.json()["vectors"]["multimodal-v1"]
        })
    return chunks_embeddings
Let's now combine all these parts into one function, get_mixpeek_embeddings, and demonstrate it using a video file.
def get_mixpeek_embeddings(video_url, video_name, chunking_interval):
    task_id = index_video_file(video_url, video_name, chunking_interval)
    asset_id = get_asset_id(task_id)
    video_chunks = retrieve_video_chunks(asset_id)
    return retrieve_video_chunks_embeddings(video_chunks)
source_video = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_park_trailer.mp4"
source_video_embeddings = get_mixpeek_embeddings(video_url=source_video,
video_name="source_video",
chunking_interval=10)
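Before moving on to the database, it can help to quickly inspect what get_mixpeek_embeddings returned. The snippet below is just a small sanity check on the structure built above (a list of dicts with start_time, end_time, and embedding); it isn't part of the original workflow:

# Quick sanity check on the chunk embeddings returned above
print(f"Number of chunks: {len(source_video_embeddings)}")
first_chunk = source_video_embeddings[0]
print(f"First chunk: {first_chunk['start_time']}s to {first_chunk['end_time']}s")
print(f"Embedding dimensions: {len(first_chunk['embedding'])}")  # expected 1408, matching the VECTOR(1408) column below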
In this tutorial, we'll use PostgreSQL with the pgvector and pgvectorscale extensions as our vector database. This database instance is hosted on Timescale Cloud.
The pgvectorscale extension builds on top of pgvector, enabling PostgreSQL to efficiently store and query vector embeddings. You might wonder why you should upgrade from PostgreSQL with pgvector to Timescale Cloud’s AI stack (pgai, pgvectorscale, and pgai Vectorizer). Here’s why:
Among other benefits, pgvectorscale's StreamingDiskANN index has no ef_search-style cutoffs; thanks to its streaming model, it enhances query speed and accuracy by continuously retrieving the “next closest” item, potentially even traversing the entire graph!

Use the code below to connect to your database service and confirm database access:
def connect_db():
    return psycopg2.connect(DATABASE_CONNECTION_STRING)

# Ensures database access
with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute("SELECT 'hello world';")
        print(curs.fetchone())
Since we are working with embedding data, we need to ensure our PostgreSQL service can support it. Therefore, we install the pgvector and pgvectorscale extensions before creating the table, video_embeddings, that stores information about video segments (or chunks) and their embeddings.
with connect_db() as conn:
    with conn.cursor() as curs:
        # Installs both pgvector and pgvectorscale
        curs.execute("CREATE EXTENSION IF NOT EXISTS vectorscale CASCADE;")
    with conn.cursor() as curs:
        curs.execute("""
            CREATE TABLE IF NOT EXISTS video_embeddings(
                id BIGINT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
                embedding VECTOR(1408),
                start_time NUMERIC,
                end_time NUMERIC
            );
        """)
Here's a breakdown of the columns:
- id: a unique identifier for each video chunk.
- embedding: a 1408-dimensional vector embedding of the video chunk.
- start_time: the starting time of the video chunk. For example, if a video is split into segments, this could be each segment's start time (in seconds or another unit).
- end_time: the ending time of the video chunk, indicating when the segment finishes.

Let's ingest the video chunks and their embeddings into our database.
with connect_db() as conn:
    with conn.cursor() as curs:
        # Insert each chunk's embedding and time range
        for chunk in source_video_embeddings:
            curs.execute('''
                INSERT INTO video_embeddings (embedding, start_time, end_time)
                VALUES (%(embedding)s, %(start_time)s, %(end_time)s);
            ''', chunk)
    with conn.cursor() as curs:
        # Verify the ingested rows and the embedding dimensionality
        curs.execute('''
            SELECT start_time, end_time, vector_dims(embedding)
            FROM video_embeddings;
        ''')
        for row in curs.fetchall():
            print(f"start_time: {row[0]}, end_time: {row[1]}, embedding_dimensions: {row[2]}")
Vector search queries will primarily target the embedding column, so we create an index on this column using StreamingDiskANN. It significantly speeds up vector similarity searches.
with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute('''
            CREATE INDEX video_embeddings_idx
            ON video_embeddings
            USING diskann (embedding);
        ''')
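If you want to confirm that similarity queries actually use the new index, you can inspect the query plan. This is a minimal check using standard EXPLAIN and one of the embeddings generated earlier; it isn't part of the original tutorial, and the exact plan output depends on your data and planner settings:

# Inspect the plan for a nearest-neighbor query; you should see the
# diskann index (video_embeddings_idx) in the plan if the planner chooses it.
with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute('''
            EXPLAIN
            SELECT start_time, end_time
            FROM video_embeddings
            ORDER BY embedding <=> %s::vector
            LIMIT 2;
        ''', (source_video_embeddings[0]["embedding"],))
        for row in curs.fetchall():
            print(row[0])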
In this section, we demonstrate two search functions for retrieving relevant video chunks: one based on video input and the other based on a text query. The idea is to search for similar video chunks stored in the database by comparing embeddings, allowing us to match the content of the video query or find similar scenes based on descriptive text.
For each query, we first generate vector embeddings and then use them to search for the closest video chunks through the source video embeddings, ranking results by cosine distance. Let’s first define a helper function for vector similarity search.
# helper function for vector similarity search
def retrieve_closest_video_chunks(query_embedding, limit):
    with connect_db() as conn:
        with conn.cursor() as curs:
            # Rank stored chunks by cosine distance to the query embedding
            curs.execute('''
                SELECT start_time, end_time
                FROM video_embeddings
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            ''', (query_embedding['embedding'], limit))
            print("CLOSEST VIDEO CHUNKS:")
            closest_video_chunks = []
            for row in curs.fetchall():
                print(f"start_time: {row[0]}, end_time: {row[1]}")
                closest_video_chunks.append({
                    "start_time": row[0],
                    "end_time": row[1]
                })
    return closest_video_chunks
Video query search
video_query = "https://mixpeek-public-demo.s3.us-east-2.amazonaws.com/starter/jurassic_bunny.mp4"
video_query_embeddings = get_mixpeek_embeddings(video_url=video_query,
video_name="video_query",
chunking_interval=5)
retrieve_closest_video_chunks(video_query_embeddings[0], 2)
Here are the results of this query:
CLOSEST VIDEO CHUNKS:
start_time: 60.0, end_time: 70.0
start_time: 20.0, end_time: 30.0
In this part, let's use the Index Text endpoint to generate embeddings for the text query and then use them to perform a vector similarity search.
text_query = "two people in a car"
payload = json.dumps({
"text": text_query,
"collection_id": "mixpeek_timescale",
"metadata": {
"author": "user"
},
"text_settings": {
"embed": {"model_id": "multimodal-v1"}
}
})
index_text_response = requests.post(url=f"{BASE_URL}/index/text",
headers=headers,
data=payload)
task_id = index_text_response.json()["task_id"]
print(f"Indexing started. Task ID: {task_id}")
# retrieve feature extracted from the text query
asset_id = get_asset_id(index_text_response.json()["task_id"])
get_asset_response = requests.get(url=f"{BASE_URL}/assets/{asset_id}/features",
headers=headers)
text_asset = get_asset_response.json()["features"]["text"]
# extract the generated text embedding
get_feature_response = requests.get(url=f"{BASE_URL}/features/{text_asset[0]['feature_id']}",
headers=headers,
params={"include_vectors":True})
text_query_embedding = {
"embedding": get_feature_response.json()["vectors"]['multimodal-v1']
}
retrieve_closest_video_chunks(text_query_embedding, 2)
Here are the results of this query:
CLOSEST VIDEO CHUNKS:
start_time: 30.0, end_time: 40.0
start_time: 40.0, end_time: 50.0
This demo uses a single video. However, we can extend the same approach to handle a collection of videos.
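Here is a rough sketch of that extension, under the assumption that we add a video_name column to the table and reuse the same pipeline; the video_urls values below are placeholders rather than real demo assets:

# Hypothetical extension: index several videos into the same table and
# filter similarity search by video. The URLs below are placeholders.
video_urls = {
    "trailer_1": "https://example.com/video_1.mp4",
    "trailer_2": "https://example.com/video_2.mp4",
}

with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute("ALTER TABLE video_embeddings ADD COLUMN IF NOT EXISTS video_name TEXT;")

for name, url in video_urls.items():
    chunks = get_mixpeek_embeddings(video_url=url, video_name=name, chunking_interval=10)
    with connect_db() as conn:
        with conn.cursor() as curs:
            for chunk in chunks:
                curs.execute('''
                    INSERT INTO video_embeddings (embedding, start_time, end_time, video_name)
                    VALUES (%s, %s, %s, %s);
                ''', (chunk["embedding"], chunk["start_time"], chunk["end_time"], name))

# A filtered search: semantic similarity restricted to a single video.
with connect_db() as conn:
    with conn.cursor() as curs:
        curs.execute('''
            SELECT video_name, start_time, end_time
            FROM video_embeddings
            WHERE video_name = %s
            ORDER BY embedding <=> %s::vector
            LIMIT 2;
        ''', ("trailer_1", source_video_embeddings[0]["embedding"]))
        for row in curs.fetchall():
            print(row)

Storing Mixpeek's structured JSON features (text, emotions, and so on) in an additional metadata column would similarly enable the hybrid search mentioned earlier.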
Unstructured data, particularly videos, can be challenging to manage, but Mixpeek’s video processing and embedding generation simplify the task, making the data searchable and more accessible. With content creation and generation increasingly in the hands of everyday users, data accumulation will only continue to grow. As a result, search engines need to evolve.
In this article, we covered how to build a reverse video search engine using Mixpeek and Timescale Cloud's mature PostgreSQL cloud platform. This stack paves the way for many enhancements in multi-modal video analysis and retrieval. We can deploy add-ons to the current system, for example, integrating AI-generated sentiment analysis or supporting queries in several languages.
AI is still in its early stages. Video search and understanding will continue to evolve. If you're interested in implementing these solutions, check out Mixpeek’s API documentation and Timescale’s AI stack to start building your own advanced video search engine.