一、目标与整体说明

本方案从技术实现视角,完整设计并实现一个「直播脚本与短视频智能生成助手」系统,包含:

  • 系统架构设计:整体模块划分、服务拆分、调用关系。
  • 核心模块代码实现:后端(FastAPI)、LLM 调用、脚本生成逻辑、多智能体编排、任务队列、向量检索等。
  • 数据库设计:核心表结构(用户、项目、脚本版本、任务队列、素材库等),含 SQL 示例。
  • 部署与优化策略:容器化、环境配置、伸缩、高并发与成本优化。
  • 完整系统工作流:从用户输入到脚本生成、短视频脚本拆解、迭代优化的端到端流程。

下面所有代码以**Python + FastAPI + PostgreSQL + Redis + 前端(任意 Web 框架)**为参考实现,你可以直接在此基础上落地一个可运行的系统。


二、系统架构设计

本部分给出系统整体架构,说明各模块职责和调用链路,为后续代码与数据库设计打基础。

2.1 逻辑架构图(文字版)

整体采用前后端分离 + LLM 编排服务 + 任务队列 + 向量检索 + 对象存储的典型结构:

          ┌──────────────────────── Web / App 前端 ────────────────────────┐
          │  - 直播脚本生成页面(话题/人设/风格配置)                      │
          │  - 短视频脚本生成页面(热点/素材选择)                        │
          │  - 脚本编辑器(版本对比、修改、再次生成)                     │
          └───────────────────────────────────────────────────────────────┘
                                   │  HTTPS / REST / WebSocket
                                   ▼
                     ┌──────────────────────────────────┐
                     │          API Gateway / BFF       │
                     │ (FastAPI: auth, routing, RBAC)   │
                     └──────────────────────────────────┘
                                   │
                     ┌──────────────────────────────────┐
                     │        业务后端服务层            │
                     │ - UserService / ProjectService   │
                     │ - ScriptService(核心)          │
                     │ - VideoPlanService               │
                     │ - TaskService / JobService       │
                     └──────────────────────────────────┘
       ┌──────────────────────────────┼──────────────────────────────┐
       ▼                              ▼                              ▼
┌────────────────┐         ┌──────────────────┐           ┌──────────────────┐
│  LLM Orchestr. │         │ Vector Store     │           │  Task Queue      │
│  (LLM 调度层)  │         │ (PGVector/Milvus)│           │ (Redis/Celery)   │
│ - Prompt模板   │         │ - 素材向量检索   │           │ - 异步生成任务   │
│ - 多智能体编排 │         │ - 历史脚本检索   │           │ - 任务状态管理   │
└────────────────┘         └──────────────────┘           └──────────────────┘
       │                              │                              │
       ▼                              ▼                              ▼
┌─────────────┐              ┌──────────────┐                 ┌──────────────┐
│ LLM Provider│              │ PostgreSQL   │                 │ Object Store │
│(OpenAI/本地)│              │  业务数据库  │                 │(S3/OSS/本地) │
└─────────────┘              └──────────────┘                 └──────────────┘

2.2 功能模块划分

  • 用户与项目管理模块
    • 管理用户、团队、项目(例如:某场直播、某一短视频系列)。
  • 直播脚本生成模块
    • 根据「人设 / 话题 / 时长 / 风格 / 观众画像」生成整体大纲 + 详细分段脚本 + 互动话术
  • 短视频脚本生成模块
    • 可直接从文本话题、直播片段概要,生成短视频脚本(标题、封面文案、完整旁白脚本、镜头说明)。
  • 素材与知识库模块
    • 管理品牌文案、过往优秀脚本、产品介绍,用向量库做RAG增强,让生成脚本更加贴合品牌语气。
  • 多智能体编排模块(核心特色)
    • 规划 Agent(Planner):拆分子任务(大纲、脚本、互动、金句等)。
    • 脚本 Writer Agent:负责语料生成。
    • 风格 Reviewer Agent:检查语言风格是否统一,帮忙重写。
  • 任务队列与异步执行模块
    • 大型脚本、多版本生成等放入任务队列,由后台 Worker 异步执行。
  • 前端交互模块
    • 多步配置、脚本展示与编辑、版本管理、导出为文档/字幕等。

三、技术选型与项目结构

3.1 技术选型

  • 后端框架:FastAPI(性能好 + OpenAPI 文档 + 异步友好)。
  • 数据库:PostgreSQL(支持 JSONB、可扩展 PGVector)。
  • 向量库
    • 小规模直接使用 PGVector 插件;
    • 大规模可引入 Milvus / Qdrant。
  • 缓存与队列:Redis + Celery(异步执行脚本生成任务)。
  • LLM 调用
    • OpenAI API / Azure OpenAI / 本地 LLM(如 Llama / Qwen)封装成统一接口。
  • 前端:任意主流框架(React / Vue),下面用 React 示例。
  • 部署:Docker 容器化,Kubernetes / Docker Compose。

