使用 TwelveLabs 的 Marengo 视频嵌入模型与 Amazon Bedrock 和 Elasticsearch
摘要: Elasticsearch与TwelveLabs的Marengo视频嵌入模型集成,通过AWS Bedrock平台实现视频内容搜索。文章演示了如何将电影预告片上传至S3存储桶,利用Bedrock异步生成视频嵌入,并将结果存入Elasticsearch进行向量搜索。该方法突破了传统依赖文本元数据的局限,直接捕捉视频动态内容特征。作者还比较了不同量化方法对搜索性能的影响,展示了1024维密集向量
作者:来自 Elastic Dave Erickson
创建一个小应用来搜索来自 TwelveLabs 的 Marengo 模型的视频嵌入
从向量搜索到强大的 REST API,Elasticsearch 为开发者提供了最全面的搜索工具包。深入 GitHub 上的示例笔记本来尝试新东西。你也可以立即开始免费试用或在本地运行 Elasticsearch。
这篇博客文章探讨了 TwelveLabs 为其视频嵌入模型 Marengo 推出的新的 Bedrock 集成,并演示了如何将生成的视频嵌入与 Elasticsearch 向量数据库一起使用。下面的演练详细说明了如何利用这一组合来搜索近期夏季大片的预告片。
动机
真实数据不仅仅是文本。在当今的 TikTok、工作视频通话和直播会议的世界里,内容越来越多地基于视频。在企业领域也是如此。无论是为了公共安全、合规审计,还是客户满意度,多模态 AI 都有潜力为知识应用解锁音频和视频。
然而,当我在大量内容中搜索时,我经常感到沮丧,因为除非我搜索的词语被捕捉到元数据里或在录制中被说出,否则我找不到视频。在移动应用时代的“它就是能用”的期望,已经在 AI 时代转变为 “它就是理解我的数据”。要实现这一点,AI 需要能够原生访问视频,而不是先将其转换为文本。
空间推理和视频理解这样的术语在视频和机器人领域都有应用。将视频理解加入到我们的工具集中,将是构建能够超越文本的 AI 系统的重要一步。
视频模型的超级能力
在使用专门的视频模型之前,我的常规方法是使用像 Whisper 这样的模型生成音频转录,再结合来自图像模型的密集向量嵌入,用于视频中提取的静态帧。这种方法在一些视频中效果不错,但当主题快速变化,或者关键信息实际上存在于拍摄对象的运动中时,就会失败。
简单来说,仅仅依赖图像模型会遗漏视频内容中大量的信息。
几个月前,我第一次通过 TwelveLabs 的 SaaS 平台接触到他们,它允许你上传视频进行一站式异步处理。他们有两个模型系列:
- Marengo 是一个多模态嵌入模型,它不仅能从静态图像中捕捉含义,还能从动态视频片段中捕捉含义 —— 类似于文本嵌入模型能够从整段文字中捕捉含义,而不仅仅是单词。
- Pegasus 是一个视频理解模型,可以用来生成字幕,或在片段上下文中回答类似 RAG 的问题。
虽然我喜欢这个 SaaS 服务的易用性和 API,但上传数据并不总是可行的。我的客户往往有数 TB 的敏感数据,不允许离开他们的控制范围。这就是 AWS Bedrock 发挥作用的地方。
TwelveLabs 已经将他们的核心模型提供在按需的 Bedrock 平台上,使源数据可以保存在我控制的 S3 存储桶中,并且只在安全计算模式下访问,而无需持久化在第三方系统中。这是个好消息,因为企业客户的视频用例通常包含商业机密、带有 PII 的记录,或其他受严格安全和隐私法规约束的信息。
我认为 Bedrock 集成能够解锁许多用例。
让我们搜索一些电影预告片
注意:Python 导入和通过 .env 文件处理环境变量的完整代码在 Python notebook 版本中。
依赖项:
-
你需要一个可以由你的 AWS ID 写入的 S3 存储桶
-
你需要 Elasticsearch 的主机 URL 和 API key,可以是本地部署或 Elastic Cloud
-
这段代码假设 Elasticsearch 版本为 8.17+ 或 9.0+
-
一个很好的快速测试数据源是电影预告片。它们剪辑快速、视觉效果惊艳,并且通常包含高动作场景。你可以获取自己的 .mp4 文件,或者使用 https://github.com/yt-dlp/yt-dlp 从 YouTube 获取小规模的文件。
一旦文件在我们的本地文件系统中,我们需要将它们上传到我们的 S3 存储桶:
# Initialize AWS session
session = boto3.session.Session(
aws_access_key_id=AWS_ACCESS_KEY_ID,
aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
region_name=AWS_REGION
)
#########
## Validate S3 Configuration
#########
aws_account_id = session.client('sts').get_caller_identity()["Account"]
print(f"AWS Account ID: {aws_account_id}")
s3_client = session.client('s3')
# Verify bucket access
try:
s3_client.head_bucket(Bucket=S3_BUCKET_NAME)
print(f"✅ Successfully connected to S3 bucket: {S3_BUCKET_NAME}")
except Exception as e:
print(f"❌ Error accessing S3 bucket: {e}")
print("Please ensure the bucket exists and you have proper permissions.")
#########
## Upload videos to S3, and make note of where we put them in data object
#########
for video_object in video_objects:
# Get the video file path
video_path = video_object.get_video_path()
# Skip if video path is not set
if not video_path:
print(f"Skipping {video_object.get_video_string()} - No video path set")
continue
# Define S3 destination key - organize by platform and video ID
# put this information in our data object for later
s3_key = video_object.get_s3_key()
if not s3_key:
s3_key = f"{S3_VIDEOS_PATH}/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
video_object.set_s3_key(s3_key)
try:
# Check if file already exists in S3
try:
s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
print(f"Video {video_object.get_video_string()} already exists in S3. Skipping upload.")
continue
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == '404':
# File doesn't exist in S3, proceed with upload
pass
else:
# Some other error occurred
raise e
# Upload the video to S3
print(f"Uploading {video_object.get_video_string()} to S3...")
s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
print(f"Successfully uploaded {video_object.get_video_string()} to S3")
except Exception as e:
print(f"Error uploading {video_object.get_video_string()} to S3: {str(e)}")
现在我们可以使用异步 Bedrock 调用来创建视频嵌入:
#########
## Use Bedrock hosted Twelve Labs models to create video embeddings
#########
# Helper function to wait for async embedding results
def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, invocation_arn: str, verbose: bool = False) -> list:
"""
Wait for Bedrock async embedding task to complete and retrieve results
Args:
s3_bucket (str): The S3 bucket name
s3_prefix (str): The S3 prefix for the embeddings
invocation_arn (str): The ARN of the Bedrock async embedding task
Returns:
list: A list of embedding data
Raises:
Exception: If the embedding task fails or no output.json is found
"""
# Wait until task completes
status = None
while status not in ["Completed", "Failed", "Expired"]:
response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
status = response['status']
if verbose:
clear_output(wait=True)
tqdm.tqdm.write(f"Embedding task status: {status}")
time.sleep(5)
if status != "Completed":
raise Exception(f"Embedding task failed with status: {status}")
# Retrieve the output from S3
response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
for obj in response.get('Contents', []):
if obj['Key'].endswith('output.json'):
output_key = obj['Key']
obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
content = obj['Body'].read().decode('utf-8')
data = json.loads(content).get("data", [])
return data
raise Exception("No output.json found in S3 prefix")
# Create video embedding
def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
"""
Create embeddings for video using Marengo on Bedrock
Args:
video_s3_uri (str): The S3 URI of the video to create an embedding for
video_id (str): the identifying unique id of the video, to be used as a uuid
Returns:
list: A list of embedding data
"""
unique_id = video_id
s3_output_prefix = f'{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{unique_id}'
response = bedrock_client.start_async_invoke(
modelId=MARENGO_MODEL_ID,
modelInput={
"inputType": "video",
"mediaSource": {
"s3Location": {
"uri": video_s3_uri,
"bucketOwner": aws_account_id
}
}
},
outputDataConfig={
"s3OutputDataConfig": {
"s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'
}
}
)
invocation_arn = response["invocationArn"]
print(f"Video embedding task started: {invocation_arn}")
# Wait for completion and get results
try:
embedding_data = wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)
except Exception as e:
print(f"Error waiting for embedding output: {e}")
return None
return embedding_data
def check_existing_embedding(video_id: str) -> bool:
"""Check S3 folder to see if this video already has an embedding created to avoid re-inference"""
s3_output_prefix = f'{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{video_id}'
print(s3_output_prefix)
try:
# Check if any files exist at this prefix
response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
print(f"Embedding {video_object.get_video_string()} already has an embedding. Skipping embedding creation.")
# Find the output.json file
for obj in response.get('Contents', []):
if obj['Key'].endswith('output.json'):
output_key = obj['Key']
# Get the object from S3
obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
# Read the content and parse as JSON
content = obj['Body'].read().decode('utf-8')
embedding_data = json.loads(content).get("data", [])
return embedding_data
else:
print(f"No existing embedding found for {video_object.get_video_string()}.")
return None
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == '404':
# File doesn't exist in S3, proceed with upload
print("Did not find embedding in s3")
return None
else:
# Some other error occurred
raise e
def create_s3_uri(bucket_name: str, key: str)-> str:
video_uri = f"s3://{bucket_name}/{key}"
return video_uri
## Generate the embeddings one at a time, use S3 as cache to prevent double embedding generations
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
s3_key = video_object.get_s3_key()
video_id = video_object.get_video_id()
video_uri = create_s3_uri(S3_BUCKET_NAME, s3_key)
retrieved_embeddings = check_existing_embedding(video_id)
if retrieved_embeddings:
video_object.set_embeddings_list(retrieved_embeddings)
else:
video_embedding_data = create_video_embedding(video_uri, video_id)
video_object.set_embeddings_list(video_embedding_data)
现在我们已经在本地内存中的视频对象里得到了嵌入,这里做一个快速打印测试,看看返回了什么:
video_embedding_data = video_objects[0].get_embeddings_list()
##Preview Print
for i, embedding in enumerate(video_embedding_data[:3]):
print(f"{i}")
for key in embedding:
if "embedding" == key:
print(f"\t{key}: len {len(embedding[key])}")
else:
print(f"\t{key}: {embedding[key]}")
输出如下:
0
embedding: len 1024
embeddingOption: visual-text
startSec: 0.0
endSec: 6.199999809265137
1
embedding: len 1024
embeddingOption: visual-text
startSec: 6.199999809265137
endSec: 10.399999618530273
2
embedding: len 1024
embeddingOption: visual-text
startSec: 10.399999618530273
endSec: 17.299999237060547
插入到 Elasticsearch
我们将把对象上传到 Elasticsearch —— 在我的例子中,大约有 155 个视频片段的元数据和嵌入。在如此小的规模下,使用平铺的 float32 索引进行暴力最近邻搜索是最有效且最经济的方法。不过,下面的示例演示了如何为 Elasticsearch 支持的大规模用例中的每个流行量化级别创建不同的索引。参见 Elastic 关于更好的二进制量化(BBQ)功能的这篇文章。
es = Elasticsearch(
hosts=[ELASTICSEARCH_ENDPOINT],
api_key=ELASTICSEARCH_API_KEY
)
es_detail = es.info().body
if "version" in es_detail:
identifier = es_detail['version']['build_flavor'] if 'build_flavor' in es_detail['version'] else es_detail['version']['number']
print(f"✅ Successfully connected to Elasticsearch: {es_detail['version']['build_flavor']}")
docs = []
for video_object in video_objects:
persist_object = video_object.get_video_object()
embeddings = video_object.get_embeddings_list()
for embedding in embeddings:
if embedding["embeddingOption"] == "visual-image":
# Create a copy of the persist object and add embedding details
doc = copy.deepcopy(persist_object)
doc["embedding"] = embedding["embedding"]
doc["start_sec"] = embedding["startSec"]
doc["end_sec"] = embedding["endSec"]
docs.append(doc)
index_varieties = [
"flat", ## brute force float32
"hnsw", ## float32 hnsw graph data structure
"int8_hnsw", ## int8 hnsw graph data structure, default for lower dimension models
"bbq_hnsw", ## Better Binary Qunatization HNSW, default for higher dimension models
"bbq_flat" ## brute force + Better Binary Quantization
]
for index_variety in index_varieties:
# Create an index for the movie trailer embeddings
# Define mapping with proper settings for dense vector search
index_name = f"twelvelabs-movie-trailer-{index_variety}"
mappings = {
"properties": {
"url": {"type": "keyword"},
"platform": {"type": "keyword"},
"video_id": {"type": "keyword"},
"title": {"type": "text", "analyzer": "standard"},
"embedding": {
"type": "dense_vector",
"dims": 1024,
"similarity": "cosine",
"index_options": {
"type": index_variety
}
},
"start_sec": {"type": "float"},
"end_sec": {"type": "float"}
}
}
# Check if index already exists
if es.indices.exists(index=index_name):
print(f"Deleting Index '{index_name}' and then sleeping for 2 seconds")
es.indices.delete(index=index_name)
sleep(2)
# Create the index
es.indices.create(index=index_name, mappings=mappings)
print(f"Index '{index_name}' created successfully")
for index_variety in index_varieties:
# Create an index for the movie trailer embeddings
# Define mapping with proper settings for dense vector search
index_name = f"twelvelabs-movie-trailer-{index_variety}"
# Bulk insert docs into Elasticsearch index
print(f"Indexing {len(docs)} documents into {index_name}...")
# Create actions for bulk API
actions = []
for doc in docs:
actions.append({
"_index": index_name,
"_source": doc
})
# Perform bulk indexing with error handling
try:
success, failed = bulk(es, actions, chunk_size=100, max_retries=3,
initial_backoff=2, max_backoff=60)
print(f"\tSuccessfully indexed {success} documents into {index_name}")
if failed:
print(f"\tFailed to index {len(failed)} documents")
except Exception as e:
print(f"Error during bulk indexing: {e}")
print(f"Completed indexing documents into {index_name}")
运行搜索
TwelveLabs 的 Bedrock 实现允许异步调用将文本生成向量嵌入到 S3。然而,下面我们将使用延迟更低的同步 invoke_model,直接为我们的搜索查询获取文本嵌入。(文本 Marengo 文档示例在这里。)
# Create text embedding
def create_text_embedding(text_query: str) -> list:
text_model_id = TEXT_EMBEDDING_MODEL_ID
text_model_input = {
"inputType": "text",
"inputText": text_query
}
response = bedrock_client.invoke_model(
modelId=text_model_id,
body=json.dumps(text_model_input)
)
response_body = json.loads(response['body'].read().decode('utf-8'))
embedding_data = response_body.get("data", [])
if embedding_data:
return embedding_data[0]["embedding"]
else:
return None
def vector_query(index_name: str, text_query: str) -> dict:
query_embedding = create_text_embedding(text_query)
query = {
"retriever": {
"knn": {
"field": "embedding",
"query_vector": query_embedding,
"k": 10,
"num_candidates": "25"
}
},
"size": 10,
"_source": False,
"fields": ["title", "video_id", "start_sec"]
}
return es.search(index=index_name, body=query).body
text_query = "Show me scenes with dinosaurs"
print (vector_query("twelvelabs-movie-trailer-flat", text_query))
返回的 JSON 就是我们的搜索结果!但为了创建一个更易用的测试界面,我们可以使用一些快速的 iPython 小部件:
from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display
def display_search_results_html(query):
results = vector_query("twelvelabs-movie-trailer-flat", query)
hits = results.get('hits', {}).get('hits', [])
if not hits:
return "<p>No results found</p>"
items = []
for hit in hits:
fields = hit.get('fields', {})
title = fields.get('title', ['No Title'])[0]
score = hit.get('_score', 0)
video_id = fields.get('video_id', [''])[0]
start_sec = fields.get('start_sec', [0])[0]
url =
f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')
return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"
def search_videos():
search_input = widgets.Text(
value='',
placeholder='Enter your search query…',
description='Search:',
layout=Layout(width='70%')
)
search_button = widgets.Button(
description='Search Videos',
button_style='primary',
layout=Layout(width='20%')
)
# Use a single HTML widget for output; update its .value to avoid double-rendering
results_box = WHTML(value="")
def on_button_click(_):
q = search_input.value.strip()
if not q:
results_box.value = "<p>Please enter a search query</p>"
return
results_box.value = "<p>Searching…</p>"
results_box.value = display_search_results_html(q)
# Avoid multiple handler attachments if the cell is re-run
try:
search_button._click_handlers.callbacks.clear()
except Exception:
pass
search_button.on_click(on_button_click)
display(HBox([search_input, search_button]))
display(results_box)
# Call this to create the UI
search_videos()
让我们在预告片中搜索一些视觉内容。
比较量化方法
较新的 Elasticsearch 版本默认对 1024 维密集向量使用 bbq_hnsw,它在通过对原始 float32 在过采样候选窗口中重评分来保持准确性的同时,提供最佳的速度和可扩展性。
为了通过简单的 UI 比较量化对搜索结果的影响,可以查看一个名为 Relevance Studio 的新项目。
如果我们在 Kibana 中检查索引管理,或使用 curl 执行 GET /_cat/indices ,我们会看到每个选项的存储大小大致相同。乍一看,这可能会让人困惑,但请记住存储大小大致相等,因为索引包含用于重评分的向量 float32 表示。在 bbq_hnsw 中,图中只使用量化的二进制向量表示,从而在索引和搜索时节省成本并提高性能。
最后的想法
对于单个 1024 维密集向量来说,这些结果令人印象深刻。我很期待尝试将 Marengo 模型的强大功能与混合搜索方法结合,包括音频转录,以及 Elasticsearch 的地理空间过滤和 RBAC/ABAC 访问控制。你希望 AI 对哪些视频了解一切?
原文:https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock
更多推荐
所有评论(0)