Author: Dave Erickson, from Elastic

Building a small app to search video embeddings from TwelveLabs' Marengo model

From vector search to powerful REST APIs, Elasticsearch gives developers the most comprehensive search toolkit. Dive into the sample notebooks on GitHub to try something new. You can also start a free trial right away, or run Elasticsearch locally.

This blog post explores the new Bedrock integration TwelveLabs has launched for its Marengo video embedding model, and demonstrates how to use the resulting video embeddings with the Elasticsearch vector database. The walkthrough below shows how to use this combination to search the trailers of recent summer blockbusters.

Motivation

Real-world data is more than just text. In today's world of TikTok, video calls at work, and live-streamed meetings, content is increasingly video-based. The same is true in the enterprise. Whether for public safety, compliance auditing, or customer satisfaction, multimodal AI has the potential to unlock audio and video for knowledge applications.

And yet, when I search across a large body of content, I am often frustrated: unless the words I search for were captured in the metadata or spoken in the recording, I can't find the video. The "it just works" expectation of the mobile-app era has become, in the AI era, "it just understands my data". For that to happen, AI needs native access to the video itself, rather than converting it to text first.

Terms like spatial reasoning and video understanding come up in both the video and robotics fields. Adding video understanding to our toolkit will be an important step toward building AI systems that can go beyond text.

The superpowers of video models

Before working with dedicated video models, my usual approach was to generate an audio transcript with a model like Whisper, combined with dense vector embeddings from an image model applied to still frames extracted from the video. This works well enough for some videos, but it breaks down when the subject changes rapidly, or when the key information actually lives in the motion of the subjects being filmed.

Put simply, relying on image models alone misses a large share of the information in video content.
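For reference, here is a minimal sketch of that older pipeline, assuming the openai-whisper and opencv-python packages (the model size, file name, and frame interval are illustrative):

import cv2
import whisper

# Transcribe the audio track with Whisper
asr_model = whisper.load_model("base")
transcript = asr_model.transcribe("trailer.mp4")["text"]

# Sample one still frame every ~2 seconds to feed an image embedding model
cap = cv2.VideoCapture("trailer.mp4")
fps = cap.get(cv2.CAP_PROP_FPS)
frames, idx = [], 0
while True:
    ok, frame = cap.read()
    if not ok:
        break
    if idx % int(fps * 2) == 0:
        frames.append(frame)  # each frame would go to a CLIP-style image model
    idx += 1
cap.release()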

I first encountered TwelveLabs a few months ago through their SaaS platform, which lets you upload videos for one-stop asynchronous processing. They have two model families:

  • Marengo is a multimodal embedding model that captures meaning not only from still images but also from moving video segments, much as a text embedding model captures meaning from whole passages of text rather than single words.
  • Pegasus is a video understanding model that can generate captions or answer RAG-style questions in the context of a clip.

As much as I like the ease of use and the API of the SaaS service, uploading the data is not always an option. My customers often have terabytes of sensitive data that are not allowed to leave their control. This is where AWS Bedrock comes in.

TwelveLabs has made their core models available on the on-demand Bedrock platform, so the source data can stay in an S3 bucket I control, accessed only in a secure compute mode and never persisted in a third-party system. This is great news, because enterprise video use cases often involve trade secrets, records containing PII, or other information subject to strict security and privacy regulations.

I expect the Bedrock integration to unlock many use cases.

Let's search some movie trailers

Note: the complete code for the Python imports and for handling environment variables via a .env file is in the Python notebook version.
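If you're following along outside the notebook, a minimal version of that scaffolding might look like the sketch below. The variable names match the snippets in this post, but the exact .env keys and the default Bedrock model ID are assumptions; confirm the current Marengo model ID in your Bedrock console.

import copy
import json
import os
import time
from time import sleep

import boto3
import botocore
import tqdm
from dotenv import load_dotenv
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from IPython.display import clear_output

load_dotenv()  # reads the .env file in the working directory

AWS_ACCESS_KEY_ID = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_ACCESS_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
AWS_REGION = os.getenv("AWS_REGION", "us-east-1")

S3_BUCKET_NAME = os.environ["S3_BUCKET_NAME"]
S3_VIDEOS_PATH = os.getenv("S3_VIDEOS_PATH", "videos")
S3_EMBEDDINGS_PATH = os.getenv("S3_EMBEDDINGS_PATH", "embeddings")

# Model IDs are assumptions -- check the Bedrock model catalog
MARENGO_MODEL_ID = os.getenv("MARENGO_MODEL_ID", "twelvelabs.marengo-embed-2-7-v1:0")
TEXT_EMBEDDING_MODEL_ID = os.getenv("TEXT_EMBEDDING_MODEL_ID", MARENGO_MODEL_ID)

ELASTICSEARCH_ENDPOINT = os.environ["ELASTICSEARCH_ENDPOINT"]
ELASTICSEARCH_API_KEY = os.environ["ELASTICSEARCH_API_KEY"]

# The Bedrock runtime client is created from the boto3 session defined below:
# bedrock_client = session.client("bedrock-runtime")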

Dependencies

  • You'll need an S3 bucket writable by your AWS identity

  • You'll need a host URL and API key for Elasticsearch, either a local deployment or Elastic Cloud

  • The code assumes Elasticsearch version 8.17+ or 9.0+

  • A great quick source of test data is movie trailers. They are fast-cut, visually stunning, and usually full of high-action scenes. You can bring your own .mp4 files, or use https://github.com/yt-dlp/yt-dlp to fetch files at small scale from YouTube, as in the sketch below.
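For example, a minimal yt-dlp sketch (the format selector, output template, and placeholder URL are illustrative):

from yt_dlp import YoutubeDL

# Download a trailer as .mp4 into ./videos/<video id>.mp4
opts = {
    "format": "mp4",
    "outtmpl": "videos/%(id)s.%(ext)s",
}
with YoutubeDL(opts) as ydl:
    ydl.download(["https://www.youtube.com/watch?v=VIDEO_ID"])  # hypothetical URL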

Once the files are on our local filesystem, we need to upload them to our S3 bucket:

# Initialize AWS session
session = boto3.session.Session(
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    region_name=AWS_REGION
)

######### 
## Validate S3 Configuration
#########

aws_account_id = session.client('sts').get_caller_identity()["Account"]
print(f"AWS Account ID: {aws_account_id}")
s3_client = session.client('s3')

# Verify bucket access
try:
    s3_client.head_bucket(Bucket=S3_BUCKET_NAME)
    print(f"✅ Successfully connected to S3 bucket: {S3_BUCKET_NAME}")
except Exception as e:
    print(f"❌ Error accessing S3 bucket: {e}")
    print("Please ensure the bucket exists and you have proper permissions.")


#########
## Upload videos to S3, and make note of where we put them in data object
#########

for video_object in video_objects:
    # Get the video file path
    video_path = video_object.get_video_path()
    
    # Skip if video path is not set
    if not video_path:
        print(f"Skipping {video_object.get_video_string()} - No video path set")
        continue
        
    # Define S3 destination key - organize by platform and video ID
    # put this information in our data object for later
    s3_key = video_object.get_s3_key()
    if not s3_key:
        s3_key = f"{S3_VIDEOS_PATH}/{video_object.get_platform()}/{video_object.get_video_id()}/{os.path.basename(video_path)}"
    video_object.set_s3_key(s3_key)

    try:
        # Check if file already exists in S3
        try:
            s3_client.head_object(Bucket=S3_BUCKET_NAME, Key=s3_key)
            print(f"Video {video_object.get_video_string()} already exists in S3. Skipping upload.")
            continue
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == '404':
                # File doesn't exist in S3, proceed with upload
                pass
            else:
                # Some other error occurred
                raise e
        
        # Upload the video to S3
        print(f"Uploading {video_object.get_video_string()} to S3...")
        s3_client.upload_file(video_path, S3_BUCKET_NAME, s3_key)
        print(f"Successfully uploaded {video_object.get_video_string()} to S3")
        
    
    except Exception as e:
        print(f"Error uploading {video_object.get_video_string()} to S3: {str(e)}")

Now we can create the video embeddings using asynchronous Bedrock calls:

#########
## Use Bedrock hosted Twelve Labs models to create video embeddings
#########


# Helper function to wait for async embedding results
def wait_for_embedding_output(s3_bucket: str, s3_prefix: str, invocation_arn: str, verbose: bool = False) -> list:
    """
    Wait for Bedrock async embedding task to complete and retrieve results

    Args:
        s3_bucket (str): The S3 bucket name
        s3_prefix (str): The S3 prefix for the embeddings
        invocation_arn (str): The ARN of the Bedrock async embedding task
        verbose (bool): If True, print polling status updates
    Returns:
        list: A list of embedding data
        
    Raises:
        Exception: If the embedding task fails or no output.json is found
    """
    
    # Wait until task completes
    status = None
    while status not in ["Completed", "Failed", "Expired"]:
        response = bedrock_client.get_async_invoke(invocationArn=invocation_arn)
        status = response['status']
        if verbose:
            clear_output(wait=True)
            tqdm.tqdm.write(f"Embedding task status: {status}")
        time.sleep(5)
    
    if status != "Completed":
        raise Exception(f"Embedding task failed with status: {status}")
    
    # Retrieve the output from S3
    response = s3_client.list_objects_v2(Bucket=s3_bucket, Prefix=s3_prefix)
    
    for obj in response.get('Contents', []):
        if obj['Key'].endswith('output.json'):
            output_key = obj['Key']
            obj = s3_client.get_object(Bucket=s3_bucket, Key=output_key)
            content = obj['Body'].read().decode('utf-8')
            data = json.loads(content).get("data", [])
            return data
    
    raise Exception("No output.json found in S3 prefix")


# Create video embedding
def create_video_embedding(video_s3_uri: str, video_id: str) -> list:
    """
    Create embeddings for video using Marengo on Bedrock
    
    Args:
        video_s3_uri (str): The S3 URI of the video to create an embedding for
        video_id (str): the identifying unique id of the video, to be used as a uuid
        
    Returns:
        list: A list of embedding data
    """
    
    unique_id = video_id
    s3_output_prefix = f'{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{unique_id}'
    
    response = bedrock_client.start_async_invoke(
        modelId=MARENGO_MODEL_ID,
        modelInput={
            "inputType": "video",
            "mediaSource": {
                "s3Location": {
                    "uri": video_s3_uri,
                    "bucketOwner": aws_account_id
                }
            }
        },
        outputDataConfig={
            "s3OutputDataConfig": {
                "s3Uri": f's3://{S3_BUCKET_NAME}/{s3_output_prefix}'
            }
        }
    )
    
    invocation_arn = response["invocationArn"]
    print(f"Video embedding task started: {invocation_arn}")
    
    # Wait for completion and get results
    try:
        embedding_data = wait_for_embedding_output(S3_BUCKET_NAME, s3_output_prefix, invocation_arn)
    except Exception as e:
        print(f"Error waiting for embedding output: {e}")
        return None
    
    return embedding_data


def check_existing_embedding(video_id: str) -> list:
    """Check S3 folder to see if this video already has an embedding created to avoid re-inference"""

    s3_output_prefix = f'{S3_EMBEDDINGS_PATH}/{S3_VIDEOS_PATH}/{video_id}'
    print(s3_output_prefix)

    try:
        # Check if any files exist at this prefix
        response = s3_client.list_objects_v2(Bucket=S3_BUCKET_NAME, Prefix=s3_output_prefix)
        
        if 'Contents' in response and any(obj['Key'].endswith('output.json') for obj in response.get('Contents', [])):
            print(f"Embedding {video_object.get_video_string()} already has an embedding. Skipping embedding creation.")
            # Find the output.json file
            for obj in response.get('Contents', []):
                if obj['Key'].endswith('output.json'):
                    output_key = obj['Key']
                    # Get the object from S3
                    obj = s3_client.get_object(Bucket=S3_BUCKET_NAME, Key=output_key)
                    # Read the content and parse as JSON
                    content = obj['Body'].read().decode('utf-8')
                    embedding_data = json.loads(content).get("data", [])
                    return embedding_data
        else:
            print(f"No existing embedding found for {video_object.get_video_string()}.")
            return None
    except botocore.exceptions.ClientError as e:
        if e.response['Error']['Code'] == '404':
            # No embedding exists in S3 yet, proceed with creation
            print("Did not find embedding in s3")
            return None
        else:
            # Some other error occurred
            raise e

def create_s3_uri(bucket_name: str, key: str) -> str:
    video_uri = f"s3://{bucket_name}/{key}"
    return video_uri



## Generate the embeddings one at a time, using S3 as a cache to avoid duplicate embedding generation
for video_object in tqdm.tqdm(video_objects, desc="Processing videos"):
    s3_key = video_object.get_s3_key()
    video_id = video_object.get_video_id()
    video_uri = create_s3_uri(S3_BUCKET_NAME, s3_key)  

    retrieved_embeddings = check_existing_embedding(video_id)
    if retrieved_embeddings:   
        video_object.set_embeddings_list(retrieved_embeddings)
    else:
        video_embedding_data = create_video_embedding(video_uri, video_id)
        video_object.set_embeddings_list(video_embedding_data)

Now that the embeddings are on our video objects in local memory, here's a quick print test to see what came back:

video_embedding_data = video_objects[0].get_embeddings_list()

## Preview print
for i, embedding in enumerate(video_embedding_data[:3]):
    print(f"{i}")
    for key in embedding:
        if "embedding" == key:
            print(f"\t{key}: len {len(embedding[key])}")
        else:
            print(f"\t{key}: {embedding[key]}")

The output looks like this:

0
	embedding: len 1024
	embeddingOption: visual-text
	startSec: 0.0
	endSec: 6.199999809265137
1
	embedding: len 1024
	embeddingOption: visual-text
	startSec: 6.199999809265137
	endSec: 10.399999618530273
2
	embedding: len 1024
	embeddingOption: visual-text
	startSec: 10.399999618530273
	endSec: 17.299999237060547

Inserting into Elasticsearch

We'll upload the objects to Elasticsearch: in my case, metadata and embeddings for roughly 155 video segments. At this small scale, brute-force nearest-neighbor search over a flat float32 index is the most effective and most economical approach. Still, the example below shows how to create a separate index for each of the popular quantization levels Elasticsearch supports for larger-scale use cases. See Elastic's article on the Better Binary Quantization (BBQ) feature.

es = Elasticsearch(
    hosts=[ELASTICSEARCH_ENDPOINT],
    api_key=ELASTICSEARCH_API_KEY
)

es_detail = es.info().body
if "version" in es_detail:
    identifier = es_detail['version'].get('build_flavor', es_detail['version']['number'])
    print(f"✅ Successfully connected to Elasticsearch: {identifier}")


docs = []

for video_object in video_objects:

    persist_object = video_object.get_video_object()
    embeddings = video_object.get_embeddings_list()

    for embedding in embeddings:
        if embedding["embeddingOption"] == "visual-image":

            # Create a copy of the persist object and add embedding details
            doc = copy.deepcopy(persist_object)
            doc["embedding"] =  embedding["embedding"]
            doc["start_sec"] =  embedding["startSec"]
            doc["end_sec"] =    embedding["endSec"]

            docs.append(doc)

index_varieties = [
    "flat", ## brute force float32
    "hnsw", ## float32 hnsw graph data structure
    "int8_hnsw", ## int8 hnsw graph data structure, default for lower dimension models
    "bbq_hnsw", ## Better Binary Qunatization HNSW, default for higher dimension models
    "bbq_flat" ## brute force + Better Binary Quantization 
]

for index_variety in index_varieties:
    # Create an index for the movie trailer embeddings
    # Define mapping with proper settings for dense vector search
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    mappings = {
        "properties": {
            "url": {"type": "keyword"},
            "platform": {"type": "keyword"},
            "video_id": {"type": "keyword"},
            "title": {"type": "text", "analyzer": "standard"},
            "embedding": {
                "type": "dense_vector",
                "dims": 1024,
                "similarity": "cosine",
                "index_options": {
                    "type": index_variety
                }
            },
            "start_sec": {"type": "float"},
            "end_sec": {"type": "float"}
        }
    }

    # Check if index already exists
    if es.indices.exists(index=index_name):
        print(f"Deleting Index '{index_name}' and then sleeping for 2 seconds")
        es.indices.delete(index=index_name)
        sleep(2)
    # Create the index
    es.indices.create(index=index_name, mappings=mappings)
    print(f"Index '{index_name}' created successfully")

for index_variety in index_varieties:
    # Create an index for the movie trailer embeddings
    # Define mapping with proper settings for dense vector search
    index_name = f"twelvelabs-movie-trailer-{index_variety}"

    # Bulk insert docs into Elasticsearch index
    print(f"Indexing {len(docs)} documents into {index_name}...")
    
    
    # Create actions for bulk API
    actions = []
    for doc in docs:
        actions.append({
            "_index": index_name,
            "_source": doc
        })
    
    # Perform bulk indexing with error handling
    try:
        success, failed = bulk(es, actions, chunk_size=100, max_retries=3, 
                               initial_backoff=2, max_backoff=60)
        print(f"\tSuccessfully indexed {success} documents into {index_name}")
        if failed:
            print(f"\tFailed to index {len(failed)} documents")
    except Exception as e:
        print(f"Error during bulk indexing: {e}")
    
    print(f"Completed indexing documents into {index_name}")

Running a search

TwelveLabs' Bedrock implementation allows asynchronous calls that write text embeddings to S3 as well. Below, however, we'll use the lower-latency synchronous invoke_model to get a text embedding for our search query directly. (The Marengo text documentation example is here.)

# Create text embedding
def create_text_embedding(text_query: str) -> list:
    text_model_id = TEXT_EMBEDDING_MODEL_ID 
    text_model_input = {
        "inputType": "text",
        "inputText": text_query
    }
    response = bedrock_client.invoke_model(
        modelId=text_model_id,
        body=json.dumps(text_model_input)
    )
    response_body = json.loads(response['body'].read().decode('utf-8'))
    embedding_data = response_body.get("data", [])
    if embedding_data:
        return embedding_data[0]["embedding"]
    else:
        return None


def vector_query(index_name: str, text_query: str) -> dict:
    
    query_embedding = create_text_embedding(text_query)
    query = {
        "retriever": {
            "knn": {
                "field": "embedding",
                "query_vector": query_embedding,
                "k": 10,
                "num_candidates": "25"
            }
        },
        "size": 10,
        "_source": False,
        "fields": ["title", "video_id", "start_sec"]
    }
    return es.search(index=index_name, body=query).body


text_query = "Show me scenes with dinosaurs"
print(vector_query("twelvelabs-movie-trailer-flat", text_query))

The JSON that comes back is our search result! But to build a friendlier test interface, we can use some quick iPython widgets:

from ipywidgets import widgets, HTML as WHTML, HBox, Layout
from IPython.display import display

def display_search_results_html(query):
    results = vector_query("twelvelabs-movie-trailer-flat", query)
    hits = results.get('hits', {}).get('hits', [])

    if not hits:
        return "<p>No results found</p>"

    items = []
    for hit in hits:
        fields = hit.get('fields', {})
        title = fields.get('title', ['No Title'])[0]
        score = hit.get('_score', 0)
        video_id = fields.get('video_id', [''])[0]
        start_sec = fields.get('start_sec', [0])[0]
        url = f"https://www.youtube.com/watch?v={video_id}&t={int(start_sec)}s"
        items.append(f'<li><a href="{url}" target="_blank">{title} (Start: {float(start_sec):.1f}s)</a> <span>Score: {score}</span></li>')

    return "<h3>Search Results:</h3><ul>" + "\n".join(items) + "</ul>"

def search_videos():
    search_input = widgets.Text(
        value='',
        placeholder='Enter your search query…',
        description='Search:',
        layout=Layout(width='70%')
    )

    search_button = widgets.Button(
        description='Search Videos',
        button_style='primary',
        layout=Layout(width='20%')
    )

    # Use a single HTML widget for output; update its .value to avoid double-rendering
    results_box = WHTML(value="")

    def on_button_click(_):
        q = search_input.value.strip()
        if not q:
            results_box.value = "<p>Please enter a search query</p>"
            return
        results_box.value = "<p>Searching…</p>"
        results_box.value = display_search_results_html(q)

    # Avoid multiple handler attachments if the cell is re-run
    try:
        search_button._click_handlers.callbacks.clear()
    except Exception:
        pass
    search_button.on_click(on_button_click)

    display(HBox([search_input, search_button]))
    display(results_box)

# Call this to create the UI
search_videos()

Let's search for some visual content in the trailers.
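To sanity-check the widget, you can also run a few queries directly; the query strings below are just examples:

for q in ["Show me scenes with dinosaurs",
          "racing cars at night",
          "a superhero flying over a city"]:
    res = vector_query("twelvelabs-movie-trailer-flat", q)
    print(f"\n{q}")
    for hit in res["hits"]["hits"][:3]:
        fields = hit["fields"]
        print(f"  {fields['title'][0]} @ {fields['start_sec'][0]:.1f}s"
              f"  score={hit['_score']:.3f}")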

Comparing quantization approaches

Newer Elasticsearch versions default to bbq_hnsw for 1024-dimension dense vectors. It delivers the best speed and scalability while preserving accuracy by rescoring an oversampled window of candidates against the original float32 vectors.

To compare how quantization affects search results through a simple UI, check out a new project called Relevance Studio.
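For a quick in-notebook comparison without extra tooling, you can also run the same query against each index variety we created. This is a rough sketch, not a proper relevance evaluation:

text_query = "Show me scenes with dinosaurs"
for index_variety in index_varieties:
    index_name = f"twelvelabs-movie-trailer-{index_variety}"
    top = vector_query(index_name, text_query)["hits"]["hits"][0]
    print(f"{index_variety:>9}: {top['fields']['title'][0]} "
          f"@ {top['fields']['start_sec'][0]:.1f}s  score={top['_score']:.3f}")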

If we check Index Management in Kibana, or run GET /_cat/indices with curl, we'll see that the store size of each option is roughly the same. At first glance this can be confusing, but remember that the store sizes are roughly equal because every index retains the float32 representation of the vectors for rescoring. In bbq_hnsw, only the quantized binary representation of each vector is used in the graph, which cuts cost and improves performance at index and search time.
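From Python, the same storage check might look like this, using the client's cat API:

# Text listing of each trailer index with its document count and store size
print(es.cat.indices(index="twelvelabs-movie-trailer-*",
                     h="index,docs.count,store.size", s="index", v=True))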

Final thoughts

These are impressive results for a single 1024-dimension dense vector. I'm excited to try combining the strength of the Marengo models with hybrid search approaches, including audio transcription, as well as Elasticsearch's geospatial filtering and RBAC/ABAC access controls. Which videos do you want your AI to know everything about?
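As a parting sketch, a hybrid query could fuse the Marengo kNN retriever with a lexical retriever over an audio transcript using reciprocal rank fusion. The `transcript` field below is hypothetical (we did not index one), and the RRF retriever requires an Elasticsearch version and subscription level that support it:

hybrid_query = {
    "retriever": {
        "rrf": {
            "retrievers": [
                {   # semantic: Marengo text embedding vs. video embeddings
                    "knn": {
                        "field": "embedding",
                        "query_vector": create_text_embedding("dinosaurs"),
                        "k": 10,
                        "num_candidates": 25
                    }
                },
                {   # lexical: hypothetical transcript field
                    "standard": {
                        "query": {"match": {"transcript": "dinosaurs"}}
                    }
                }
            ]
        }
    },
    "size": 10
}
# es.search(index="twelvelabs-movie-trailer-flat", body=hybrid_query)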

Original post: https://www.elastic.co/search-labs/blog/twelvelabs-marengo-video-embedding-amazon-bedrock