3.2 后端项目结构建议

ai-script-assistant/
  ├─ app/
  │  ├─ main.py                # 入口,FastAPI app
  │  ├─ config.py              # 配置管理
  │  ├─ deps.py                # 依赖注入(DB, services)
  │  ├─ api/
  │  │  ├─ routes_users.py
  │  │  ├─ routes_projects.py
  │  │  ├─ routes_scripts.py   # 直播/短视频脚本 API
  │  │  ├─ routes_tasks.py
  │  ├─ models/                # SQLAlchemy ORM 模型
  │  ├─ schemas/               # Pydantic 模型
  │  ├─ services/              # 业务服务
  │  │  ├─ script_service.py
  │  │  ├─ video_plan_service.py
  │  │  ├─ task_service.py
  │  │  └─ agent_orchestrator.py
  │  ├─ llm/
  │  │  ├─ client.py           # 统一 LLM 客户端封装
  │  │  ├─ prompts/            # Prompt 模板
  │  │  │  ├─ live_outline.txt
  │  │  │  ├─ live_script.txt
  │  │  │  └─ short_video.txt
  │  ├─ workers/
  │  │  └─ celery_worker.py    # Worker 进程
  │  └─ utils/
  │     ├─ logging.py
  │     └─ text_postprocess.py
  ├─ migrations/               # Alembic 迁移
  ├─ Dockerfile
  ├─ docker-compose.yml
  ├─ requirements.txt
  └─ README.md

四、数据库设计

本部分给出核心数据表设计(PostgreSQL + 可选 PGVector),以支持用户管理、项目管理、脚本生成、版本管理与任务队列。

4.1 核心实体关系(简图)

User (用户)
  └─< Project (项目:一次直播或系列短视频)
        ├─< Script (脚本:直播脚本/短视频脚本)
        │      └─< ScriptVersion (脚本版本)
        ├─< Asset (素材:文案、产品介绍等)
        └─< Task (生成任务队列)

4.2 SQL:用户与项目

-- 用户表
CREATE TABLE "user" (
    id            BIGSERIAL PRIMARY KEY,
    email         VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    name          VARCHAR(100),
    created_at    TIMESTAMP NOT NULL DEFAULT now(),
    updated_at    TIMESTAMP NOT NULL DEFAULT now()
);

-- 项目表:一次直播或一组短视频
CREATE TABLE project (
    id            BIGSERIAL PRIMARY KEY,
    user_id       BIGINT NOT NULL REFERENCES "user"(id),
    name          VARCHAR(255) NOT NULL,
    type          VARCHAR(50) NOT NULL,  -- 'live' / 'short_video'
    description   TEXT,
    meta          JSONB,                 -- 直播时间、平台、受众等信息
    created_at    TIMESTAMP NOT NULL DEFAULT now(),
    updated_at    TIMESTAMP NOT NULL DEFAULT now()
);

4.3 脚本与脚本版本

-- 脚本主表:一个项目下可有多个脚本(预热场、正式场等)
CREATE TABLE script (
    id            BIGSERIAL PRIMARY KEY,
    project_id    BIGINT NOT NULL REFERENCES project(id),
    title         VARCHAR(255) NOT NULL,
    script_type   VARCHAR(50) NOT NULL,  -- 'live' / 'short_video'
    status        VARCHAR(50) NOT NULL,  -- 'draft' / 'generated' / 'published'
    current_version_id BIGINT,           -- 指向当前激活版本
    created_at    TIMESTAMP NOT NULL DEFAULT now(),
    updated_at    TIMESTAMP NOT NULL DEFAULT now()
);

-- 脚本版本表:保存每次 LLM 生成和人工修改后的版本
CREATE TABLE script_version (
    id            BIGSERIAL PRIMARY KEY,
    script_id     BIGINT NOT NULL REFERENCES script(id),
    version_number INT NOT NULL,
    source        VARCHAR(50) NOT NULL,  -- 'llm' / 'manual'
    content_md    TEXT NOT NULL,         -- markdown 格式的脚本
    outline_json  JSONB,                 -- 结构化大纲:章节/时间点/互动点
    config        JSONB,                 -- 本次生成时使用的配置(风格、人设等)
    created_at    TIMESTAMP NOT NULL DEFAULT now()
);

CREATE UNIQUE INDEX idx_script_version_unique
    ON script_version(script_id, version_number);

4.4 素材与向量库(RAG)

