智能体开发案例:直播脚本与短视频智能生成助手
方案提供了一套可落地的系统方案:从架构、数据库、后端代码、多智能体编排、向量检索、任务队列到前端交互的完整链路。
一、目标与整体说明
本方案从技术实现视角,完整设计并实现一个「直播脚本与短视频智能生成助手」系统,包含:
- 系统架构设计:整体模块划分、服务拆分、调用关系。
- 核心模块代码实现:后端(FastAPI)、LLM 调用、脚本生成逻辑、多智能体编排、任务队列、向量检索等。
- 数据库设计:核心表结构(用户、项目、脚本版本、任务队列、素材库等),含 SQL 示例。
- 部署与优化策略:容器化、环境配置、伸缩、高并发与成本优化。
- 完整系统工作流:从用户输入到脚本生成、短视频脚本拆解、迭代优化的端到端流程。
下面所有代码以**Python + FastAPI + PostgreSQL + Redis + 前端(任意 Web 框架)**为参考实现,你可以直接在此基础上落地一个可运行的系统。
二、系统架构设计
本部分给出系统整体架构,说明各模块职责和调用链路,为后续代码与数据库设计打基础。
2.1 逻辑架构图(文字版)
整体采用前后端分离 + LLM 编排服务 + 任务队列 + 向量检索 + 对象存储的典型结构:
┌──────────────────────── Web / App 前端 ────────────────────────┐
│ - 直播脚本生成页面(话题/人设/风格配置) │
│ - 短视频脚本生成页面(热点/素材选择) │
│ - 脚本编辑器(版本对比、修改、再次生成) │
└───────────────────────────────────────────────────────────────┘
│ HTTPS / REST / WebSocket
▼
┌──────────────────────────────────┐
│ API Gateway / BFF │
│ (FastAPI: auth, routing, RBAC) │
└──────────────────────────────────┘
│
┌──────────────────────────────────┐
│ 业务后端服务层 │
│ - UserService / ProjectService │
│ - ScriptService(核心) │
│ - VideoPlanService │
│ - TaskService / JobService │
└──────────────────────────────────┘
┌──────────────────────────────┼──────────────────────────────┐
▼ ▼ ▼
┌────────────────┐ ┌──────────────────┐ ┌──────────────────┐
│ LLM Orchestr. │ │ Vector Store │ │ Task Queue │
│ (LLM 调度层) │ │ (PGVector/Milvus)│ │ (Redis/Celery) │
│ - Prompt模板 │ │ - 素材向量检索 │ │ - 异步生成任务 │
│ - 多智能体编排 │ │ - 历史脚本检索 │ │ - 任务状态管理 │
└────────────────┘ └──────────────────┘ └──────────────────┘
│ │ │
▼ ▼ ▼
┌─────────────┐ ┌──────────────┐ ┌──────────────┐
│ LLM Provider│ │ PostgreSQL │ │ Object Store │
│(OpenAI/本地)│ │ 业务数据库 │ │(S3/OSS/本地) │
└─────────────┘ └──────────────┘ └──────────────┘
2.2 功能模块划分
- 用户与项目管理模块
- 管理用户、团队、项目(例如:某场直播、某一短视频系列)。
- 直播脚本生成模块
- 根据「人设 / 话题 / 时长 / 风格 / 观众画像」生成整体大纲 + 详细分段脚本 + 互动话术。
- 短视频脚本生成模块
- 可直接从文本话题、直播片段概要,生成短视频脚本(标题、封面文案、完整旁白脚本、镜头说明)。
- 素材与知识库模块
- 管理品牌文案、过往优秀脚本、产品介绍,用向量库做RAG增强,让生成脚本更加贴合品牌语气。
- 多智能体编排模块(核心特色)
- 规划 Agent(Planner):拆分子任务(大纲、脚本、互动、金句等)。
- 脚本 Writer Agent:负责语料生成。
- 风格 Reviewer Agent:检查语言风格是否统一,帮忙重写。
- 任务队列与异步执行模块
- 大型脚本、多版本生成等放入任务队列,由后台 Worker 异步执行。
- 前端交互模块
- 多步配置、脚本展示与编辑、版本管理、导出为文档/字幕等。
三、技术选型与项目结构
3.1 技术选型
- 后端框架:FastAPI(性能好 + OpenAPI 文档 + 异步友好)。
- 数据库:PostgreSQL(支持 JSONB、可扩展 PGVector)。
- 向量库:
- 小规模直接使用 PGVector 插件;
- 大规模可引入 Milvus / Qdrant。
- 缓存与队列:Redis + Celery(异步执行脚本生成任务)。
- LLM 调用:
- OpenAI API / Azure OpenAI / 本地 LLM(如 Llama / Qwen)封装成统一接口。
- 前端:任意主流框架(React / Vue),下面用 React 示例。
- 部署:Docker 容器化,Kubernetes / Docker Compose。
3.2 后端项目结构建议
ai-script-assistant/
├─ app/
│ ├─ main.py # 入口,FastAPI app
│ ├─ config.py # 配置管理
│ ├─ deps.py # 依赖注入(DB, services)
│ ├─ api/
│ │ ├─ routes_users.py
│ │ ├─ routes_projects.py
│ │ ├─ routes_scripts.py # 直播/短视频脚本 API
│ │ ├─ routes_tasks.py
│ ├─ models/ # SQLAlchemy ORM 模型
│ ├─ schemas/ # Pydantic 模型
│ ├─ services/ # 业务服务
│ │ ├─ script_service.py
│ │ ├─ video_plan_service.py
│ │ ├─ task_service.py
│ │ └─ agent_orchestrator.py
│ ├─ llm/
│ │ ├─ client.py # 统一 LLM 客户端封装
│ │ ├─ prompts/ # Prompt 模板
│ │ │ ├─ live_outline.txt
│ │ │ ├─ live_script.txt
│ │ │ └─ short_video.txt
│ ├─ workers/
│ │ └─ celery_worker.py # Worker 进程
│ └─ utils/
│ ├─ logging.py
│ └─ text_postprocess.py
├─ migrations/ # Alembic 迁移
├─ Dockerfile
├─ docker-compose.yml
├─ requirements.txt
└─ README.md
四、数据库设计
本部分给出核心数据表设计(PostgreSQL + 可选 PGVector),以支持用户管理、项目管理、脚本生成、版本管理与任务队列。
4.1 核心实体关系(简图)
User (用户)
└─< Project (项目:一次直播或系列短视频)
├─< Script (脚本:直播脚本/短视频脚本)
│ └─< ScriptVersion (脚本版本)
├─< Asset (素材:文案、产品介绍等)
└─< Task (生成任务队列)
4.2 SQL:用户与项目
-- 用户表
CREATE TABLE "user" (
id BIGSERIAL PRIMARY KEY,
email VARCHAR(255) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
name VARCHAR(100),
created_at TIMESTAMP NOT NULL DEFAULT now(),
updated_at TIMESTAMP NOT NULL DEFAULT now()
);
-- 项目表:一次直播或一组短视频
CREATE TABLE project (
id BIGSERIAL PRIMARY KEY,
user_id BIGINT NOT NULL REFERENCES "user"(id),
name VARCHAR(255) NOT NULL,
type VARCHAR(50) NOT NULL, -- 'live' / 'short_video'
description TEXT,
meta JSONB, -- 直播时间、平台、受众等信息
created_at TIMESTAMP NOT NULL DEFAULT now(),
updated_at TIMESTAMP NOT NULL DEFAULT now()
);
4.3 脚本与脚本版本
-- 脚本主表:一个项目下可有多个脚本(预热场、正式场等)
CREATE TABLE script (
id BIGSERIAL PRIMARY KEY,
project_id BIGINT NOT NULL REFERENCES project(id),
title VARCHAR(255) NOT NULL,
script_type VARCHAR(50) NOT NULL, -- 'live' / 'short_video'
status VARCHAR(50) NOT NULL, -- 'draft' / 'generated' / 'published'
current_version_id BIGINT, -- 指向当前激活版本
created_at TIMESTAMP NOT NULL DEFAULT now(),
updated_at TIMESTAMP NOT NULL DEFAULT now()
);
-- 脚本版本表:保存每次 LLM 生成和人工修改后的版本
CREATE TABLE script_version (
id BIGSERIAL PRIMARY KEY,
script_id BIGINT NOT NULL REFERENCES script(id),
version_number INT NOT NULL,
source VARCHAR(50) NOT NULL, -- 'llm' / 'manual'
content_md TEXT NOT NULL, -- markdown 格式的脚本
outline_json JSONB, -- 结构化大纲:章节/时间点/互动点
config JSONB, -- 本次生成时使用的配置(风格、人设等)
created_at TIMESTAMP NOT NULL DEFAULT now()
);
CREATE UNIQUE INDEX idx_script_version_unique
ON script_version(script_id, version_number);
4.4 素材与向量库(RAG)
-- 素材表:品牌调性、产品文案、历史爆款脚本等
CREATE TABLE asset (
id BIGSERIAL PRIMARY KEY,
project_id BIGINT REFERENCES project(id),
user_id BIGINT NOT NULL REFERENCES "user"(id),
title VARCHAR(255) NOT NULL,
content TEXT NOT NULL,
tags TEXT[],
created_at TIMESTAMP NOT NULL DEFAULT now()
);
-- PGVector:存储素材向量(文本嵌入)
-- 需要事先安装 pgvector 插件,并创建 vector 类型
CREATE TABLE asset_embedding (
asset_id BIGINT PRIMARY KEY REFERENCES asset(id),
embedding vector(1536) NOT NULL -- 以 OpenAI text-embedding-3-small 为例
);
CREATE INDEX idx_asset_embedding_vector
ON asset_embedding
USING ivfflat (embedding vector_cosine_ops);
4.5 生成任务队列表
CREATE TABLE task (
id BIGSERIAL PRIMARY KEY,
project_id BIGINT REFERENCES project(id),
script_id BIGINT REFERENCES script(id),
type VARCHAR(50) NOT NULL, -- 'generate_live', 'generate_short_video'
status VARCHAR(50) NOT NULL, -- 'pending', 'running', 'success', 'failed'
priority INT NOT NULL DEFAULT 0,
params JSONB, -- 生成参数
result JSONB, -- 输出结果引用(脚本版本ID、错误信息等)
error_message TEXT,
created_at TIMESTAMP NOT NULL DEFAULT now(),
updated_at TIMESTAMP NOT NULL DEFAULT now()
);
五、核心模块代码实现(后端)
下面给出关键模块的核心实现代码(删繁就简,但保持可落地),包括:配置、LLM 封装、Prompt 模板、脚本服务、多智能体编排、任务队列。
5.1 配置与应用入口
# app/config.py
from pydantic import BaseSettings
class Settings(BaseSettings):
app_name: str = "AI Live & Short Video Script Assistant"
api_prefix: str = "/api"
database_url: str
redis_url: str
openai_api_key: str
openai_base_url: str | None = None
model_name: str = "gpt-4.1"
embedding_model: str = "text-embedding-3-small"
class Config:
env_file = ".env"
settings = Settings()
# app/main.py
from fastapi import FastAPI
from .config import settings
from .api import routes_scripts, routes_tasks
def create_app() -> FastAPI:
app = FastAPI(title=settings.app_name)
app.include_router(routes_scripts.router, prefix=settings.api_prefix)
app.include_router(routes_tasks.router, prefix=settings.api_prefix)
return app
app = create_app()
5.2 统一 LLM 客户端封装
# app/llm/client.py
import httpx
from typing import List, Dict, Any
from ..config import settings
class LLMClient:
def __init__(self):
self._client = httpx.AsyncClient(
base_url=settings.openai_base_url or "https://api.openai.com/v1",
headers={"Authorization": f"Bearer {settings.openai_api_key}"}
)
self.model = settings.model_name
self.embedding_model = settings.embedding_model
async def chat(self, messages: List[Dict[str, str]], **kwargs) -> str:
payload = {
"model": self.model,
"messages": messages,
**kwargs
}
resp = await self._client.post("/chat/completions", json=payload)
resp.raise_for_status()
data = resp.json()
return data["choices"][0]["message"]["content"]
async def embed(self, input_texts: List[str]) -> List[List[float]]:
payload = {
"model": self.embedding_model,
"input": input_texts
}
resp = await self._client.post("/embeddings", json=payload)
resp.raise_for_status()
data = resp.json()
return [item["embedding"] for item in data["data"]]
llm_client = LLMClient()
5.3 Prompt 模板示例
# app/llm/prompts/live_outline.txt
你是一名资深直播策划与脚本编剧,擅长为主播设计高转化率的直播大纲和话术。
【直播信息】
- 人设:{{ persona }}
- 主题:{{ topic }}
- 目标:{{ goal }}
- 时长:{{ duration_minutes }} 分钟
- 平台:{{ platform }}
- 目标受众:{{ audience }}
【素材与品牌信息】
{{ retrieved_assets }}
请输出:
1. 直播整体结构(如:开场、暖场互动、产品介绍节奏、福利节奏、结束收尾)
2. 每个阶段的时间范围、阶段目标
3. 每个阶段的关键话术要点(要点式,不需要逐字稿)
请使用 JSON 格式输出,字段包括:
- "sections": [
{
"id": "string",
"name": "string",
"start_min": number,
"end_min": number,
"objective": "string",
"bullet_points": ["string", ...]
},
...
]
# app/llm/prompts/live_script.txt
你是一名专业直播脚本编剧。
【上一步的直播大纲 JSON】
{{ outline_json }}
【要求】
1. 根据每个阶段的大纲要点,输出详细的直播逐字稿。
2. 语言风格:{{ style }} (如:幽默、干货、情绪煽动等)
3. 保持与以下品牌和产品信息一致:
{{ retrieved_assets }}
【输出格式】
请使用 Markdown 格式,每个阶段一个二级标题(##),并在内部按对话话术分段。
示例:
## 开场
- 主播:xxxx
- 主播:xxxx
- 主播:xxxx
...
# app/llm/prompts/short_video.txt
你是一名短视频编剧与文案专家,擅长为抖音/快手/小红书制作高完播率的脚本。
【视频信息】
- 平台:{{ platform }}
- 主题:{{ topic }}
- 时长:{{ duration_seconds }} 秒
- 目标:{{ goal }}
- 受众:{{ audience }}
【可参考的直播内容节选/素材】
{{ reference_text }}
【要求】
1. 输出一个适合短视频的标题(具有吸引力和钩子)
2. 输出封面文案(适合放在封面图上的少量文字)
3. 输出详细脚本,包含每 3-5 秒一个分镜描述和对应旁白文案
4. 风格:{{ style }}
【输出 JSON 结构】
{
"title": "...",
"cover_text": "...",
"shots": [
{
"start_sec": 0,
"end_sec": 4,
"scene": "画面描述",
"voiceover": "旁白文案"
},
...
]
}
5.4 向量检索服务(RAG)
从素材中检索与本次直播/短视频相关的内容,作为 prompt 的上下文。
# app/services/asset_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from typing import List
from ..llm.client import llm_client
class AssetService:
def __init__(self, db: AsyncSession):
self.db = db
async def search_assets(self, project_id: int | None, user_id: int, query: str, top_k: int = 5) -> List[dict]:
# 1. 对 query 做 embedding
embeddings = await llm_client.embed([query])
emb = embeddings[0]
emb_str = ",".join(str(x) for x in emb)
# 2. 使用 PGVector 相似度检索
sql = text("""
SELECT a.id, a.title, a.content, a.tags
FROM asset a
JOIN asset_embedding e ON a.id = e.asset_id
WHERE (a.project_id = :project_id OR :project_id IS NULL)
AND a.user_id = :user_id
ORDER BY e.embedding <-> (:emb::vector)
LIMIT :top_k
""")
rows = (await self.db.execute(sql, {
"project_id": project_id,
"user_id": user_id,
"emb": emb_str,
"top_k": top_k
})).mappings().all()
return [dict(r) for r in rows]
@staticmethod
def format_assets_for_prompt(assets: List[dict]) -> str:
lines = []
for a in assets:
tags = ", ".join(a.get("tags") or [])
lines.append(f"[{a['title']}] (tags: {tags})\n{a['content']}\n")
return "\n\n".join(lines)
5.5 多智能体编排核心(Planner + Writer + Reviewer)
通过一个 Orchestrator 组合多个「子 Agent」,这里用简单函数模拟多轮调用。
# app/services/agent_orchestrator.py
import json
from typing import Dict, Any
from ..llm.client import llm_client
from ..services.asset_service import AssetService
from ..config import settings
from pathlib import Path
PROMPTS_DIR = Path(__file__).resolve().parent.parent / "llm" / "prompts"
def load_prompt(name: str) -> str:
return (PROMPTS_DIR / name).read_text(encoding="utf-8")
class AgentOrchestrator:
def __init__(self, asset_service: AssetService):
self.asset_service = asset_service
async def generate_live_script(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
多智能体流程:
1. Planner Agent:生成直播大纲(JSON)
2. Writer Agent:生成逐字稿 Markdown
3. Reviewer Agent:风格统一和优化
"""
persona = params["persona"]
topic = params["topic"]
goal = params.get("goal", "提升转化")
duration = params.get("duration_minutes", 60)
platform = params.get("platform", "抖音")
audience = params.get("audience", "泛用户")
style = params.get("style", "轻松幽默")
# 1. 根据 topic + persona 从素材库中检索品牌/产品信息
assets = await self.asset_service.search_assets(
project_id=params.get("project_id"),
user_id=params["user_id"],
query=f"{topic} {persona} {goal}"
)
assets_text = self.asset_service.format_assets_for_prompt(assets)
# 2. Planner Agent:生成直播大纲 JSON
outline_prompt_template = load_prompt("live_outline.txt")
outline_prompt = outline_prompt_template.format(
persona=persona,
topic=topic,
goal=goal,
duration_minutes=duration,
platform=platform,
audience=audience,
retrieved_assets=assets_text
)
outline_json_str = await llm_client.chat([
{"role": "system", "content": "你是直播策划专家,负责设计结构化直播大纲。"},
{"role": "user", "content": outline_prompt}
], temperature=0.4)
try:
outline = json.loads(outline_json_str)
except json.JSONDecodeError:
# 简单修正:尝试从 Markdown code block 中提取 JSON
outline = self._extract_json(outline_json_str)
# 3. Writer Agent:根据大纲生成逐字稿
script_prompt_template = load_prompt("live_script.txt")
script_prompt = script_prompt_template.format(
outline_json=json.dumps(outline, ensure_ascii=False, indent=2),
style=style,
retrieved_assets=assets_text
)
script_md = await llm_client.chat([
{"role": "system", "content": "你是直播脚本编剧,负责写详细逐字稿。"},
{"role": "user", "content": script_prompt}
], temperature=0.7)
# 4. Reviewer Agent:统一风格(可选)
reviewer_prompt = (
"你是一名文字编辑,请在不改变核心意思的前提下,统一下述直播脚本的口吻为:"
f"{style},适当增加互动话术和金句,输出 Markdown。\n\n{script_md}"
)
reviewed_md = await llm_client.chat([
{"role": "system", "content": "你是资深文案编辑。"},
{"role": "user", "content": reviewer_prompt}
], temperature=0.5)
return {
"outline": outline,
"script_md": reviewed_md,
"used_assets": assets
}
async def generate_short_video_script(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
短视频脚本生成流程:
1. 参考直播脚本节选(或话题简介)
2. 生成 JSON 结构的分镜脚本
"""
topic = params["topic"]
platform = params.get("platform", "抖音")
duration_seconds = params.get("duration_seconds", 30)
goal = params.get("goal", "提升曝光")
audience = params.get("audience", "泛用户")
style = params.get("style", "节奏明快")
reference_text = params.get("reference_text", "")
short_prompt_template = load_prompt("short_video.txt")
short_prompt = short_prompt_template.format(
platform=platform,
topic=topic,
duration_seconds=duration_seconds,
goal=goal,
audience=audience,
style=style,
reference_text=reference_text
)
resp = await llm_client.chat([
{"role": "system", "content": "你是短视频脚本专家。"},
{"role": "user", "content": short_prompt}
], temperature=0.7)
try:
data = json.loads(resp)
except json.JSONDecodeError:
data = self._extract_json(resp)
return data
@staticmethod
def _extract_json(text: str) -> Dict[str, Any]:
import re
m = re.search(r"\{.*\}", text, re.S)
if not m:
return {}
s = m.group(0)
return json.loads(s)
5.6 脚本服务与 FastAPI 路由
5.6.1 Pydantic 模型
# app/schemas/script.py
from pydantic import BaseModel, Field
from typing import Optional, Any
class LiveScriptGenerateRequest(BaseModel):
project_id: Optional[int]
persona: str
topic: str
goal: str = "提升转化"
duration_minutes: int = 60
platform: str = "抖音"
audience: str = "泛用户"
style: str = "轻松幽默"
class LiveScriptGenerateResponse(BaseModel):
script_id: int
version_id: int
outline: Any
content_md: str
class ShortVideoGenerateRequest(BaseModel):
project_id: Optional[int]
topic: str
duration_seconds: int = 30
platform: str = "抖音"
audience: str = "泛用户"
goal: str = "曝光"
style: str = "节奏明快"
reference_script_id: Optional[int]
class ShortVideoGenerateResponse(BaseModel):
script_id: int
version_id: int
result_json: Any
5.6.2 Script Service(与数据库交互)
# app/services/script_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import Dict, Any
from ..models import Script, ScriptVersion
class ScriptService:
def __init__(self, db: AsyncSession):
self.db = db
async def create_script(
self, project_id: int, title: str, script_type: str, status: str = "generated"
) -> Script:
script = Script(
project_id=project_id,
title=title,
script_type=script_type,
status=status
)
self.db.add(script)
await self.db.flush()
return script
async def add_script_version(
self, script_id: int, content_md: str, outline_json: Dict | None, config: Dict[str, Any], source: str = "llm"
) -> ScriptVersion:
# 获取当前最大版本号
stmt = select(func.max(ScriptVersion.version_number)).where(
ScriptVersion.script_id == script_id
)
max_version = (await self.db.execute(stmt)).scalar() or 0
version = ScriptVersion(
script_id=script_id,
version_number=max_version + 1,
source=source,
content_md=content_md,
outline_json=outline_json,
config=config
)
self.db.add(version)
await self.db.flush()
# 更新 script.current_version_id
script = await self.db.get(Script, script_id)
script.current_version_id = version.id
await self.db.flush()
return version
5.6.3 API 路由:同步生成(演示)
实际生产中应走任务队列,下面先给同步版本,方便开发调试。
# app/api/routes_scripts.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from ..deps import get_db, get_current_user
from ..services.asset_service import AssetService
from ..services.agent_orchestrator import AgentOrchestrator
from ..services.script_service import ScriptService
from ..schemas.script import (
LiveScriptGenerateRequest, LiveScriptGenerateResponse,
ShortVideoGenerateRequest, ShortVideoGenerateResponse
)
router = APIRouter(tags=["scripts"])
@router.post("/scripts/live/generate", response_model=LiveScriptGenerateResponse)
async def generate_live_script(
body: LiveScriptGenerateRequest,
db: AsyncSession = Depends(get_db),
user=Depends(get_current_user)
):
asset_service = AssetService(db)
orchestrator = AgentOrchestrator(asset_service)
script_service = ScriptService(db)
params = body.dict()
params["user_id"] = user.id
result = await orchestrator.generate_live_script(params)
if not body.project_id:
# 若未指定项目,可自动创建默认项目(此处略),假定有 project_id
raise HTTPException(status_code=400, detail="project_id is required in this simplified example")
script = await script_service.create_script(
project_id=body.project_id,
title=f"{body.topic} - 直播脚本",
script_type="live"
)
version = await script_service.add_script_version(
script_id=script.id,
content_md=result["script_md"],
outline_json=result["outline"],
config=body.dict()
)
await db.commit()
return LiveScriptGenerateResponse(
script_id=script.id,
version_id=version.id,
outline=result["outline"],
content_md=result["script_md"]
)
@router.post("/scripts/short-video/generate", response_model=ShortVideoGenerateResponse)
async def generate_short_video_script(
body: ShortVideoGenerateRequest,
db: AsyncSession = Depends(get_db),
user=Depends(get_current_user)
):
asset_service = AssetService(db)
orchestrator = AgentOrchestrator(asset_service)
script_service = ScriptService(db)
reference_text = ""
if body.reference_script_id:
# 从 ScriptVersion 中随便取一个版本:真实项目里按 current_version_id 或最新版本读取
from ..models import ScriptVersion
version = await db.get(ScriptVersion, body.reference_script_id)
if version:
reference_text = version.content_md
params = body.dict()
params["reference_text"] = reference_text
data = await orchestrator.generate_short_video_script(params)
if not body.project_id:
raise HTTPException(status_code=400, detail="project_id is required in this simplified example")
script = await script_service.create_script(
project_id=body.project_id,
title=f"{body.topic} - 短视频脚本",
script_type="short_video"
)
version = await script_service.add_script_version(
script_id=script.id,
content_md=json.dumps(data, ensure_ascii=False, indent=2),
outline_json=None,
config=body.dict()
)
await db.commit()
return ShortVideoGenerateResponse(
script_id=script.id,
version_id=version.id,
result_json=data
)
六、任务队列与异步执行
直播脚本和短视频脚本生成可能较耗时,生产中应通过任务队列 + Worker异步执行,并在前端展示进度。
6.1 Celery 配置
# app/workers/celery_worker.py
import os
from celery import Celery
broker_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
backend_url = broker_url
celery_app = Celery(
"ai_script_worker",
broker=broker_url,
backend=backend_url,
)
celery_app.conf.update(
task_routes={
"tasks.generate_live_script": {"queue": "live_script"},
"tasks.generate_short_video": {"queue": "short_video"},
},
task_default_queue="default",
)
6.2 任务定义
# app/workers/tasks.py
import asyncio
from .celery_worker import celery_app
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from ..config import settings
from ..services.asset_service import AssetService
from ..services.agent_orchestrator import AgentOrchestrator
from ..services.script_service import ScriptService
from ..models import Task as TaskModel
engine = create_async_engine(settings.database_url, future=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)
@celery_app.task(name="tasks.generate_live_script")
def generate_live_script_task(task_id: int):
asyncio.run(_generate_live_script_async(task_id))
async def _generate_live_script_async(task_id: int):
async with AsyncSessionLocal() as db:
task = await db.get(TaskModel, task_id)
if not task:
return
task.status = "running"
await db.flush()
try:
asset_service = AssetService(db)
orchestrator = AgentOrchestrator(asset_service)
script_service = ScriptService(db)
params = dict(task.params)
params.setdefault("user_id", task.params.get("user_id", 0))
result = await orchestrator.generate_live_script(params)
if "project_id" not in params:
raise ValueError("project_id is required")
script = await script_service.create_script(
project_id=params["project_id"],
title=f"{params['topic']} - 直播脚本",
script_type="live"
)
version = await script_service.add_script_version(
script_id=script.id,
content_md=result["script_md"],
outline_json=result["outline"],
config=params
)
task.status = "success"
task.result = {"script_id": script.id, "version_id": version.id}
except Exception as e:
task.status = "failed"
task.error_message = str(e)
await db.commit()
任务创建 API 只需把生成参数存入 task.params,然后调用 generate_live_script_task.delay(task_id) 即可,前端轮询 /tasks/{id} 获取执行进度。
七、前端交互与示例代码
本部分简要展示如何在前端调用直播脚本生成接口,并展示结果。
7.1 React 简易示例:直播脚本生成表单
// src/pages/LiveScriptGenerator.tsx
import React, { useState } from "react";
import axios from "axios";
interface LiveScriptResponse {
script_id: number;
version_id: number;
outline: any;
content_md: string;
}
export const LiveScriptGenerator: React.FC = () => {
const [topic, setTopic] = useState("");
const [persona, setPersona] = useState("专业带货主播");
const [duration, setDuration] = useState(60);
const [style, setStyle] = useState("轻松幽默");
const [loading, setLoading] = useState(false);
const [result, setResult] = useState<LiveScriptResponse | null>(null);
const handleSubmit = async (e: React.FormEvent) => {
e.preventDefault();
setLoading(true);
try {
const resp = await axios.post<LiveScriptResponse>("/api/scripts/live/generate", {
project_id: 1,
persona,
topic,
duration_minutes: duration,
style,
goal: "提升转化",
platform: "抖音",
audience: "年轻女性"
});
setResult(resp.data);
} catch (err) {
console.error(err);
alert("生成失败");
} finally {
setLoading(false);
}
};
return (
<div style={{ maxWidth: 960, margin: "0 auto", padding: 24 }}>
<h2>直播脚本智能生成助手</h2>
<form onSubmit={handleSubmit}>
<div>
<label>直播主题</label>
<input value={topic} onChange={e => setTopic(e.target.value)} required />
</div>
<div>
<label>主播人设</label>
<input value={persona} onChange={e => setPersona(e.target.value)} />
</div>
<div>
<label>时长(分钟)</label>
<input
type="number"
value={duration}
onChange={e => setDuration(Number(e.target.value))}
/>
</div>
<div>
<label>风格</label>
<input value={style} onChange={e => setStyle(e.target.value)} />
</div>
<button type="submit" disabled={loading}>
{loading ? "生成中..." : "生成脚本"}
</button>
</form>
{result && (
<div style={{ marginTop: 32 }}>
<h3>生成结果(Markdown)</h3>
<pre style={{ whiteSpace: "pre-wrap" }}>{result.content_md}</pre>
</div>
)}
</div>
);
};
在实际项目里,可用 Markdown 渲染组件(如 react-markdown)来美观展示脚本。
八、完整系统工作流
本部分从「用户操作 → 系统内部调用 → LLM → 数据落库」的角度,描述关键流程。
8.1 直播脚本生成工作流
- 用户在前端填写配置:
- 话题、时长、人设、风格、目标受众、平台等。
- 前端调用
/api/scripts/live/generate或创建任务/api/tasks。 - 后端 API:
- 根据用户 ID 与项目 ID 初始化
AssetService、AgentOrchestrator、ScriptService。
- 根据用户 ID 与项目 ID 初始化
- RAG 检索:
- 使用 topic/persona/goal 生成查询 embedding,在
asset_embedding中检索相关素材。 - 格式化成可读文本注入 Prompt。
- 使用 topic/persona/goal 生成查询 embedding,在
- Planner Agent 调用 LLM:
- 使用
live_outline.txt模板,生成直播大纲 JSON(带时间段与要点)。
- 使用
- Writer Agent 调用 LLM:
- 使用
live_script.txt模板,把大纲和素材一起输入,生成 Markdown 逐字稿。
- 使用
- Reviewer Agent 调用 LLM:
- 对脚本做风格统一、补充互动话术和金句。
- 结果存储:
- 新建
script,新增一条script_version,保存 outline_json、content_md 与本次配置。
- 新建
- 返回前端:
- 返回脚本 ID、版本 ID、outline JSON 和 Markdown 脚本。
8.2 短视频脚本生成工作流
- 用户在前端:
- 输入短视频主题、时长、平台、风格;
- 可选:从已有直播脚本中选择一个版本作为参考文本。
- API
/api/scripts/short-video/generate:- 若传
reference_script_id,则从script_version读出对应 Markdown,截取关键部分。
- 若传
- 调用 LLM:
- 使用
short_video.txt模板,生成包含title、cover_text、shots数组的 JSON。
- 使用
- 存储:
- 创建
script(类型short_video),脚本内容直接保存 JSON(或转为 Markdown + JSON)。
- 创建
- 前端:
- 展示分镜列表,每个分镜包含时间范围、画面描述、旁白文案,可供用户微调。
8.3 脚本迭代与版本管理
- 用户修改脚本内容:
- 提交新版内容给后端,创建
script_version,source='manual'。
- 提交新版内容给后端,创建
- 用户希望「在此基础上再优化一次」:
- 将当前版本内容作为
reference_text或 Prompt 输入,再次通过 Reviewer Agent / Writer Agent 调用 LLM 进行重写。 - 产生新版本,保留旧版,支持对比与回滚。
- 将当前版本内容作为
九、部署与优化策略
本部分介绍如何把系统部署上线,以及在性能、成本、质量上的优化方案。
9.1 Docker 化后端服务
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
ENV PYTHONUNBUFFERED=1
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]
9.2 docker-compose 示例
version: "3.9"
services:
api:
build: .
depends_on:
- db
- redis
environment:
DATABASE_URL: postgresql+asyncpg://user:pass@db:5432/ai_script
REDIS_URL: redis://redis:6379/0
OPENAI_API_KEY: your_key_here
ports:
- "8000:8000"
worker:
build: .
command: ["celery", "-A", "app.workers.celery_worker.celery_app", "worker", "-l", "info"]
depends_on:
- api
- redis
- db
environment:
DATABASE_URL: postgresql+asyncpg://user:pass@db:5432/ai_script
REDIS_URL: redis://redis:6379/0
OPENAI_API_KEY: your_key_here
db:
image: postgres:15
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: ai_script
ports:
- "5432:5432"
redis:
image: redis:7
ports:
- "6379:6379"
9.3 性能与成本优化策略
- Prompt 优化:
- 控制上下文长度:RAG 检索后只保留最相关的 3~5 条素材。
- 采用结构化输出(JSON),减少二次解析和二次对话。
- 缓存:
- 相同配置下的生成结果可以做缓存(如:同一 topic + persona + style)。
- 异步与批量:
- 使用 Celery / asyncio 并行调用 LLM,减少总耗时。
- 多个短视频脚本的生成可以批处理。
- 模型分层:
- 大纲和结构类任务可用相对便宜的模型(如 GPT-4.1-mini / 本地模型),最终逐字稿用更强模型。
- 监控与日志:
- 记录 Prompt/Response 长度、耗时、token 数量,方便优化成本。
十、扩展方向与多智能体演进
在现有设计基础上,可以进一步演进成更强大的「多智能体内容生产平台」:
- 自动热点抓取 Agent:
- 从微博/抖音热榜抓取热点话题,自动生成「选题 + 直播脚本 + N 个短视频脚本」。
- 数据分析 Agent:
- 读取以往直播/短视频的表现数据(观看时长、转化率),反向调整脚本结构和话术风格。
- 多语言 Agent:
- 在脚本生成后自动生成多语言版本,并考虑文化差异调整。
总结
- 提供了一套可落地的系统方案:从架构、数据库、后端代码、多智能体编排、向量检索、任务队列到前端交互的完整链路。
- 给出了关键模块代码示例(LLM 封装、RAG 检索、脚本生成 Orchestrator、API 路由、任务 Worker、前端页面)。
- 通过多智能体 + RAG + 版本管理 + 任务队列的组合,可以较低成本地搭建一个工业级的「直播脚本与短视频智能生成助手」。
如果你希望,我可以在下一步帮你:
- 把其中某一部分(例如「直播脚本多智能体编排」或「PGVector 初始化与检索」)进一步展开成完整可运行 Demo 项目的代码骨架。
更多推荐



所有评论(0)