-- 素材表:品牌调性、产品文案、历史爆款脚本等
CREATE TABLE asset (
    id           BIGSERIAL PRIMARY KEY,
    project_id   BIGINT REFERENCES project(id),
    user_id      BIGINT NOT NULL REFERENCES "user"(id),
    title        VARCHAR(255) NOT NULL,
    content      TEXT NOT NULL,
    tags         TEXT[],
    created_at   TIMESTAMP NOT NULL DEFAULT now()
);

-- PGVector:存储素材向量(文本嵌入)
-- 需要事先安装 pgvector 插件,并创建 vector 类型
CREATE TABLE asset_embedding (
    asset_id    BIGINT PRIMARY KEY REFERENCES asset(id),
    embedding   vector(1536) NOT NULL   -- 以 OpenAI text-embedding-3-small 为例
);

CREATE INDEX idx_asset_embedding_vector
    ON asset_embedding
    USING ivfflat (embedding vector_cosine_ops);

4.5 生成任务队列表

CREATE TABLE task (
    id            BIGSERIAL PRIMARY KEY,
    project_id    BIGINT REFERENCES project(id),
    script_id     BIGINT REFERENCES script(id),
    type          VARCHAR(50) NOT NULL, -- 'generate_live', 'generate_short_video'
    status        VARCHAR(50) NOT NULL, -- 'pending', 'running', 'success', 'failed'
    priority      INT NOT NULL DEFAULT 0,
    params        JSONB,                -- 生成参数
    result        JSONB,                -- 输出结果引用(脚本版本ID、错误信息等)
    error_message TEXT,
    created_at    TIMESTAMP NOT NULL DEFAULT now(),
    updated_at    TIMESTAMP NOT NULL DEFAULT now()
);

五、核心模块代码实现(后端)

下面给出关键模块的核心实现代码(删繁就简,但保持可落地),包括:配置、LLM 封装、Prompt 模板、脚本服务、多智能体编排、任务队列。

5.1 配置与应用入口

# app/config.py
from pydantic import BaseSettings

class Settings(BaseSettings):
    app_name: str = "AI Live & Short Video Script Assistant"
    api_prefix: str = "/api"
    database_url: str
    redis_url: str
    openai_api_key: str
    openai_base_url: str | None = None
    model_name: str = "gpt-4.1"
    embedding_model: str = "text-embedding-3-small"

    class Config:
        env_file = ".env"

settings = Settings()
# app/main.py
from fastapi import FastAPI
from .config import settings
from .api import routes_scripts, routes_tasks

def create_app() -> FastAPI:
    app = FastAPI(title=settings.app_name)
    app.include_router(routes_scripts.router, prefix=settings.api_prefix)
    app.include_router(routes_tasks.router, prefix=settings.api_prefix)
    return app

app = create_app()

5.2 统一 LLM 客户端封装

# app/llm/client.py
import httpx
from typing import List, Dict, Any
from ..config import settings

class LLMClient:
    def __init__(self):
        self._client = httpx.AsyncClient(
            base_url=settings.openai_base_url or "https://api.openai.com/v1",
            headers={"Authorization": f"Bearer {settings.openai_api_key}"}
        )
        self.model = settings.model_name
        self.embedding_model = settings.embedding_model

    async def chat(self, messages: List[Dict[str, str]], **kwargs) -> str:
        payload = {
            "model": self.model,
            "messages": messages,
            **kwargs
        }
        resp = await self._client.post("/chat/completions", json=payload)
        resp.raise_for_status()
        data = resp.json()
        return data["choices"][0]["message"]["content"]

    async def embed(self, input_texts: List[str]) -> List[List[float]]:
        payload = {
            "model": self.embedding_model,
            "input": input_texts
        }
        resp = await self._client.post("/embeddings", json=payload)
        resp.raise_for_status()
        data = resp.json()
        return [item["embedding"] for item in data["data"]]

llm_client = LLMClient()

5.3 Prompt 模板示例

# app/llm/prompts/live_outline.txt

你是一名资深直播策划与脚本编剧,擅长为主播设计高转化率的直播大纲和话术。

【直播信息】
- 人设:{{ persona }}
- 主题:{{ topic }}
- 目标:{{ goal }}
- 时长:{{ duration_minutes }} 分钟
- 平台:{{ platform }}
- 目标受众:{{ audience }}

【素材与品牌信息】
{{ retrieved_assets }}

请输出:
1. 直播整体结构(如:开场、暖场互动、产品介绍节奏、福利节奏、结束收尾)
2. 每个阶段的时间范围、阶段目标
3. 每个阶段的关键话术要点(要点式,不需要逐字稿)

请使用 JSON 格式输出,字段包括:
- "sections": [
    {
      "id": "string",
      "name": "string",
      "start_min": number,
      "end_min": number,
      "objective": "string",
      "bullet_points": ["string", ...]
    },
    ...
  ]
# app/llm/prompts/live_script.txt

你是一名专业直播脚本编剧。

【上一步的直播大纲 JSON】
{{ outline_json }}

【要求】
1. 根据每个阶段的大纲要点,输出详细的直播逐字稿。
2. 语言风格:{{ style }} (如:幽默、干货、情绪煽动等)
3. 保持与以下品牌和产品信息一致:
{{ retrieved_assets }}

【输出格式】
请使用 Markdown 格式,每个阶段一个二级标题(##),并在内部按对话话术分段。
示例:
## 开场
- 主播:xxxx
- 主播:xxxx
- 主播:xxxx
...
# app/llm/prompts/short_video.txt

你是一名短视频编剧与文案专家,擅长为抖音/快手/小红书制作高完播率的脚本。

【视频信息】
- 平台:{{ platform }}
- 主题:{{ topic }}
- 时长:{{ duration_seconds }} 秒
- 目标:{{ goal }}
- 受众:{{ audience }}

【可参考的直播内容节选/素材】
{{ reference_text }}

【要求】
1. 输出一个适合短视频的标题(具有吸引力和钩子)
2. 输出封面文案(适合放在封面图上的少量文字)
3. 输出详细脚本,包含每 3-5 秒一个分镜描述和对应旁白文案
4. 风格:{{ style }}

【输出 JSON 结构】
{
  "title": "...",
  "cover_text": "...",
  "shots": [
    {
      "start_sec": 0,
      "end_sec": 4,
      "scene": "画面描述",
      "voiceover": "旁白文案"
    },
    ...
  ]
}

5.4 向量检索服务(RAG)

从素材中检索与本次直播/短视频相关的内容,作为 prompt 的上下文。

# app/services/asset_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import text
from typing import List
from ..llm.client import llm_client

class AssetService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def search_assets(self, project_id: int | None, user_id: int, query: str, top_k: int = 5) -> List[dict]:
        # 1. 对 query 做 embedding
        embeddings = await llm_client.embed([query])
        emb = embeddings[0]
        emb_str = ",".join(str(x) for x in emb)

        # 2. 使用 PGVector 相似度检索
        sql = text("""
            SELECT a.id, a.title, a.content, a.tags
            FROM asset a
            JOIN asset_embedding e ON a.id = e.asset_id
            WHERE (a.project_id = :project_id OR :project_id IS NULL)
              AND a.user_id = :user_id
            ORDER BY e.embedding <-> (:emb::vector)
            LIMIT :top_k
        """)
        rows = (await self.db.execute(sql, {
            "project_id": project_id,
            "user_id": user_id,
            "emb": emb_str,
            "top_k": top_k
        })).mappings().all()
        return [dict(r) for r in rows]

    @staticmethod
    def format_assets_for_prompt(assets: List[dict]) -> str:
        lines = []
        for a in assets:
            tags = ", ".join(a.get("tags") or [])
            lines.append(f"[{a['title']}] (tags: {tags})\n{a['content']}\n")
        return "\n\n".join(lines)

5.5 多智能体编排核心(Planner + Writer + Reviewer)

通过一个 Orchestrator 组合多个「子 Agent」,这里用简单函数模拟多轮调用。

# app/services/agent_orchestrator.py
import json
from typing import Dict, Any
from ..llm.client import llm_client
from ..services.asset_service import AssetService
from ..config import settings
from pathlib import Path

PROMPTS_DIR = Path(__file__).resolve().parent.parent / "llm" / "prompts"

def load_prompt(name: str) -> str:
    return (PROMPTS_DIR / name).read_text(encoding="utf-8")

class AgentOrchestrator:
    def __init__(self, asset_service: AssetService):
        self.asset_service = asset_service

    async def generate_live_script(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        多智能体流程:
        1. Planner Agent:生成直播大纲(JSON)
        2. Writer Agent:生成逐字稿 Markdown
        3. Reviewer Agent:风格统一和优化
        """
        persona = params["persona"]
        topic = params["topic"]
        goal = params.get("goal", "提升转化")
        duration = params.get("duration_minutes", 60)
        platform = params.get("platform", "抖音")
        audience = params.get("audience", "泛用户")
        style = params.get("style", "轻松幽默")

        # 1. 根据 topic + persona 从素材库中检索品牌/产品信息
        assets = await self.asset_service.search_assets(
            project_id=params.get("project_id"),
            user_id=params["user_id"],
            query=f"{topic} {persona} {goal}"
        )
        assets_text = self.asset_service.format_assets_for_prompt(assets)

        # 2. Planner Agent:生成直播大纲 JSON
        outline_prompt_template = load_prompt("live_outline.txt")
        outline_prompt = outline_prompt_template.format(
            persona=persona,
            topic=topic,
            goal=goal,
            duration_minutes=duration,
            platform=platform,
            audience=audience,
            retrieved_assets=assets_text
        )

        outline_json_str = await llm_client.chat([
            {"role": "system", "content": "你是直播策划专家,负责设计结构化直播大纲。"},
            {"role": "user", "content": outline_prompt}
        ], temperature=0.4)

        try:
            outline = json.loads(outline_json_str)
        except json.JSONDecodeError:
            # 简单修正:尝试从 Markdown code block 中提取 JSON
            outline = self._extract_json(outline_json_str)

        # 3. Writer Agent:根据大纲生成逐字稿
        script_prompt_template = load_prompt("live_script.txt")
        script_prompt = script_prompt_template.format(
            outline_json=json.dumps(outline, ensure_ascii=False, indent=2),
            style=style,
            retrieved_assets=assets_text
        )

        script_md = await llm_client.chat([
            {"role": "system", "content": "你是直播脚本编剧,负责写详细逐字稿。"},
            {"role": "user", "content": script_prompt}
        ], temperature=0.7)

        # 4. Reviewer Agent:统一风格(可选)
        reviewer_prompt = (
            "你是一名文字编辑,请在不改变核心意思的前提下,统一下述直播脚本的口吻为:"
            f"{style},适当增加互动话术和金句,输出 Markdown。\n\n{script_md}"
        )

        reviewed_md = await llm_client.chat([
            {"role": "system", "content": "你是资深文案编辑。"},
            {"role": "user", "content": reviewer_prompt}
        ], temperature=0.5)

        return {
            "outline": outline,
            "script_md": reviewed_md,
            "used_assets": assets
        }

    async def generate_short_video_script(self, params: Dict[str, Any]) -> Dict[str, Any]:
        """
        短视频脚本生成流程:
        1. 参考直播脚本节选(或话题简介)
        2. 生成 JSON 结构的分镜脚本
        """
        topic = params["topic"]
        platform = params.get("platform", "抖音")
        duration_seconds = params.get("duration_seconds", 30)
        goal = params.get("goal", "提升曝光")
        audience = params.get("audience", "泛用户")
        style = params.get("style", "节奏明快")
        reference_text = params.get("reference_text", "")

        short_prompt_template = load_prompt("short_video.txt")
        short_prompt = short_prompt_template.format(
            platform=platform,
            topic=topic,
            duration_seconds=duration_seconds,
            goal=goal,
            audience=audience,
            style=style,
            reference_text=reference_text
        )

        resp = await llm_client.chat([
            {"role": "system", "content": "你是短视频脚本专家。"},
            {"role": "user", "content": short_prompt}
        ], temperature=0.7)

        try:
            data = json.loads(resp)
        except json.JSONDecodeError:
            data = self._extract_json(resp)

        return data

    @staticmethod
    def _extract_json(text: str) -> Dict[str, Any]:
        import re
        m = re.search(r"\{.*\}", text, re.S)
        if not m:
            return {}
        s = m.group(0)
        return json.loads(s)

5.6 脚本服务与 FastAPI 路由

5.6.1 Pydantic 模型
# app/schemas/script.py
from pydantic import BaseModel, Field
from typing import Optional, Any

class LiveScriptGenerateRequest(BaseModel):
    project_id: Optional[int]
    persona: str
    topic: str
    goal: str = "提升转化"
    duration_minutes: int = 60
    platform: str = "抖音"
    audience: str = "泛用户"
    style: str = "轻松幽默"

class LiveScriptGenerateResponse(BaseModel):
    script_id: int
    version_id: int
    outline: Any
    content_md: str

class ShortVideoGenerateRequest(BaseModel):
    project_id: Optional[int]
    topic: str
    duration_seconds: int = 30
    platform: str = "抖音"
    audience: str = "泛用户"
    goal: str = "曝光"
    style: str = "节奏明快"
    reference_script_id: Optional[int]

class ShortVideoGenerateResponse(BaseModel):
    script_id: int
    version_id: int
    result_json: Any
5.6.2 Script Service(与数据库交互)
# app/services/script_service.py
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select, func
from typing import Dict, Any
from ..models import Script, ScriptVersion

class ScriptService:
    def __init__(self, db: AsyncSession):
        self.db = db

    async def create_script(
        self, project_id: int, title: str, script_type: str, status: str = "generated"
    ) -> Script:
        script = Script(
            project_id=project_id,
            title=title,
            script_type=script_type,
            status=status
        )
        self.db.add(script)
        await self.db.flush()
        return script

    async def add_script_version(
        self, script_id: int, content_md: str, outline_json: Dict | None, config: Dict[str, Any], source: str = "llm"
    ) -> ScriptVersion:
        # 获取当前最大版本号
        stmt = select(func.max(ScriptVersion.version_number)).where(
            ScriptVersion.script_id == script_id
        )
        max_version = (await self.db.execute(stmt)).scalar() or 0
        version = ScriptVersion(
            script_id=script_id,
            version_number=max_version + 1,
            source=source,
            content_md=content_md,
            outline_json=outline_json,
            config=config
        )
        self.db.add(version)
        await self.db.flush()
        # 更新 script.current_version_id
        script = await self.db.get(Script, script_id)
        script.current_version_id = version.id
        await self.db.flush()
        return version
5.6.3 API 路由:同步生成(演示)

实际生产中应走任务队列,下面先给同步版本,方便开发调试。

# app/api/routes_scripts.py
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from ..deps import get_db, get_current_user
from ..services.asset_service import AssetService
from ..services.agent_orchestrator import AgentOrchestrator
from ..services.script_service import ScriptService
from ..schemas.script import (
    LiveScriptGenerateRequest, LiveScriptGenerateResponse,
    ShortVideoGenerateRequest, ShortVideoGenerateResponse
)

router = APIRouter(tags=["scripts"])

@router.post("/scripts/live/generate", response_model=LiveScriptGenerateResponse)
async def generate_live_script(
    body: LiveScriptGenerateRequest,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user)
):
    asset_service = AssetService(db)
    orchestrator = AgentOrchestrator(asset_service)
    script_service = ScriptService(db)

    params = body.dict()
    params["user_id"] = user.id

    result = await orchestrator.generate_live_script(params)

    if not body.project_id:
        # 若未指定项目,可自动创建默认项目(此处略),假定有 project_id
        raise HTTPException(status_code=400, detail="project_id is required in this simplified example")

    script = await script_service.create_script(
        project_id=body.project_id,
        title=f"{body.topic} - 直播脚本",
        script_type="live"
    )

    version = await script_service.add_script_version(
        script_id=script.id,
        content_md=result["script_md"],
        outline_json=result["outline"],
        config=body.dict()
    )

    await db.commit()

    return LiveScriptGenerateResponse(
        script_id=script.id,
        version_id=version.id,
        outline=result["outline"],
        content_md=result["script_md"]
    )


@router.post("/scripts/short-video/generate", response_model=ShortVideoGenerateResponse)
async def generate_short_video_script(
    body: ShortVideoGenerateRequest,
    db: AsyncSession = Depends(get_db),
    user=Depends(get_current_user)
):
    asset_service = AssetService(db)
    orchestrator = AgentOrchestrator(asset_service)
    script_service = ScriptService(db)

    reference_text = ""
    if body.reference_script_id:
        # 从 ScriptVersion 中随便取一个版本:真实项目里按 current_version_id 或最新版本读取
        from ..models import ScriptVersion
        version = await db.get(ScriptVersion, body.reference_script_id)
        if version:
            reference_text = version.content_md

    params = body.dict()
    params["reference_text"] = reference_text

    data = await orchestrator.generate_short_video_script(params)

    if not body.project_id:
        raise HTTPException(status_code=400, detail="project_id is required in this simplified example")

    script = await script_service.create_script(
        project_id=body.project_id,
        title=f"{body.topic} - 短视频脚本",
        script_type="short_video"
    )

    version = await script_service.add_script_version(
        script_id=script.id,
        content_md=json.dumps(data, ensure_ascii=False, indent=2),
        outline_json=None,
        config=body.dict()
    )

    await db.commit()

    return ShortVideoGenerateResponse(
        script_id=script.id,
        version_id=version.id,
        result_json=data
    )

六、任务队列与异步执行

直播脚本和短视频脚本生成可能较耗时,生产中应通过任务队列 + Worker异步执行,并在前端展示进度。

6.1 Celery 配置

# app/workers/celery_worker.py
import os
from celery import Celery

broker_url = os.getenv("REDIS_URL", "redis://localhost:6379/0")
backend_url = broker_url

celery_app = Celery(
    "ai_script_worker",
    broker=broker_url,
    backend=backend_url,
)

celery_app.conf.update(
    task_routes={
        "tasks.generate_live_script": {"queue": "live_script"},
        "tasks.generate_short_video": {"queue": "short_video"},
    },
    task_default_queue="default",
)

6.2 任务定义

# app/workers/tasks.py
import asyncio
from .celery_worker import celery_app
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.orm import sessionmaker
from ..config import settings
from ..services.asset_service import AssetService
from ..services.agent_orchestrator import AgentOrchestrator
from ..services.script_service import ScriptService
from ..models import Task as TaskModel

engine = create_async_engine(settings.database_url, future=True)
AsyncSessionLocal = sessionmaker(engine, class_=AsyncSession, expire_on_commit=False)

@celery_app.task(name="tasks.generate_live_script")
def generate_live_script_task(task_id: int):
    asyncio.run(_generate_live_script_async(task_id))

async def _generate_live_script_async(task_id: int):
    async with AsyncSessionLocal() as db:
        task = await db.get(TaskModel, task_id)
        if not task:
            return
        task.status = "running"
        await db.flush()

        try:
            asset_service = AssetService(db)
            orchestrator = AgentOrchestrator(asset_service)
            script_service = ScriptService(db)

            params = dict(task.params)
            params.setdefault("user_id",  task.params.get("user_id", 0))

            result = await orchestrator.generate_live_script(params)
            if "project_id" not in params:
                raise ValueError("project_id is required")

            script = await script_service.create_script(
                project_id=params["project_id"],
                title=f"{params['topic']} - 直播脚本",
                script_type="live"
            )
            version = await script_service.add_script_version(
                script_id=script.id,
                content_md=result["script_md"],
                outline_json=result["outline"],
                config=params
            )
            task.status = "success"
            task.result = {"script_id": script.id, "version_id": version.id}
        except Exception as e:
            task.status = "failed"
            task.error_message = str(e)

        await db.commit()

任务创建 API 只需把生成参数存入 task.params,然后调用 generate_live_script_task.delay(task_id) 即可,前端轮询 /tasks/{id} 获取执行进度。


七、前端交互与示例代码

本部分简要展示如何在前端调用直播脚本生成接口,并展示结果。

7.1 React 简易示例:直播脚本生成表单

// src/pages/LiveScriptGenerator.tsx
import React, { useState } from "react";
import axios from "axios";

interface LiveScriptResponse {
  script_id: number;
  version_id: number;
  outline: any;
  content_md: string;
}

export const LiveScriptGenerator: React.FC = () => {
  const [topic, setTopic] = useState("");
  const [persona, setPersona] = useState("专业带货主播");
  const [duration, setDuration] = useState(60);
  const [style, setStyle] = useState("轻松幽默");
  const [loading, setLoading] = useState(false);
  const [result, setResult] = useState<LiveScriptResponse | null>(null);

  const handleSubmit = async (e: React.FormEvent) => {
    e.preventDefault();
    setLoading(true);
    try {
      const resp = await axios.post<LiveScriptResponse>("/api/scripts/live/generate", {
        project_id: 1,
        persona,
        topic,
        duration_minutes: duration,
        style,
        goal: "提升转化",
        platform: "抖音",
        audience: "年轻女性"
      });
      setResult(resp.data);
    } catch (err) {
      console.error(err);
      alert("生成失败");
    } finally {
      setLoading(false);
    }
  };

  return (
    <div style={{ maxWidth: 960, margin: "0 auto", padding: 24 }}>
      <h2>直播脚本智能生成助手</h2>
      <form onSubmit={handleSubmit}>
        <div>
          <label>直播主题</label>
          <input value={topic} onChange={e => setTopic(e.target.value)} required />
        </div>
        <div>
          <label>主播人设</label>
          <input value={persona} onChange={e => setPersona(e.target.value)} />
        </div>
        <div>
          <label>时长(分钟)</label>
          <input
            type="number"
            value={duration}
            onChange={e => setDuration(Number(e.target.value))}
          />
        </div>
        <div>
          <label>风格</label>
          <input value={style} onChange={e => setStyle(e.target.value)} />
        </div>
        <button type="submit" disabled={loading}>
          {loading ? "生成中..." : "生成脚本"}
        </button>
      </form>

      {result && (
        <div style={{ marginTop: 32 }}>
          <h3>生成结果(Markdown)</h3>
          <pre style={{ whiteSpace: "pre-wrap" }}>{result.content_md}</pre>
        </div>
      )}
    </div>
  );
};

在实际项目里,可用 Markdown 渲染组件(如 react-markdown)来美观展示脚本。


八、完整系统工作流

本部分从「用户操作 → 系统内部调用 → LLM → 数据落库」的角度,描述关键流程。

8.1 直播脚本生成工作流

  1. 用户在前端填写配置
    • 话题、时长、人设、风格、目标受众、平台等。
  2. 前端调用 /api/scripts/live/generate 或创建任务 /api/tasks
  3. 后端 API
    • 根据用户 ID 与项目 ID 初始化 AssetServiceAgentOrchestratorScriptService
  4. RAG 检索
    • 使用 topic/persona/goal 生成查询 embedding,在 asset_embedding 中检索相关素材。
    • 格式化成可读文本注入 Prompt。
  5. Planner Agent 调用 LLM:
    • 使用 live_outline.txt 模板,生成直播大纲 JSON(带时间段与要点)。
  6. Writer Agent 调用 LLM:
    • 使用 live_script.txt 模板,把大纲和素材一起输入,生成 Markdown 逐字稿。
  7. Reviewer Agent 调用 LLM:
    • 对脚本做风格统一、补充互动话术和金句。
  8. 结果存储
    • 新建 script,新增一条 script_version,保存 outline_json、content_md 与本次配置。
  9. 返回前端
    • 返回脚本 ID、版本 ID、outline JSON 和 Markdown 脚本。

8.2 短视频脚本生成工作流

  1. 用户在前端:
    • 输入短视频主题、时长、平台、风格;
    • 可选:从已有直播脚本中选择一个版本作为参考文本
  2. API /api/scripts/short-video/generate
    • 若传 reference_script_id,则从 script_version 读出对应 Markdown,截取关键部分。
  3. 调用 LLM:
    • 使用 short_video.txt 模板,生成包含 titlecover_textshots 数组的 JSON。
  4. 存储:
    • 创建 script(类型 short_video),脚本内容直接保存 JSON(或转为 Markdown + JSON)。
  5. 前端:
    • 展示分镜列表,每个分镜包含时间范围、画面描述、旁白文案,可供用户微调。

8.3 脚本迭代与版本管理

  • 用户修改脚本内容:
    • 提交新版内容给后端,创建 script_versionsource='manual'
  • 用户希望「在此基础上再优化一次」:
    • 将当前版本内容作为 reference_text 或 Prompt 输入,再次通过 Reviewer Agent / Writer Agent 调用 LLM 进行重写。
    • 产生新版本,保留旧版,支持对比与回滚。

九、部署与优化策略

本部分介绍如何把系统部署上线,以及在性能、成本、质量上的优化方案。

9.1 Docker 化后端服务

# Dockerfile
FROM python:3.11-slim

WORKDIR /app

ENV PYTHONUNBUFFERED=1

COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"]

9.2 docker-compose 示例

version: "3.9"
services:
  api:
    build: .
    depends_on:
      - db
      - redis
    environment:
      DATABASE_URL: postgresql+asyncpg://user:pass@db:5432/ai_script
      REDIS_URL: redis://redis:6379/0
      OPENAI_API_KEY: your_key_here
    ports:
      - "8000:8000"

  worker:
    build: .
    command: ["celery", "-A", "app.workers.celery_worker.celery_app", "worker", "-l", "info"]
    depends_on:
      - api
      - redis
      - db
    environment:
      DATABASE_URL: postgresql+asyncpg://user:pass@db:5432/ai_script
      REDIS_URL: redis://redis:6379/0
      OPENAI_API_KEY: your_key_here

  db:
    image: postgres:15
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: ai_script
    ports:
      - "5432:5432"

  redis:
    image: redis:7
    ports:
      - "6379:6379"

9.3 性能与成本优化策略

  • Prompt 优化
    • 控制上下文长度:RAG 检索后只保留最相关的 3~5 条素材。
    • 采用结构化输出(JSON),减少二次解析和二次对话。
  • 缓存
    • 相同配置下的生成结果可以做缓存(如:同一 topic + persona + style)。
  • 异步与批量
    • 使用 Celery / asyncio 并行调用 LLM,减少总耗时。
    • 多个短视频脚本的生成可以批处理。
  • 模型分层
    • 大纲和结构类任务可用相对便宜的模型(如 GPT-4.1-mini / 本地模型),最终逐字稿用更强模型。
  • 监控与日志
    • 记录 Prompt/Response 长度、耗时、token 数量,方便优化成本。

十、扩展方向与多智能体演进

在现有设计基础上,可以进一步演进成更强大的「多智能体内容生产平台」:

  • 自动热点抓取 Agent
    • 从微博/抖音热榜抓取热点话题,自动生成「选题 + 直播脚本 + N 个短视频脚本」。
  • 数据分析 Agent
    • 读取以往直播/短视频的表现数据(观看时长、转化率),反向调整脚本结构和话术风格。
  • 多语言 Agent
    • 在脚本生成后自动生成多语言版本,并考虑文化差异调整。

总结

  • 提供了一套可落地的系统方案:从架构、数据库、后端代码、多智能体编排、向量检索、任务队列到前端交互的完整链路。
  • 给出了关键模块代码示例(LLM 封装、RAG 检索、脚本生成 Orchestrator、API 路由、任务 Worker、前端页面)。
  • 通过多智能体 + RAG + 版本管理 + 任务队列的组合,可以较低成本地搭建一个工业级的「直播脚本与短视频智能生成助手」。

如果你希望,我可以在下一步帮你:

  • 把其中某一部分(例如「直播脚本多智能体编排」或「PGVector 初始化与检索」)进一步展开成完整可运行 Demo 项目的代码骨架。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