Celery + Redis: A Complete Walkthrough of the Async Task Dispatch Pipeline

Real-world scenario: triggering a Workflow via Webhook

Suppose a user has configured a GitHub Webhook so that every new Pull Request automatically runs a workflow that reviews the code.


End-to-end sequence diagram

(Sequence diagram: GitHub Webhook → Nginx → Flask API Server → AsyncWorkflowService → PostgreSQL → Redis Broker → Celery Worker → LLM Service. Phase 1, synchronous, milliseconds: validate the App and Workflow, consume quota, pick a queue by tenant tier, LPUSH the serialized task onto workflow_professional, update the trigger log to QUEUED with the Celery task id, and return 200 OK with the task_id. Phase 2, asynchronous, seconds to minutes: the worker BRPOPs the queue, deserializes the task, loads the trigger log by workflow_trigger_log_id, sets it to RUNNING, runs WorkflowAppGenerator.generate() which calls GPT-4, then records SUCCESS with elapsed_time and outputs. Phase 3, optional: the caller polls GET /trigger_logs/{task_id} to read the result.)

API entry point: enqueuing the task

File: api/services/async_workflow_service.py

Step 1: Method entry point

@classmethod
def trigger_workflow_async(
    cls, session: Session, user: Union[Account, EndUser], trigger_data: TriggerData
) -> AsyncTriggerResponse:
    """
        Universal entry point for async workflow execution - THIS METHOD WILL NOT BLOCK

        Creates a trigger log and dispatches to appropriate queue based on subscription tier.
        The workflow execution happens asynchronously in the background via Celery workers.
        This method returns immediately after queuing the task, not after execution completion.

        Args:
            session: Database session to use for operations
            user: User (Account or EndUser) who initiated the workflow trigger
            trigger_data: Validated Pydantic model containing trigger information

        Returns:
            AsyncTriggerResponse with workflow_trigger_log_id, task_id, status="queued", and queue
            Note: The actual workflow execution status must be checked separately via workflow_trigger_log_id

        Raises:
            WorkflowNotFoundError: If app or workflow not found
            InvokeDailyRateLimitError: If daily rate limit exceeded
    """

Key points

  • Non-blocking: returns immediately, without waiting for the LLM call to finish (see the client-side sketch below)
  • Status tracking: returns a workflow_trigger_log_id for later queries
  • Queue routing: the queue is chosen by subscription tier
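
To make that contract concrete, here is a minimal client-side sketch; the endpoint paths follow the sequence diagram above and are illustrative, not Dify's official API:

import time
import requests

BASE = "https://api.example.com"  # hypothetical host

# 1. Fire the webhook; the API only validates and enqueues, so this returns in milliseconds
resp = requests.post(f"{BASE}/webhooks/github", json={"pr_number": 123})
task_id = resp.json()["task_id"]

# 2. Poll the trigger log until the background execution finishes
while True:
    log = requests.get(f"{BASE}/trigger_logs/{task_id}").json()
    if log["status"] in ("success", "failed"):
        break
    time.sleep(2)  # the actual work (LLM calls) takes tens of seconds

print(log["status"], log.get("outputs"))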

Step 2: Validation and trigger log creation

        trigger_log_repo = SQLAlchemyWorkflowTriggerLogRepository(session)
        dispatcher_manager = QueueDispatcherManager()
        workflow_service = WorkflowService()

        # 🔍 1. Validate that the app exists
        app_model = session.scalar(select(App).where(App.id == trigger_data.app_id))
        if not app_model:
            raise WorkflowNotFoundError(f"App not found: {trigger_data.app_id}")

        # 🔍 2. Get the workflow
        workflow = cls._get_workflow(workflow_service, app_model, trigger_data.workflow_id)

        # 🔍 3. Get the dispatcher based on the tenant's subscription
        dispatcher = dispatcher_manager.get_dispatcher(trigger_data.tenant_id)

        # 4. Rate limiting check will be done without timezone first

        # 🔍 5. Determine the user role and ID
        if isinstance(user, Account):
            created_by_role = CreatorUserRole.ACCOUNT
            created_by = user.id
        else:  # EndUser
            created_by_role = CreatorUserRole.END_USER
            created_by = user.id

        # 🔍 6. Create the trigger log entry first (for tracking)
        trigger_log = WorkflowTriggerLog(
            tenant_id=trigger_data.tenant_id,
            app_id=trigger_data.app_id,
            workflow_id=workflow.id,
            root_node_id=trigger_data.root_node_id,
            trigger_metadata=(
                trigger_data.trigger_metadata.model_dump_json() if trigger_data.trigger_metadata else "{}"
            ),
            trigger_type=trigger_data.trigger_type,
            workflow_run_id=None,
            outputs=None,
            trigger_data=trigger_data.model_dump_json(),
            inputs=json.dumps(dict(trigger_data.inputs)),
            status=WorkflowTriggerStatus.PENDING,  # 🔍 initial status: PENDING
            queue_name=dispatcher.get_queue_name(),
            retry_count=0,
            created_by_role=created_by_role,
            created_by=created_by,
            celery_task_id=None,  # 🔍 no Celery task has been created yet
            error=None,
            elapsed_time=None,
            total_tokens=None,
        )

        trigger_log = trigger_log_repo.create(trigger_log)
        session.commit()

Example database record

-- WorkflowTriggerLog 表
INSERT INTO workflow_trigger_logs (
    id, tenant_id, app_id, workflow_id,
    status, queue_name, trigger_data, inputs,
    created_at
) VALUES (
    'trigger_log_123',
    'tenant_456',
    'app_789',
    'workflow_abc',
    'pending',  -- initial status
    'workflow_professional',
    '{"app_id": "app_789", ...}',
    '{"pr_number": 123}',
    '2024-01-20 10:30:00'
);

Step 3: Quota check

        # 🔍 7. Check and consume quota
        try:
            QuotaType.WORKFLOW.consume(trigger_data.tenant_id)
        except QuotaExceededError as e:
            # 🔍 Update the trigger log status
            trigger_log.status = WorkflowTriggerStatus.RATE_LIMITED
            trigger_log.error = f"Quota limit reached: {e}"
            trigger_log_repo.update(trigger_log)
            session.commit()

            raise WorkflowQuotaLimitError(
                f"Workflow execution quota limit reached for tenant {trigger_data.tenant_id}"
            ) from e

Quota mechanism (pseudocode sketch):

# enums/quota_type.py
class QuotaType:
    WORKFLOW = QuotaConfig(
        daily_limit=1000,  # e.g. 1000 runs per day for this tier
        redis_key_template="quota:workflow:{tenant_id}:daily"
    )
    
    def consume(self, tenant_id: str):
        key = f"quota:workflow:{tenant_id}:daily"
        current = redis.incr(key)  # atomic increment
        
        if current == 1:
            # first call of the day: set a 24-hour expiry on the counter
            redis.expire(key, 86400)
        
        if current > self.daily_limit:
            raise QuotaExceededError()
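
One caveat of the INCR-then-EXPIRE pattern above: if the process dies between the two calls, the counter key never expires. A hedged sketch of a more robust variant that does both steps in a single Lua script (not Dify's actual implementation):

import redis

r = redis.Redis()

# INCR and set the TTL atomically, in one round trip to Redis
QUOTA_LUA = """
local current = redis.call('INCR', KEYS[1])
if current == 1 then
    redis.call('EXPIRE', KEYS[1], ARGV[1])
end
return current
"""
consume_script = r.register_script(QUOTA_LUA)

def consume(tenant_id: str, daily_limit: int = 1000) -> None:
    key = f"quota:workflow:{tenant_id}:daily"
    current = consume_script(keys=[key], args=[86400])
    if current > daily_limit:
        raise RuntimeError("daily workflow quota exceeded")  # stands in for QuotaExceededError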

Step 4: Enqueuing the task (the core step)

        # 🔍 8. Create the task data
        queue_name = dispatcher.get_queue_name()

        task_data = WorkflowTaskData(workflow_trigger_log_id=trigger_log.id)

        # ⭐ 9. Dispatch to the appropriate queue
        task_data_dict = task_data.model_dump(mode="json")

        task: AsyncResult[Any] | None = None
        if queue_name == QueuePriority.PROFESSIONAL:
            task = execute_workflow_professional.delay(task_data_dict)  # type: ignore
        elif queue_name == QueuePriority.TEAM:
            task = execute_workflow_team.delay(task_data_dict)  # type: ignore
        else:  # SANDBOX
            task = execute_workflow_sandbox.delay(task_data_dict)  # type: ignore

        # 🔍 10. Update the trigger log with the task info
        trigger_log.status = WorkflowTriggerStatus.QUEUED  # 🔍 status transition: PENDING -> QUEUED
        trigger_log.celery_task_id = task.id  # 🔍 record the Celery task ID
        trigger_log.triggered_at = datetime.now(UTC)
        trigger_log_repo.update(trigger_log)
        session.commit()

        return AsyncTriggerResponse(
            workflow_trigger_log_id=trigger_log.id,
            task_id=task.id,  # type: ignore
            status="queued",
            queue=queue_name,
        )

execute_workflow_professional.delay()

This single line of code kicks off the whole async machinery:

# tasks/async_workflow_tasks.py
@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
def execute_workflow_professional(task_data_dict: dict[str, Any]):
    # ... task execution logic

What the .delay() method does:

Underlying Redis operations (simplified):

# Celery internals (simplified)
def delay(self, *args, **kwargs):
    # 1. Serialize the task payload
    task_id = str(uuid.uuid4())
    message = {
        "id": task_id,
        "task": "tasks.async_workflow_tasks.execute_workflow_professional",
        "args": [task_data_dict],
        "kwargs": {},
        "retries": 0,
        "eta": None,
    }
    
    # 2. Push the message onto Redis
    redis.lpush(
        "workflow_professional",  # 队列名
        json.dumps(message)
    )
    
    # 3. Return an AsyncResult handle
    return AsyncResult(task_id)

Data layout in Redis

# Inspect with redis-cli
127.0.0.1:6379> LLEN workflow_professional
(integer) 5  # 5 pending tasks in the queue

127.0.0.1:6379> LRANGE workflow_professional 0 0
1) '{"id":"abc-123","task":"tasks.async_workflow_tasks.execute_workflow_professional","args":[{"workflow_trigger_log_id":"trigger_log_123"}],"kwargs":{},"retries":0}'

Celery Worker: pulling and executing tasks

File: api/tasks/async_workflow_tasks.py

Worker startup command

# Start a worker for the professional queue
celery -A celery worker \
    -Q workflow_professional \
    -c 8 \
    --loglevel=info \
    --prefetch-multiplier=1

Flags

  • -Q workflow_professional: listen only on this queue
  • -c 8: run 8 tasks concurrently (with the gevent pool)
  • --prefetch-multiplier=1: prefetch only one task at a time so work does not pile up on a single worker (see the equivalent config sketch below)
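
The same tuning can also live in the Celery configuration instead of CLI flags. A minimal sketch using standard Celery settings (task_acks_late is an extra knob not present in the command above, shown only for illustration):

from celery import Celery

app = Celery("worker", broker="redis://localhost:6379/0")

app.conf.update(
    worker_concurrency=8,           # same effect as -c 8
    worker_prefetch_multiplier=1,   # same effect as --prefetch-multiplier=1
    task_acks_late=True,            # re-deliver a task if the worker dies mid-execution
    task_default_queue="workflow_professional",
)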

Worker polling loop

# Celery worker internals (pseudocode)
def worker_main_loop():
    while True:
        # Blocking pop from the queue (BRPOP)
        task_message = redis.brpop("workflow_professional", timeout=1)
        
        if task_message:
            # Deserialize the task
            task = deserialize(task_message)
            
            # Run the task function
            try:
                result = execute_task(task)
                # Store the result in the backend
                redis.set(f"celery-task-meta-{task.id}", result)
            except Exception as e:
                # Retry or mark as failed
                handle_failure(task, e)

The task function

@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
def execute_workflow_professional(task_data_dict: dict[str, Any]):
    """Execute workflow for professional tier with highest priority
    执行专业版工作流,最高优先级"""
    # 🔍 1. Deserialize the task payload
    task_data = WorkflowTaskData.model_validate(task_data_dict)
    
    # 🔍 2. Build the scheduling plan entity
    cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
        queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE,
        schedule_strategy=AsyncWorkflowSystemStrategy,
        granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
    )
    
    # ⭐ 3. Run the shared workflow execution logic
    _execute_workflow_common(
        task_data,
        AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
        cfs_plan_scheduler_entity,
    )

Shared execution logic: _execute_workflow_common

def _execute_workflow_common(
    task_data: WorkflowTaskData,
    cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler,
    cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity,
):
    """Execute workflow with common logic and trigger log updates.
    执行工作流的通用逻辑并更新触发日志。"""

    # 🔍 1. Create a new database session for this task (isolated from the API process)
    session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)

    with session_factory() as session:
        trigger_log_repo = SQLAlchemyWorkflowTriggerLogRepository(session)

        # 🔍 2. Get the trigger log
        trigger_log = trigger_log_repo.get_by_id(task_data.workflow_trigger_log_id)

        if not trigger_log:
            # This should not happen, but handle gracefully
            return

        # 🔍 3. Reconstruct execution data from the trigger log
        trigger_data = TriggerData.model_validate_json(trigger_log.trigger_data)

        # 🔍 4. Update status to running
        trigger_log.status = WorkflowTriggerStatus.RUNNING
        trigger_log_repo.update(trigger_log)
        session.commit()

        start_time = datetime.now(UTC)

        try:
            # 🔍 5. Get the app and workflow models
            app_model = session.scalar(select(App).where(App.id == trigger_log.app_id))

            if not app_model:
                raise WorkflowNotFoundError(f"App not found: {trigger_log.app_id}")

            workflow = session.scalar(select(Workflow).where(Workflow.id == trigger_log.workflow_id))
            if not workflow:
                raise WorkflowNotFoundError(f"Workflow not found: {trigger_log.workflow_id}")

            user = _get_user(session, trigger_log)

            # 🔍 6. Execute the workflow using WorkflowAppGenerator (the core step)
            generator = WorkflowAppGenerator()

            # Prepare args matching AppGenerateService.generate format
            args = _build_generator_args(trigger_data)

            # If workflow_id was specified, add it to args
            if trigger_data.workflow_id:
                args["workflow_id"] = str(trigger_data.workflow_id)

            # ⭐ 7. Execute the workflow with the trigger type (this is where the LLM gets called)
            generator.generate(
                app_model=app_model,
                workflow=workflow,
                user=user,
                args=args,
                invoke_from=InvokeFrom.SERVICE_API,
                streaming=False,
                call_depth=0,
                triggered_from=trigger_data.trigger_from,
                root_node_id=trigger_data.root_node_id,
                graph_engine_layers=[
                    # TODO: Re-enable TimeSliceLayer after the HITL release.
                    TriggerPostLayer(cfs_plan_scheduler_entity, start_time, trigger_log.id, session_factory),
                ],
            )

        except Exception as e:
            # 🔍 8. Failure handling: calculate elapsed time and mark the trigger log as failed
            elapsed_time = (datetime.now(UTC) - start_time).total_seconds()

            # Update trigger log with failure
            trigger_log.status = WorkflowTriggerStatus.FAILED
            trigger_log.error = str(e)
            trigger_log.finished_at = datetime.now(UTC)
            trigger_log.elapsed_time = elapsed_time
            trigger_log_repo.update(trigger_log)

            # Final failure - no retry logic (simplified like RAG tasks)
            session.commit()

Key points

  1. Independent session: the worker opens its own database session, isolated from the API process, avoiding long-lived transactions
  2. Status tracking: PENDING → QUEUED → RUNNING → SUCCESS/FAILED (a sketch of this state machine follows below)
  3. Exception safety: even when execution fails, the database status is updated
  4. Workflow execution: WorkflowAppGenerator.generate() is where the workflow actually runs
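
The status lifecycle in point 2 can be made explicit with a small transition table. A hypothetical sketch (Dify's actual WorkflowTriggerStatus enum and transition rules may differ):

from enum import StrEnum

class TriggerStatus(StrEnum):
    PENDING = "pending"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RATE_LIMITED = "rate_limited"

# transitions observed in this chapter
ALLOWED = {
    TriggerStatus.PENDING: {TriggerStatus.QUEUED, TriggerStatus.RATE_LIMITED},
    TriggerStatus.QUEUED: {TriggerStatus.RUNNING},
    TriggerStatus.RUNNING: {TriggerStatus.SUCCESS, TriggerStatus.FAILED},
}

def advance(current: TriggerStatus, new: TriggerStatus) -> TriggerStatus:
    if new not in ALLOWED.get(current, set()):
        raise ValueError(f"illegal transition {current} -> {new}")
    return new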

Result query (optional)

Callers can look up the execution status with the workflow_trigger_log_id:

# API endpoint
@app.route("/trigger_logs/<trigger_log_id>")
def get_trigger_log(trigger_log_id: str):
    result = AsyncWorkflowService.get_trigger_log(trigger_log_id)
    return jsonify(result)

Example response

{
  "id": "trigger_log_123",
  "status": "success",
  "triggered_at": "2024-01-20T10:30:00Z",
  "finished_at": "2024-01-20T10:30:25Z",
  "elapsed_time": 25.3,
  "outputs": {
    "code_review": "代码质量良好,建议优化第23行..."
  },
  "total_tokens": 3500,
  "error": null
}



Deep dive into key functions

execute_workflow_professional.delay()

# Location: tasks/async_workflow_tasks.py:35
@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
def execute_workflow_professional(task_data_dict: dict[str, Any]):
    """
    装饰器 @shared_task 的作用:
    1. 将函数注册为 Celery 任务
    2. 自动序列化/反序列化参数
    3. 提供 .delay() 和 .apply_async() 方法
    """
    pass

# Calling .delay():
task = execute_workflow_professional.delay({"workflow_trigger_log_id": "xxx"})

# ...is equivalent to:
task = execute_workflow_professional.apply_async(
    args=[{"workflow_trigger_log_id": "xxx"}],
    queue="workflow_professional",
    retry=False
)

Underlying Redis operations

# Celery internals (simplified)
def apply_async(self, args, kwargs, queue):
    # 1. Generate a unique task ID
    task_id = self._gen_task_id()
    
    # 2. Build the message body
    message = {
        "id": task_id,
        "task": self.name,  # "tasks.async_workflow_tasks.execute_workflow_professional"
        "args": args,
        "kwargs": kwargs,
        "retries": 0,
        "eta": None,
        "expires": None,
    }
    
    # 3. Serialize (JSON by default)
    serialized = json.dumps(message)
    
    # 4. Push onto the Redis queue
    redis_client.lpush(queue, serialized)
    
    # 5. Return an AsyncResult handle
    return AsyncResult(task_id)

WorkflowAppGenerator.generate()

# Location: core/app/apps/workflow/app_generator.py
class WorkflowAppGenerator:
    def generate(
        self,
        app_model: App,
        workflow: Workflow,
        user: Account | EndUser,
        args: dict,
        invoke_from: InvokeFrom,
        streaming: bool,
        call_depth: int,
        triggered_from: WorkflowRunTriggeredFrom | None = None,
        root_node_id: str | None = None,
        graph_engine_layers: list | None = None,
    ):
        """
        核心工作流执行引擎
        
        执行流程:
        1. 解析 Workflow Graph(节点和连接)
        2. 初始化执行上下文
        3. 按拓扑顺序执行节点
        4. 处理节点间的数据流
        5. 收集输出结果
        """
        # 1. Build the execution graph
        graph = self._build_graph(workflow)
        
        # 2. Initialize the variable pool
        variable_pool = VariablePool()
        variable_pool.add_variables(args["inputs"])
        
        # 3. Execute the nodes
        for node in graph.topological_sort():
            if node.type == NodeType.LLM:
                # call the LLM
                result = self._execute_llm_node(node, variable_pool)
            elif node.type == NodeType.CODE:
                # run code
                result = self._execute_code_node(node, variable_pool)
            elif node.type == NodeType.TOOL:
                # invoke a tool
                result = self._execute_tool_node(node, variable_pool)
            
            # update the variable pool
            variable_pool.add_variable(node.id, result)
        
        # 4. Return the outputs
        return variable_pool.get_outputs()
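
"Execute nodes in topological order" (step 3 above) boils down to a topological sort over the node graph, e.g. Kahn's algorithm. A minimal standalone sketch:

from collections import deque

def topological_sort(edges: dict[str, list[str]]) -> list[str]:
    """edges maps each node id to the ids of its downstream nodes."""
    indegree = {node: 0 for node in edges}
    for targets in edges.values():
        for target in targets:
            indegree[target] = indegree.get(target, 0) + 1

    ready = deque(node for node, degree in indegree.items() if degree == 0)
    order = []
    while ready:
        node = ready.popleft()
        order.append(node)
        for target in edges.get(node, []):
            indegree[target] -= 1
            if indegree[target] == 0:
                ready.append(target)

    if len(order) != len(indegree):
        raise ValueError("workflow graph contains a cycle")
    return order

# start -> {llm, tool} -> end
print(topological_sort({"start": ["llm", "tool"], "llm": ["end"], "tool": ["end"], "end": []}))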

TriggerPostLayer: writing results back

# Location: core/app/layers/trigger_post_layer.py (inferred)
class TriggerPostLayer:
    """
    Callback layer invoked after the workflow finishes.
    Writes the result back to the WorkflowTriggerLog.
    """
    def __init__(self, cfs_plan_entity, start_time, trigger_log_id, session_factory):
        self.start_time = start_time
        self.trigger_log_id = trigger_log_id
        self.session_factory = session_factory
    
    def on_workflow_completed(self, outputs, total_tokens):
        """Called when the workflow completes"""
        with self.session_factory() as session:
            trigger_log = session.get(WorkflowTriggerLog, self.trigger_log_id)
            
            # Write back the result
            trigger_log.status = WorkflowTriggerStatus.SUCCESS
            trigger_log.outputs = json.dumps(outputs)
            trigger_log.total_tokens = total_tokens
            trigger_log.elapsed_time = (datetime.now(UTC) - self.start_time).total_seconds()
            trigger_log.finished_at = datetime.now(UTC)
            
            session.commit()

Worked example: the full data flow

Scenario: a GitHub PR triggers a code review

(Sequence diagram: GitHub → API Server → Redis → Celery Worker → PostgreSQL → GPT-4 API. The PR webhook (pr_number: 123 plus the diff) creates a pending workflow_trigger_logs row (id=log_abc), LPUSHes {"workflow_trigger_log_id": "log_abc"} onto workflow_professional, marks the row queued with celery_task_id=task_123 and returns 200 OK. The worker BRPOPs the task, sets the row to running, loads the workflow graph, asks GPT-4 to review the diff (about 15 seconds), then updates the row to success with the review in outputs, elapsed_time=15.3 and total_tokens=2500. The user later calls GET /trigger_logs/log_abc and receives the review.)

Timeline analysis

| Time | Event | Duration | Notes |
| --- | --- | --- | --- |
| T+0ms | GitHub sends the webhook | - | User opens a PR |
| T+50ms | API creates the trigger_log | 10ms | Database write |
| T+55ms | API pushes to Redis | 5ms | LPUSH |
| T+60ms | API returns the response | 5ms | HTTP 200 |
| T+100ms | Worker pulls the task | - | BRPOP returns |
| T+150ms | Worker queries the database | 50ms | SELECT |
| T+15.2s | GPT-4 returns its result | 15s | LLM call |
| T+15.3s | Worker updates the database | 100ms | UPDATE |
| T+20s | User queries the result | - | GET API |

Key metrics

  • API response time: ~60 ms (imperceptible to the user)
  • Actual execution time: 15.3 s (completed asynchronously)
  • Concurrency: the API can accept thousands of requests at once, since each request only enqueues

Key configuration files

Celery configuration (extensions/ext_celery.py)

Dify uses Celery as its distributed task queue; the configuration lives in ext_celery.py. The settings that matter here, first in simplified, annotated form:

celery_app.conf.update(
    # Broker configuration
    broker_url=dify_config.CELERY_BROKER_URL,  # redis://localhost:6379/0
    
    # Result Backend
    result_backend=dify_config.CELERY_RESULT_BACKEND,
    
    # Ignore task results (reduces Redis load)
    task_ignore_result=True,
    
    # Retry the broker connection on startup
    broker_connection_retry_on_startup=True,
    
    # Timezone
    timezone=pytz.timezone("UTC"),
    
    # Task serialization
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
)
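
# The actual initialization in api/extensions/ext_celery.py looks like this: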
celery_app = Celery(
    app.name,
    task_cls=FlaskTask,
    broker=dify_config.CELERY_BROKER_URL,
    backend=dify_config.CELERY_BACKEND,
)

celery_app.conf.update(
    result_backend=dify_config.CELERY_RESULT_BACKEND,
    broker_transport_options=broker_transport_options,
    broker_connection_retry_on_startup=True,
    worker_log_format=dify_config.LOG_FORMAT,
    worker_task_log_format=dify_config.LOG_FORMAT,
    worker_hijack_root_logger=False,
    timezone=pytz.timezone(dify_config.LOG_TZ or "UTC"),
    task_ignore_result=True,
)

# Apply SSL configuration if enabled
ssl_options = _get_celery_ssl_options()
if ssl_options:
    celery_app.conf.update(
        broker_use_ssl=ssl_options,
        # Also apply SSL to the backend if it's Redis
        redis_backend_use_ssl=ssl_options if dify_config.CELERY_BACKEND == "redis" else None,
    )

if dify_config.LOG_FILE:
    celery_app.conf.update(
        worker_logfile=dify_config.LOG_FILE,
    )

celery_app.set_default()
app.extensions["celery"] = celery_app

Core configuration notes

  • broker: Redis acts as the message broker, holding tasks that are waiting to be executed
  • backend: where task results are stored (Redis or a database, for example)
  • broker_connection_retry_on_startup: reconnect automatically at startup, so a temporarily unavailable Redis does not crash the service
  • task_ignore_result: skip result storage to reduce Redis load (fine when callers never read task results)

Redis Sentinel high-availability configuration

if dify_config.CELERY_USE_SENTINEL:
    broker_transport_options = {
        "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
        "sentinel_kwargs": {
            "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
            "password": dify_config.CELERY_SENTINEL_PASSWORD,
        },
    }

High-availability guarantees

  • Supports Redis Sentinel for automatic master/replica failover
  • When the master fails, Sentinel elects a new master automatically
  • Prevents a single Redis node from taking the whole task queue down (see the broker URL sketch below)
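
For reference, pointing Celery at Sentinel rather than a single Redis node only changes the broker URL plus the transport options shown above. A hedged sketch with made-up host names:

# List every Sentinel node; the actual master is resolved by master_name
celery_app.conf.broker_url = (
    "sentinel://sentinel-1:26379;"
    "sentinel://sentinel-2:26379;"
    "sentinel://sentinel-3:26379"
)
celery_app.conf.broker_transport_options = {
    "master_name": "mymaster",                  # must match the name configured in Sentinel
    "sentinel_kwargs": {"password": "secret"},  # credentials for the Sentinel nodes themselves
}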

Queue definitions (tasks/workflow_cfs_scheduler/entities.py)

if dify_config.EDITION == "CLOUD":
    # Cloud edition: three separate queues
    _professional_queue = "workflow_professional"
    _team_queue = "workflow_team"
    _sandbox_queue = "workflow_sandbox"
else:
    # Community edition: a single queue
    _professional_queue = "workflow"
    _team_queue = "workflow"
    _sandbox_queue = "workflow"

Worker startup script

#!/bin/bash
# start_workers.sh

# Professional queue worker (concurrency 8)
celery -A celery worker \
    -Q workflow_professional \
    -c 8 \
    --pool=gevent \
    --loglevel=info \
    --logfile=/var/log/celery/professional.log &

# Team queue worker (concurrency 4)
celery -A celery worker \
    -Q workflow_team \
    -c 4 \
    --pool=gevent \
    --loglevel=info \
    --logfile=/var/log/celery/team.log &

# Sandbox (free tier) queue worker (concurrency 2)
celery -A celery worker \
    -Q workflow_sandbox \
    -c 2 \
    --pool=gevent \
    --loglevel=info \
    --logfile=/var/log/celery/sandbox.log &

Summary: why the async architecture wins

Synchronous vs asynchronous

| Dimension | Synchronous | Asynchronous (Celery + Redis) |
| --- | --- | --- |
| API response time | 10-30 s | 50-100 ms |
| Concurrency | ~40 (connection pool limit) | 1000+ (instant enqueue) |
| Failure handling | User sees a timeout | Retried in the background, invisible to the user |
| Resource usage | API workers block | API and workers decoupled, scaled independently |
| Monitoring and tracing | Hard | Fully traceable via trigger_log_id |

Key design principles

  1. Non-blocking enqueue: the API only validates and enqueues, then returns immediately
  2. Status tracking: the whole lifecycle is recorded in WorkflowTriggerLog
  3. Queue isolation: traffic is split by tenant tier to prevent resource abuse
  4. Independent sessions: workers use their own database sessions, avoiding long transactions
  5. Exception safety: failures still update the status, so nothing disappears from tracing

This architecture lets Dify absorb thousands of concurrent requests per second while keeping API responses in the millisecond range.

Separating light and heavy task queues: routing by tenant tier



Scenario

Suppose three users trigger workflows at the same time:

  • Alice (Professional): generating a market analysis report
  • Bob (Team): batch-processing customer data
  • Charlie (Sandbox/free): testing a simple workflow

API entry point: receiving the request

File: api/services/async_workflow_service.py

# Lines 52-80: AsyncWorkflowService.trigger_workflow_async
@classmethod
def trigger_workflow_async(
    cls, session: Session, user: Union[Account, EndUser], trigger_data: TriggerData
) -> AsyncTriggerResponse:
    """
        Universal entry point for async workflow execution - THIS METHOD WILL NOT BLOCK

        Creates a trigger log and dispatches to appropriate queue based on subscription tier.
        The workflow execution happens asynchronously in the background via Celery workers.
        This method returns immediately after queuing the task, not after execution completion.

        Args:
            session: Database session to use for operations
            user: User (Account or EndUser) who initiated the workflow trigger
            trigger_data: Validated Pydantic model containing trigger information

        Returns:
            AsyncTriggerResponse with workflow_trigger_log_id, task_id, status="queued", and queue
            Note: The actual workflow execution status must be checked separately via workflow_trigger_log_id

        Raises:
            WorkflowNotFoundError: If app or workflow not found
            InvokeDailyRateLimitError: If daily rate limit exceeded

        Behavior:
            - Non-blocking: Returns immediately after queuing
            - Asynchronous: Actual execution happens in background Celery workers
            - Status tracking: Use workflow_trigger_log_id to monitor progress
            - Queue-based: Routes to different queues based on subscription tier
   """

Key notes from the docstring

  • 🔸 Non-blocking: the method returns immediately, without waiting for the workflow to finish
  • 🔸 Asynchronous execution: the real work happens in background Celery workers
  • 🔸 Status tracking: monitor progress via workflow_trigger_log_id

Example call

# Alice (Professional tier) triggers a workflow
trigger_data = TriggerData(
    tenant_id="tenant_alice_123",
    app_id="app_market_analysis",
    workflow_id="workflow_001",
    inputs={"topic": "AI市场趋势"},
    trigger_type="api"
)

response = AsyncWorkflowService.trigger_workflow_async(
    session=db_session,
    user=alice_user,
    trigger_data=trigger_data
)
# Returns immediately: AsyncTriggerResponse(
#   workflow_trigger_log_id="log_abc123",
#   task_id="celery_task_xyz789",
#   status="queued",
#   queue="workflow_professional"
# )

Getting the queue dispatcher

File: api/services/async_workflow_service.py:82-94

        trigger_log_repo = SQLAlchemyWorkflowTriggerLogRepository(session)
        dispatcher_manager = QueueDispatcherManager()
        workflow_service = WorkflowService()

        # 1. Validate app exists
        app_model = session.scalar(select(App).where(App.id == trigger_data.app_id))
        if not app_model:
            raise WorkflowNotFoundError(f"App not found: {trigger_data.app_id}")

        # 2. Get workflow
        workflow = cls._get_workflow(workflow_service, app_model, trigger_data.workflow_id)

        # 3. Get dispatcher based on tenant subscription
        dispatcher = dispatcher_manager.get_dispatcher(trigger_data.tenant_id)

Key steps

  • Step 1: validate that the app exists
  • Step 2: load the workflow configuration
  • Step 3: 🎯 the core step: pick a dispatcher based on the tenant's subscription

Looking up the tenant's subscription plan

File: api/services/workflow/queue_dispatcher.py:80-106

    def get_dispatcher(cls, tenant_id: str) -> BaseQueueDispatcher:
        """
        Get dispatcher based on tenant's subscription plan

        Args:
            tenant_id: The tenant identifier

        Returns:
            Appropriate queue dispatcher instance
        """
        # 🔸 Step 3.1: Check whether billing is enabled
        if dify_config.BILLING_ENABLED:
            try:
                # 🔸 Step 3.2: Fetch the tenant's info from the billing service
                billing_info = BillingService.get_info(tenant_id)
                plan = billing_info.get("subscription", {}).get("plan", "sandbox")
            except Exception:
                # 🔸 Step 3.3: If the billing service fails, default to sandbox
                plan = "sandbox"
        else:
            # 🔸 Step 3.4: If billing is disabled, use the team tier as the default
            plan = "team"

        # 🔸 Step 3.5: Map the plan to a dispatcher class
        dispatcher_class = cls.PLAN_DISPATCHER_MAP.get(
            plan,
            SandboxQueueDispatcher,  # Default to sandbox for unknown plans
        )

        return dispatcher_class()  # type: ignore

The plan-to-dispatcher map:

    # Mapping of billing plans to dispatchers
    PLAN_DISPATCHER_MAP = {
        "professional": ProfessionalQueueDispatcher,
        "team": TeamQueueDispatcher,
        "sandbox": SandboxQueueDispatcher,
        # Add new tiers here as they're created
        # For any unknown plan, default to sandbox
    }

Sequence: subscription lookup flow

(Sequence diagram: QueueDispatcherManager → dify_config → BillingService → Billing API / Redis cache. If BILLING_ENABLED, BillingService.get_info(tenant_id) returns the plan, served from a 10-minute Redis cache when possible and fetched via GET /subscription/info on a miss; the plan is then looked up in PLAN_DISPATCHER_MAP and, for example, ProfessionalQueueDispatcher() is returned. If billing is disabled, TeamQueueDispatcher() is used by default.)

Example walkthrough

# Alice's tenant ID
tenant_id = "tenant_alice_123"

# What QueueDispatcherManager.get_dispatcher does:
# 1. Calls BillingService.get_info("tenant_alice_123")
# 2. Gets back: {"subscription": {"plan": "professional"}}
# 3. plan = "professional"
# 4. dispatcher_class = ProfessionalQueueDispatcher
# 5. Returns a ProfessionalQueueDispatcher() instance

Dispatchers return their queue names

File: api/services/workflow/queue_dispatcher.py:37-65

Professional dispatcher

class ProfessionalQueueDispatcher(BaseQueueDispatcher):
    """Dispatcher for professional tier"""

    def get_queue_name(self) -> str:
        return QueuePriority.PROFESSIONAL

    def get_priority(self) -> int:
        return 100

Team dispatcher

class TeamQueueDispatcher(BaseQueueDispatcher):
    """Dispatcher for team tier"""

    def get_queue_name(self) -> str:
        return QueuePriority.TEAM

    def get_priority(self) -> int:
        return 50

Sandbox (free) dispatcher

class SandboxQueueDispatcher(BaseQueueDispatcher):
    """Dispatcher for free/sandbox tier"""

    def get_queue_name(self) -> str:
        return QueuePriority.SANDBOX

    def get_priority(self) -> int:
        return 10
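
The three dispatchers presumably share an abstract base class; a hedged sketch of what BaseQueueDispatcher likely looks like, inferred from the subclasses rather than copied from the source:

from abc import ABC, abstractmethod

class BaseQueueDispatcher(ABC):
    """Maps a subscription tier to a Celery queue name and a priority score."""

    @abstractmethod
    def get_queue_name(self) -> str:
        """Name of the Celery queue this tier's tasks are pushed onto."""

    @abstractmethod
    def get_priority(self) -> int:
        """Relative priority score; a higher number means more worker capacity."""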

Queue priority definitions

class QueuePriority(StrEnum):
    """Queue priorities for different subscription tiers"""

    PROFESSIONAL = "workflow_professional"  # Highest priority
    TEAM = "workflow_team"
    SANDBOX = "workflow_sandbox"  # Free tier

Priority comparison

| Tier | Queue name | Priority score | Workers | Expected latency |
| --- | --- | --- | --- | --- |
| Professional | workflow_professional | 100 | 8 | < 5 s |
| Team | workflow_team | 50 | 4 | < 30 s |
| Sandbox | workflow_sandbox | 10 | 2 | < 5 min |

Creating the trigger log

File: api/services/async_workflow_service.py:106-132

        # 6. Create trigger log entry first (for tracking)
        trigger_log = WorkflowTriggerLog(
            tenant_id=trigger_data.tenant_id,
            app_id=trigger_data.app_id,
            workflow_id=workflow.id,
            root_node_id=trigger_data.root_node_id,
            trigger_metadata=(
                trigger_data.trigger_metadata.model_dump_json() if trigger_data.trigger_metadata else "{}"
            ),
            trigger_type=trigger_data.trigger_type,
            workflow_run_id=None,
            outputs=None,
            trigger_data=trigger_data.model_dump_json(),
            inputs=json.dumps(dict(trigger_data.inputs)),
            status=WorkflowTriggerStatus.PENDING,
            queue_name=dispatcher.get_queue_name(),  # 🔸 record which queue the task goes to
            retry_count=0,
            created_by_role=created_by_role,
            created_by=created_by,
            celery_task_id=None,
            error=None,
            elapsed_time=None,
            total_tokens=None,
        )

        trigger_log = trigger_log_repo.create(trigger_log)
        session.commit()

Key fields

  • queue_name: records which queue the task was routed to (useful for monitoring and debugging)
  • status: starts as PENDING
  • celery_task_id: filled in later with the Celery task ID

Example record

# Alice's trigger log
{
    "id": "log_abc123",
    "tenant_id": "tenant_alice_123",
    "app_id": "app_market_analysis",
    "workflow_id": "workflow_001",
    "status": "PENDING",
    "queue_name": "workflow_professional",  # ✅ 专业版队列
    "celery_task_id": None,  # 待分配
    "created_at": "2024-01-20 10:30:00"
}

Quota check

File: api/services/async_workflow_service.py:134-146

        # 7. Check and consume quota
        try:
            QuotaType.WORKFLOW.consume(trigger_data.tenant_id)
        except QuotaExceededError as e:
            # Update trigger log status
            trigger_log.status = WorkflowTriggerStatus.RATE_LIMITED
            trigger_log.error = f"Quota limit reached: {e}"
            trigger_log_repo.update(trigger_log)
            session.commit()

            raise WorkflowQuotaLimitError(
                f"Workflow execution quota limit reached for tenant {trigger_data.tenant_id}"
            ) from e

Quota limits

  • Professional: 1000 runs/day
  • Team: 500 runs/day
  • Sandbox: 50 runs/day

Example

# Charlie (Sandbox) has already used his 50 runs today; the next trigger fails
try:
    QuotaType.WORKFLOW.consume("tenant_charlie_456")
except QuotaExceededError as e:
    # The trigger log is set to RATE_LIMITED
    # and a WorkflowQuotaLimitError is raised to the caller
    raise WorkflowQuotaLimitError("Quota limit reached for tenant_charlie_456") from e

Dispatching to the right queue

File: api/services/async_workflow_service.py:148-176

        # 8. Create task data
        queue_name = dispatcher.get_queue_name()

        task_data = WorkflowTaskData(workflow_trigger_log_id=trigger_log.id)

        # 9. Dispatch to appropriate queue
        task_data_dict = task_data.model_dump(mode="json")

        task: AsyncResult[Any] | None = None
        # 🔸 Pick the Celery task that matches the queue name
        if queue_name == QueuePriority.PROFESSIONAL:
            task = execute_workflow_professional.delay(task_data_dict)  # type: ignore
        elif queue_name == QueuePriority.TEAM:
            task = execute_workflow_team.delay(task_data_dict)  # type: ignore
        else:  # SANDBOX
            task = execute_workflow_sandbox.delay(task_data_dict)  # type: ignore

        # 10. Update trigger log with task info
        trigger_log.status = WorkflowTriggerStatus.QUEUED
        trigger_log.celery_task_id = task.id
        trigger_log.triggered_at = datetime.now(UTC)
        trigger_log_repo.update(trigger_log)
        session.commit()

        return AsyncTriggerResponse(
            workflow_trigger_log_id=trigger_log.id,
            task_id=task.id,  # type: ignore
            status="queued",
            queue=queue_name,
        )


Example walkthrough

# Alice (Professional)
queue_name = "workflow_professional"
task_data_dict = {"workflow_trigger_log_id": "log_abc123"}

# Branch 1 is taken
task = execute_workflow_professional.delay(task_data_dict)
# What Celery does internally:
# Redis LPUSH workflow_professional '{"workflow_trigger_log_id": "log_abc123"}'

# task.id = "celery_task_xyz789"

# Update the trigger log
trigger_log.status = WorkflowTriggerStatus.QUEUED
trigger_log.celery_task_id = "celery_task_xyz789"

# Return the response
return AsyncTriggerResponse(
    workflow_trigger_log_id="log_abc123",
    task_id="celery_task_xyz789",
    status="queued",
    queue="workflow_professional"
)

Celery task definitions

File: api/tasks/async_workflow_tasks.py:35-80

Professional task

@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
def execute_workflow_professional(task_data_dict: dict[str, Any]):
    """Execute workflow for professional tier with highest priority"""
    task_data = WorkflowTaskData.model_validate(task_data_dict)
    cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
        queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE,
        schedule_strategy=AsyncWorkflowSystemStrategy,
        granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
    )
    _execute_workflow_common(
        task_data,
        AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
        cfs_plan_scheduler_entity,
    )

Team task

@shared_task(queue=AsyncWorkflowQueue.TEAM_QUEUE)
def execute_workflow_team(task_data_dict: dict[str, Any]):
    """Execute workflow for team tier"""
    task_data = WorkflowTaskData.model_validate(task_data_dict)
    cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
        queue=AsyncWorkflowQueue.TEAM_QUEUE,
        schedule_strategy=AsyncWorkflowSystemStrategy,
        granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
    )
    _execute_workflow_common(
        task_data,
        AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
        cfs_plan_scheduler_entity,
    )

Sandbox (free tier) task

@shared_task(queue=AsyncWorkflowQueue.SANDBOX_QUEUE)
def execute_workflow_sandbox(task_data_dict: dict[str, Any]):
    """Execute workflow for free tier with lower retry limit"""
    task_data = WorkflowTaskData.model_validate(task_data_dict)
    cfs_plan_scheduler_entity = AsyncWorkflowCFSPlanEntity(
        queue=AsyncWorkflowQueue.SANDBOX_QUEUE,
        schedule_strategy=AsyncWorkflowSystemStrategy,
        granularity=dify_config.ASYNC_WORKFLOW_SCHEDULER_GRANULARITY,
    )
    _execute_workflow_common(
        task_data,
        AsyncWorkflowCFSPlanScheduler(plan=cfs_plan_scheduler_entity),
        cfs_plan_scheduler_entity,
    )

What the decorator does:

@shared_task(queue=AsyncWorkflowQueue.PROFESSIONAL_QUEUE)
#            ↑
#            the queue this task is published to

Queue name mapping:

# Determine queue names based on edition
if dify_config.EDITION == "CLOUD":
    # Cloud edition: separate queues for different tiers
    _professional_queue = "workflow_professional"
    _team_queue = "workflow_team"
    _sandbox_queue = "workflow_sandbox"
    AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.TimeSlice
else:
    # Community edition: single workflow queue (not dataset)
    _professional_queue = "workflow"
    _team_queue = "workflow"
    _sandbox_queue = "workflow"
    AsyncWorkflowSystemStrategy = WorkflowScheduleCFSPlanEntity.Strategy.Nop


class AsyncWorkflowQueue(StrEnum):
    # Define constants
    PROFESSIONAL_QUEUE = _professional_queue
    TEAM_QUEUE = _team_queue
    SANDBOX_QUEUE = _sandbox_queue
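
As an aside, Celery can also express the task-to-queue mapping centrally through its task_routes setting instead of a queue= argument on each @shared_task. A hedged sketch of that alternative (not how Dify wires it):

# Route tasks to queues by task name, in one place
celery_app.conf.task_routes = {
    "tasks.async_workflow_tasks.execute_workflow_professional": {"queue": "workflow_professional"},
    "tasks.async_workflow_tasks.execute_workflow_team": {"queue": "workflow_team"},
    "tasks.async_workflow_tasks.execute_workflow_sandbox": {"queue": "workflow_sandbox"},
}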

Cloud vs community edition

| Edition | Professional queue | Team queue | Sandbox queue | Scheduling strategy |
| --- | --- | --- | --- | --- |
| CLOUD | workflow_professional | workflow_team | workflow_sandbox | TimeSlice |
| Community | workflow | workflow | workflow | Nop |

Worker pull and execution

Celery Worker startup configuration

# Professional worker (8 processes, highest priority)
celery -A celery worker \
  -Q workflow_professional \
  -c 8 \
  --prefetch-multiplier 1 \
  --max-tasks-per-child 1000 \
  --loglevel INFO

# Team worker (4 processes)
celery -A celery worker \
  -Q workflow_team \
  -c 4 \
  --prefetch-multiplier 2 \
  --max-tasks-per-child 500

# Sandbox worker (2 processes, lowest priority)
celery -A celery worker \
  -Q workflow_sandbox \
  -c 2 \
  --prefetch-multiplier 4 \
  --max-tasks-per-child 100

Worker pull flow

(Sequence diagram: the worker BRPOPs workflow_professional on the Redis broker, deserializes the task payload {"workflow_trigger_log_id": "log_abc123"}, calls execute_workflow_professional(task_data_dict), loads the trigger log from the database, marks it RUNNING, runs WorkflowAppGenerator.generate(), saves the result, marks the log SUCCESS and ACKs the task.)

Core task execution logic

File: api/tasks/async_workflow_tasks.py:94-177

def _execute_workflow_common(
    task_data: WorkflowTaskData,
    cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler,
    cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity,
):
    """Execute workflow with common logic and trigger log updates."""

    # 🔸 ⭐ Step 10.1: Create an independent database session
    # Create a new session for this task
    session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)

    with session_factory() as session:
        trigger_log_repo = SQLAlchemyWorkflowTriggerLogRepository(session)

        # 🔸 Step 10.2: Fetch the trigger log
        # Get trigger log
        trigger_log = trigger_log_repo.get_by_id(task_data.workflow_trigger_log_id)

        if not trigger_log:
            # This should not happen, but handle gracefully
            return

        # 🔸 Step 10.3: Rebuild the execution data
        # Reconstruct execution data from trigger log
        trigger_data = TriggerData.model_validate_json(trigger_log.trigger_data)

        # 🔸 Step 10.4: Set the status to RUNNING
        # Update status to running
        trigger_log.status = WorkflowTriggerStatus.RUNNING
        trigger_log_repo.update(trigger_log)
        session.commit()

        start_time = datetime.now(UTC)

        try:
            # 🔸 Step 10.5: Load the app and workflow models
            # Get app and workflow models
            app_model = session.scalar(select(App).where(App.id == trigger_log.app_id))

            if not app_model:
                raise WorkflowNotFoundError(f"App not found: {trigger_log.app_id}")

            workflow = session.scalar(select(Workflow).where(Workflow.id == trigger_log.workflow_id))
            if not workflow:
                raise WorkflowNotFoundError(f"Workflow not found: {trigger_log.workflow_id}")

            user = _get_user(session, trigger_log)

            # 🔸 ⭐ Step 10.6: Execute the workflow
            # Execute workflow using WorkflowAppGenerator
            generator = WorkflowAppGenerator()

            # Prepare args matching AppGenerateService.generate format
            args = _build_generator_args(trigger_data)

            # If workflow_id was specified, add it to args
            if trigger_data.workflow_id:
                args["workflow_id"] = str(trigger_data.workflow_id)

            # Execute the workflow with the trigger type
            generator.generate(
                app_model=app_model,
                workflow=workflow,
                user=user,
                args=args,
                invoke_from=InvokeFrom.SERVICE_API,
                streaming=False,
                call_depth=0,
                triggered_from=trigger_data.trigger_from,
                root_node_id=trigger_data.root_node_id,
                graph_engine_layers=[
                    # TODO: Re-enable TimeSliceLayer after the HITL release.
                    TriggerPostLayer(cfs_plan_scheduler_entity, start_time, trigger_log.id, session_factory),
                ],
            )

        except Exception as e:
            # 🔸 Step 10.7: Failure handling
            # Calculate elapsed time for failed execution
            elapsed_time = (datetime.now(UTC) - start_time).total_seconds()

            # Update trigger log with failure
            trigger_log.status = WorkflowTriggerStatus.FAILED
            trigger_log.error = str(e)
            trigger_log.finished_at = datetime.now(UTC)
            trigger_log.elapsed_time = elapsed_time
            trigger_log_repo.update(trigger_log)

            # Final failure - no retry logic (simplified like RAG tasks)
            session.commit()

Key points

  1. Independent session: each task uses its own database session, avoiding transaction conflicts
  2. Status tracking: PENDING → RUNNING → SUCCESS/FAILED
  3. Exception handling: every exception is caught and recorded in the log without blocking the queue
  4. Workflow execution: WorkflowAppGenerator.generate() is where the real business logic runs

Full sequence: three users in parallel

(Sequence diagram: Alice, Bob and Charlie trigger workflows at the same time. AsyncWorkflowService asks QueueDispatcherManager for a dispatcher per tenant; BillingService returns professional, team and sandbox respectively; the three tasks are LPUSHed to workflow_professional, workflow_team and workflow_sandbox, and each API call returns immediately with status "queued". Worker-Pro (8 concurrent), Worker-Team (4) and Worker-Sandbox (2) pull from their own queues in parallel; Alice's run finishes in about 5 seconds, Bob's in about 20 seconds, Charlie's in about 2 minutes.)

Real-world comparisons

Scenario 1: peak load

Assume 100 simultaneous requests arrive:

  • 50 from Professional users
  • 30 from Team users
  • 20 from Sandbox users

Traditional single queue

All tasks share one queue → 100 tasks line up together
Large jobs from free-tier users block paying users → poor experience for paying customers

Multi-queue separation

workflow_professional: 50 tasks → 8 workers in parallel → average response ~6.25 s (assuming ~1 s per task)
workflow_team:         30 tasks → 4 workers in parallel → average response ~7.5 s
workflow_sandbox:      20 tasks → 2 workers in parallel → average response ~10 s

✅ Paying users are unaffected by free-tier traffic

Scenario 2: free-tier abuse

Attack: a free-tier user fires 1000 concurrent requests

Single-queue outcome

Every user's tasks get stuck behind the flood → the whole system stalls

Multi-queue protection

workflow_sandbox backs up with 1000 tasks
workflow_professional and workflow_team keep running normally
The free-tier user only delays themselves; paying users are unaffected ✅

Monitoring and tuning

Key monitoring metrics

# Queue length
redis-cli LLEN workflow_professional  # should stay < 100
redis-cli LLEN workflow_team          # should stay < 200
redis-cli LLEN workflow_sandbox       # may grow past 1000

# Worker status
celery -A celery inspect active
celery -A celery inspect stats

# Task execution speed per queue
SELECT 
    queue_name,
    AVG(elapsed_time) as avg_time,
    COUNT(*) as total_tasks
FROM workflow_trigger_logs
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY queue_name;
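
The same checks can run from a small script (for example on a cron or as a sidecar) instead of by hand with redis-cli. A minimal sketch; the thresholds mirror the comments above and the broker URL is an assumption:

import redis

r = redis.Redis.from_url("redis://localhost:6379/0")  # assumed broker URL

THRESHOLDS = {
    "workflow_professional": 100,
    "workflow_team": 200,
    "workflow_sandbox": 1000,
}

for queue, limit in THRESHOLDS.items():
    backlog = r.llen(queue)
    flag = "OK" if backlog <= limit else "ALERT"
    print(f"[{flag}] {queue}: {backlog} pending tasks (limit {limit})")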

Dynamic autoscaling

# Kubernetes HPA configuration (sketch)
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-professional
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-professional
  minReplicas: 8
  maxReplicas: 50
  metrics:
  - type: External
    external:
      metric:
        name: redis_queue_length
        selector:
          matchLabels:
            queue: workflow_professional
      target:
        type: AverageValue
        averageValue: "20"  # 队列长度>20自动扩容

Summary: the value of separating light and heavy tasks

Architectural benefits

  1. Quality of service (QoS)
    • Paying users get a dedicated resource pool
    • Response times stay predictable
  2. Resource isolation
    • Free-tier users cannot consume paid-tier resources
    • A failure in one tenant does not spill over to others
  3. Elastic scaling
    • Each queue can be scaled out independently
    • Compute is allocated where it is needed
  4. Cost optimization
    • High-value users get priority
    • Low-value users share pooled resources

Key implementation points

  1. Subscription lookup: BillingService.get_info()
  2. Dispatcher pattern: QueueDispatcherManager
  3. Multi-queue routing: @shared_task(queue=...)
  4. Worker grouping: celery -Q workflow_professional

Database connection pool management

Overall architecture flow

  • Application startup: app.py → create_app → initialize_extensions → ext_database.init_app → db.init_app (Flask-SQLAlchemy) creates the Engine and its QueuePool, sized by SQLALCHEMY_POOL_SIZE=30, SQLALCHEMY_MAX_OVERFLOW=10, SQLALCHEMY_POOL_RECYCLE=3600 and SQLALCHEMY_POOL_TIMEOUT=30; ext_session_factory.init_app then calls configure_session_factory.
  • API request handling: an HTTP request passes Flask's before_request and request-context setup; if the controller needs the database it uses db.session (or a Session), which borrows a connection from the pool, runs the SQL, commits or rolls back, and the connection is returned to the pool.
  • Celery task handling: the worker builds a sessionmaker, opens a session with "with session_factory() as session", borrows a connection, runs the task logic, commits, and the connection goes back to the pool when the with block closes the session.
  • Connection recycling: once a connection has been in use for more than 3600 seconds, POOL_RECYCLE closes the old connection and a new one is created in its place.


Application startup: connection pool initialization

Entry point: app.py

# api/app.py
from app_factory import create_app

app = create_app()
celery = app.extensions["celery"]

Application factory

File: api/app_factory.py

def create_app() -> DifyApp:
    start_time = time.perf_counter()
    app = create_flask_app_with_configs()
    initialize_extensions(app)
    end_time = time.perf_counter()
    if dify_config.DEBUG:
        logger.info("Finished create_app (%s ms)", round((end_time - start_time) * 1000, 2))
    return app

Key steps

  • create_flask_app_with_configs(): creates the Flask app and loads its configuration
  • initialize_extensions(app): the key call; initializes every extension, including the database

Extension initialization order

File: api/app_factory.py

def initialize_extensions(app: DifyApp):
    from extensions import (
        ext_app_metrics,
        ext_blueprints,
        ext_celery,
        ext_code_based_extension,
        ext_commands,
        ext_compress,
        ext_database,
        ext_forward_refs,
        ext_hosting_provider,
        ext_import_modules,
        ext_logging,
        ext_login,
        ext_logstore,
        ext_mail,
        ext_migrate,
        ext_orjson,
        ext_otel,
        ext_proxy_fix,
        ext_redis,
        ext_request_logging,
        ext_sentry,
        ext_session_factory,
        ext_set_secretkey,
        ext_storage,
        ext_timezone,
        ext_warnings,
    )

    extensions = [
        ext_timezone,
        ext_logging,
        ext_warnings,
        ext_import_modules,
        ext_orjson,
        ext_forward_refs,
        ext_set_secretkey,
        ext_compress,
        ext_code_based_extension,
        ext_database,
        ext_app_metrics,
        ext_migrate,
        ext_redis,
        ext_storage,
        ext_logstore,  # Initialize logstore after storage, before celery
        ext_celery,
        ext_login,
        ext_mail,
        ext_hosting_provider,
        ext_sentry,
        ext_proxy_fix,
        ext_blueprints,
        ext_commands,
        ext_otel,
        ext_request_logging,
        ext_session_factory,
    ]
    for ext in extensions:
        short_name = ext.__name__.split(".")[-1]
        is_enabled = ext.is_enabled() if hasattr(ext, "is_enabled") else True
        if not is_enabled:
            if dify_config.DEBUG:
                logger.info("Skipped %s", short_name)
            continue

        start_time = time.perf_counter()
        ext.init_app(app)
        end_time = time.perf_counter()
        if dify_config.DEBUG:
            logger.info("Loaded %s (%s ms)", short_name, round((end_time - start_time) * 1000, 2))

Notes

  • ext_database: loaded tenth in the list; initializes the connection pool
  • ext_session_factory: loaded last; configures the session factory

Database extension initialization

File: api/extensions/ext_database.py

def init_app(app: DifyApp):
    db.init_app(app)
    _setup_gevent_compatibility()

    # Eagerly build the engine so pool_size/max_overflow/etc. come from config
    try:
        with app.app_context():
            _ = db.engine  # triggers engine creation with the configured options
    except Exception:
        logger.exception("Failed to initialize SQLAlchemy engine during app startup")

Key steps

  1. db.init_app(app)
    • db is the Flask-SQLAlchemy instance (defined in models/engine.py)
    • reads the database configuration from app.config
    • creates the SQLAlchemy Engine and its connection pool
  2. _setup_gevent_compatibility()
    • makes connection rollback safe under gevent greenlets
    • prevents connection leaks
  3. _ = db.engine
    • crucial: forces the Engine to be created right away
    • ensures the pool parameters from the config actually take effect

Connection pool configuration parameters

File: api/configs/middleware/__init__.py

    SQLALCHEMY_POOL_SIZE: NonNegativeInt = Field(
        description="Maximum number of database connections in the pool.",
        default=30,
    )

    SQLALCHEMY_MAX_OVERFLOW: NonNegativeInt = Field(
        description="Maximum number of connections that can be created beyond the pool_size.",
        default=10,
    )

    SQLALCHEMY_POOL_RECYCLE: NonNegativeInt = Field(
        description="Number of seconds after which a connection is automatically recycled.",
        default=3600,
    )

    SQLALCHEMY_POOL_USE_LIFO: bool = Field(
        description="If True, SQLAlchemy will use last-in-first-out way to retrieve connections from pool.",
        default=False,
    )

    SQLALCHEMY_POOL_PRE_PING: bool = Field(
        description="If True, enables connection pool pre-ping feature to check connections.",
        default=False,
    )

    SQLALCHEMY_ECHO: bool | str = Field(
        description="If True, SQLAlchemy will log all SQL statements.",
        default=False,
    )

    SQLALCHEMY_POOL_TIMEOUT: NonNegativeInt = Field(
        description="Number of seconds to wait for a connection from the pool before raising a timeout error.",
        default=30,
    )

    RETRIEVAL_SERVICE_EXECUTORS: NonNegativeInt = Field(
        description="Number of processes for the retrieval service, default to CPU cores.",
        default=os.cpu_count() or 1,
    )

    @computed_field  # type: ignore[prop-decorator]
    @property
    def SQLALCHEMY_ENGINE_OPTIONS(self) -> dict[str, Any]:
        # Parse DB_EXTRAS for 'options'
        db_extras_dict = dict(parse_qsl(self.DB_EXTRAS))
        options = db_extras_dict.get("options", "")
        connect_args = {}
        # Use the dynamic SQLALCHEMY_DATABASE_URI_SCHEME property
        if self.SQLALCHEMY_DATABASE_URI_SCHEME.startswith("postgresql"):
            timezone_opt = "-c timezone=UTC"
            if options:
                merged_options = f"{options} {timezone_opt}"
            else:
                merged_options = timezone_opt
            connect_args = {"options": merged_options}

        return {
            "pool_size": self.SQLALCHEMY_POOL_SIZE,
            "max_overflow": self.SQLALCHEMY_MAX_OVERFLOW,
            "pool_recycle": self.SQLALCHEMY_POOL_RECYCLE,
            "pool_pre_ping": self.SQLALCHEMY_POOL_PRE_PING,
            "connect_args": connect_args,
            "pool_use_lifo": self.SQLALCHEMY_POOL_USE_LIFO,
            "pool_reset_on_return": None,
            "pool_timeout": self.SQLALCHEMY_POOL_TIMEOUT,
        }
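
These SQLALCHEMY_ENGINE_OPTIONS end up as keyword arguments to SQLAlchemy's create_engine. A minimal sketch of the equivalent manual call, with a placeholder database URL:

from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost:5432/dify",  # placeholder URL
    pool_size=30,          # SQLALCHEMY_POOL_SIZE
    max_overflow=10,       # SQLALCHEMY_MAX_OVERFLOW
    pool_recycle=3600,     # SQLALCHEMY_POOL_RECYCLE
    pool_timeout=30,       # SQLALCHEMY_POOL_TIMEOUT
    pool_pre_ping=False,   # SQLALCHEMY_POOL_PRE_PING
    pool_use_lifo=False,   # SQLALCHEMY_POOL_USE_LIFO
    connect_args={"options": "-c timezone=UTC"},
)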

Configuration at a glance

┌─────────────────────────────────────────┐
│       Database connection pool          │
├─────────────────────────────────────────┤
│ pool_size: 30                           │  core connections (persistent)
│ max_overflow: 10                        │  overflow connections (temporary)
│ ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━        │
│ total connections = 30 + 10 = 40        │
│                                         │
│ pool_recycle: 3600 s (1 hour)           │  connection recycle interval
│ pool_timeout: 30 s                      │  wait time for a free connection
│ pool_pre_ping: False                    │  no liveness check before use
│ pool_use_lifo: False (FIFO)             │  first in, first out
└─────────────────────────────────────────┘

Session factory configuration

File: api/extensions/ext_session_factory.py

from core.db.session_factory import configure_session_factory
from extensions.ext_database import db


def init_app(app):
    with app.app_context():
        configure_session_factory(db.engine)

File: api/core/db/session_factory.py

from sqlalchemy import Engine
from sqlalchemy.orm import Session, sessionmaker

_session_maker: sessionmaker[Session] | None = None


def configure_session_factory(engine: Engine, expire_on_commit: bool = False):
    """Configure the global session factory"""
    global _session_maker
    _session_maker = sessionmaker(bind=engine, expire_on_commit=expire_on_commit)


def get_session_maker() -> sessionmaker[Session]:
    if _session_maker is None:
        raise RuntimeError("Session factory not configured. Call configure_session_factory() first.")
    return _session_maker


def create_session() -> Session:
    return get_session_maker()()


# Class wrapper for convenience
class SessionFactory:
    @staticmethod
    def configure(engine: Engine, expire_on_commit: bool = False):
        configure_session_factory(engine, expire_on_commit)

    @staticmethod
    def get_session_maker() -> sessionmaker[Session]:
        return get_session_maker()

    @staticmethod
    def create_session() -> Session:
        return create_session()


session_factory = SessionFactory()

Notes

  • _session_maker: a global singleton sessionmaker
  • expire_on_commit=False: objects stay usable after commit, avoiding extra database reads (see the usage sketch below)
  • bind=engine: bound to the Engine and therefore to its connection pool
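
A short usage sketch of this factory, relying on Session's context-manager support in SQLAlchemy 1.4+ (the model query is illustrative):

from sqlalchemy import select

from core.db.session_factory import create_session
from models import App  # illustrative model import

with create_session() as session:        # checks a connection out of the pool
    app = session.scalar(select(App).limit(1))
    print(app.id if app else "no apps yet")
# leaving the block closes the session and returns its connection to the pool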

API requests: acquiring and using connections

Full HTTP request handling sequence

(Sequence diagram: client → Nginx → Flask → controller → service layer → connection pool → PostgreSQL. A POST /api/v1/apps request goes through before_request and request-context initialization; the controller validates its arguments and calls AppService.create_app(); db.session borrows a connection from the pool, runs BEGIN / INSERT / COMMIT, the app model is serialized to JSON, and after_request plus scoped_session.remove() hand the connection back to the pool before the HTTP 200 response is returned.)

Controllers use db.session

File: api/controllers/console/app/app.py

# Typical controller snippet
from extensions.ext_database import db
from models import App

class AppApi(Resource):
    @login_required
    @account_initialization_required
    def post(self):
        # 🔍 Step 1: Parse the request arguments
        args = request.get_json()
        
        # 🔍 Step 2: Call the service layer
        app = AppService.create_app(
            tenant_id=current_account_with_tenant.current_tenant.id,
            args=args,
            account=current_account_with_tenant
        )
        
        # 🔍 Step 3: The service uses db.session internally.
        # The actual code lives in the service layer:
        # app_model = App(...)
        # db.session.add(app_model)
        # db.session.commit()
        
        return app.to_dict(), 201

What db.session is

# db.session is the scoped_session provided by Flask-SQLAlchemy
# It is defined roughly like this:
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()  # in models/engine.py

# Internals of db.session (simplified):
# db.session = scoped_session(
#     sessionmaker(bind=engine),
#     scopefunc=_app_ctx_stack.__ident_func__
# )

How scoped_session works

┌────────────────────────────────────────────┐
│            scoped_session                  │
├────────────────────────────────────────────┤
│                                            │
│  thread/request context A  →  Session A    │
│  thread/request context B  →  Session B    │
│  thread/request context C  →  Session C    │
│                                            │
│  Repeated calls to db.session within one   │
│  request return the same Session object.   │
│                                            │
│  When the request ends, session.remove()   │
│  is called and the connection goes back    │
│  to the pool.                              │
└────────────────────────────────────────────┘
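
A hedged sketch of the same pattern written out by hand (Flask-SQLAlchemy does this wiring for you, so this is purely illustrative):

from flask import Flask
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

app = Flask(__name__)
engine = create_engine("sqlite:///demo.db")  # placeholder URL

# One Session per request/greenlet context, all drawing on the same pool
Session = scoped_session(sessionmaker(bind=engine))

@app.teardown_appcontext
def remove_session(exc=None):
    # Runs at the end of every request: closes the Session
    # and returns its connection to the pool
    Session.remove()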

How a connection is checked out of the pool

Inside the pool (QueuePool)

# SQLAlchemy QueuePool internals (simplified)
class QueuePool:
    def __init__(self, pool_size=30, max_overflow=10):
        self._pool = Queue()  # queue of idle connections
        self._overflow = 0    # number of overflow connections currently open
        self._checked_out = set()  # connections currently checked out
        
        # create pool_size connections up front
        for _ in range(pool_size):
            conn = self._create_connection()
            self._pool.put(conn)
    
    def get_connection(self, timeout=30):
        """
        获取连接的逻辑
        """
        # 🔍 Step 1: Try to take an idle connection from the queue
        try:
            conn = self._pool.get(block=False)
            self._checked_out.add(conn)
            return conn
        except Empty:
            pass
        
        # 🔍 Step 2: 队列为空,检查是否可以创建溢出连接
        if self._overflow < self.max_overflow:
            conn = self._create_connection()
            self._overflow += 1
            self._checked_out.add(conn)
            return conn
        
        # 🔍 Step 3: 达到上限,等待其他连接归还
        try:
            conn = self._pool.get(timeout=timeout)
            self._checked_out.add(conn)
            return conn
        except Empty:
            raise TimeoutError("Pool exhausted")
    
    def return_connection(self, conn):
        """
        归还连接
        """
        self._checked_out.remove(conn)
        
        # 检查连接是否需要回收(超过 pool_recycle 时间)
        if self._should_recycle(conn):
            conn.close()
            conn = self._create_connection()
        
        # 归还到队列
        self._pool.put(conn)

可视化连接获取流程

请求1到达
    ↓
从池获取连接
    ↓
┌─────────────────────────────────────┐
│       连接池状态                     │
│  ┌───┬───┬───┬───┬───┬───┬───┐     │
│  │ 1 │ 2 │ 3 │ 4 │ 5 │...│30 │     │  空闲连接
│  └─┬─┴───┴───┴───┴───┴───┴───┘     │
│    │                               │
│    └──> 借出给请求1                │
│                                    │
│  已借出: [conn1]                   │
│  空闲: 29个                        │
└─────────────────────────────────────┘

请求2-30到达(同时)
    ↓
全部从池获取连接
    ↓
┌─────────────────────────────────────┐
│  已借出: [conn1...conn30]          │
│  空闲: 0个                         │
└─────────────────────────────────────┘

请求31到达
    ↓
池已空,创建溢出连接
    ↓
┌─────────────────────────────────────┐
│  已借出: [conn1...conn30, conn31]  │
│  空闲: 0个                         │
│  溢出: 1个 (最多10个)              │
└─────────────────────────────────────┘

请求41到达
    ↓
达到上限40(30+10),等待30秒
    ↓
超时抛出异常:TimeoutError
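
对应到 SQLAlchemy 的真实配置,这几个参数在创建 Engine 时指定(最小草图,连接串为假设值):

# 与上图行为一一对应的 Engine 连接池配置草图
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost:5432/dify",  # 假设的连接串
    pool_size=30,        # 常驻连接数(上图中的 30 个空闲连接)
    max_overflow=10,     # 溢出连接上限(对应第 31~40 个请求)
    pool_timeout=30,     # 第 41 个请求最多等待 30 秒,超时抛出 TimeoutError
    pool_recycle=3600,   # 连接取出前检查,超过 1 小时自动重建
)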

Celery 任务 - 独立 Session 管理

Celery Worker 使用 sessionmaker

文件位置:api/tasks/async_workflow_tasks.py

def _execute_workflow_common(
    task_data: WorkflowTaskData,
    cfs_plan_scheduler: AsyncWorkflowCFSPlanScheduler,
    cfs_plan_scheduler_entity: AsyncWorkflowCFSPlanEntity,
):
    """Execute workflow with common logic and trigger log updates."""

    # Create a new session for this task
    session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)

    with session_factory() as session:
        trigger_log_repo = SQLAlchemyWorkflowTriggerLogRepository(session)

        # Get trigger log
        trigger_log = trigger_log_repo.get_by_id(task_data.workflow_trigger_log_id)

        if not trigger_log:
            # This should not happen, but handle gracefully
            return

        # Reconstruct execution data from trigger log
        trigger_data = TriggerData.model_validate_json(trigger_log.trigger_data)

        # Update status to running
        trigger_log.status = WorkflowTriggerStatus.RUNNING
        trigger_log_repo.update(trigger_log)
        session.commit()

关键步骤注释

  1. session_factory = sessionmaker(…)
    • 创建独立的 sessionmaker(不使用 Flask 的 scoped_session)
    • bind=db.engine:复用同一个连接池
    • expire_on_commit=False:提交后对象不失效
  2. with session_factory() as session:
    • 使用上下文管理器自动管理 Session
    • 进入时:从连接池获取连接
    • 退出时:自动调用 session.close(),归还连接
  3. session.commit()
    • 提交事务
    • 不会关闭连接(仍在 with 块内)

时序图 - Celery 任务的连接生命周期

时序图(示意):Celery Worker 调用 session_factory() 进入 with 块 → 向连接池请求连接(池中有空闲则直接返回;池已满且未达上限则新建;达上限则最多等待 30 秒) → 标记连接为已借出并返回 Session 对象 → 执行 session.add(trigger_log) 与 session.commit()(BEGIN / UPDATE workflow_trigger_log SET status='running' / COMMIT) → 退出 with 块时 __exit__ 自动调用 session.close(),连接回到队列,可被其他任务复用。

连接回收与健康检查

POOL_RECYCLE 自动回收机制

配置

SQLALCHEMY_POOL_RECYCLE = 3600  # 1小时

工作原理

# SQLAlchemy 内部逻辑(简化)
class QueuePool:
    def get_connection(self):
        conn = self._pool.get()
        
        # 🔍 检查连接是否超过回收时间
        if time.time() - conn.created_at > self.pool_recycle:
            # 关闭旧连接
            conn.close()
            # 创建新连接
            conn = self._create_connection()
        
        return conn

为什么需要回收

场景(以 MySQL 的 wait_timeout=28800,即 8 小时为例;PostgreSQL 部署中空闲连接同样可能被服务器、防火墙或连接代理关闭):

时间轴:
00:00  创建连接 conn1
01:00  使用 conn1 查询
02:00  conn1 归还到池
...
10:00  再次使用 conn1
       ❌ 连接已被 MySQL 服务器关闭(超过8小时未活动)
       ❌ 抛出异常:MySQL server has gone away

解决方案:
POOL_RECYCLE = 3600 (1小时)
    ↓
每小时自动关闭并重建连接
    ↓
连接永远不会在池中超过1小时
    ↓
避免 MySQL timeout 问题

POOL_PRE_PING 健康检查

配置

SQLALCHEMY_POOL_PRE_PING = False  # 默认关闭

开启后的工作原理

# 使用连接前先 ping
class QueuePool:
    def get_connection(self):
        conn = self._pool.get()
        
        if self.pool_pre_ping:
            # 🔍 发送 SELECT 1 测试连接
            try:
                conn.execute("SELECT 1")
            except Exception:
                # 连接已断开,创建新连接
                conn.close()
                conn = self._create_connection()
        
        return conn

性能权衡

开启 POOL_PRE_PING:
  ✅ 优点:每次使用前检测,避免使用断开的连接
  ❌ 缺点:每次获取连接都多一次 SELECT 1 查询,增加延迟

关闭 POOL_PRE_PING + 开启 POOL_RECYCLE:
  ✅ 优点:定期回收,无额外查询开销
  ⚠️  注意:需要设置合理的回收时间
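
如果确实需要开启健康检查,只需在创建 Engine 时打开 pool_pre_ping(配置草图,连接串为假设值):

# 开启 pool_pre_ping 的 Engine 配置草图:取出连接前先做一次轻量 ping
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql+psycopg2://user:pass@localhost:5432/dify",  # 假设的连接串
    pool_pre_ping=True,   # 连接失效时透明地丢弃并重建,代价是每次取连接多一次往返
    pool_recycle=3600,    # 可与定期回收组合使用
)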

Gevent 协程兼容性处理

文件位置:api/extensions/ext_database.py

def _setup_gevent_compatibility():
    global _gevent_compatibility_setup  # pylint: disable=global-statement

    # Avoid duplicate registration
    if _gevent_compatibility_setup:
        return

    @event.listens_for(Pool, "reset")
    def _safe_reset(dbapi_connection, connection_record, reset_state):  # pyright: ignore[reportUnusedFunction]
        if reset_state.terminate_only:
            return

        # Safe rollback for connection
        try:
            hub = gevent.get_hub()
            if hasattr(hub, "loop") and getattr(hub.loop, "in_callback", False):
                gevent.spawn_later(0, lambda: _safe_rollback(dbapi_connection))
            else:
                _safe_rollback(dbapi_connection)
        except (AttributeError, ImportError):
            _safe_rollback(dbapi_connection)

    _gevent_compatibility_setup = True

为什么需要:事务状态混乱

Celery + Gevent 环境:

协程1: 开始事务 → 协程切换
协程2: 使用同一连接 → ❌ 事务状态混乱
    ↓
解决:监听 Pool.reset 事件
    ↓
在协程切换时安全回滚事务
    ↓
防止事务泄漏和状态污染
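
上面代码中引用的 _safe_rollback 未在本文展示,其职责大致是"在不抛异常的前提下回滚底层 DBAPI 连接"。一个推测性的实现草图如下(仅为示意,非 Dify 源码):

# _safe_rollback 推测性草图:回滚失败只记日志,不影响连接归还流程
import logging

logger = logging.getLogger(__name__)

def _safe_rollback(dbapi_connection):
    try:
        dbapi_connection.rollback()   # DBAPI 连接对象自带 rollback()
    except Exception:
        # 连接可能已断开,回滚失败时仅记录日志,不向上抛出
        logger.exception("Safe rollback failed, connection may be broken")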

完整示例:从 HTTP 请求到数据库连接

# ========================================
# 示例 1:Flask API 处理流程
# ========================================

# Step 1: 客户端发起请求
# POST /v1/apps
# Body: {"name": "My App", "mode": "chat"}

# Step 2: Flask 路由处理
@api.route('/apps', methods=['POST'])
@login_required
def create_app():
    # ── 进入请求上下文 ──
    # before_request 钩子已执行:init_request_context()
    
    args = request.get_json()
    
    # Step 3: Service 层处理
    app = AppService.create_app(
        tenant_id=current_tenant.id,
        args=args
    )
    
    return jsonify(app.to_dict()), 201
    
    # ── 离开请求上下文 ──
    # after_request 钩子执行:scoped_session.remove()
    # 连接归还到池


# ========================================
# AppService.create_app 内部实现
# ========================================
class AppService:
    @staticmethod
    def create_app(tenant_id, args):
        # 🔍 使用 db.session(scoped_session)
        app_model = App(
            tenant_id=tenant_id,
            name=args['name'],
            mode=args['mode']
        )
        
        # ── 第一次使用 db.session ──
        # 触发连接获取:
        # 1. scoped_session 检查当前请求上下文
        # 2. 从连接池获取连接
        # 3. 创建 Session 对象并绑定连接
        db.session.add(app_model)
        
        # ── 执行 SQL ──
        # BEGIN TRANSACTION
        # INSERT INTO app (tenant_id, name, mode) VALUES (...)
        db.session.commit()
        # COMMIT
        
        # ── 连接仍然持有,等待请求结束 ──
        
        return app_model


# ========================================
# 示例 2:Celery 任务处理流程
# ========================================

@shared_task(queue="workflow")
def execute_workflow_task(workflow_id, user_id):
    # 🔍 创建独立的 sessionmaker
    from extensions.ext_database import db
    from sqlalchemy.orm import sessionmaker
    
    session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
    
    # ── 进入 with 块,获取连接 ──
    with session_factory() as session:
        # 从连接池获取连接
        # 1. QueuePool.get_connection()
        # 2. 检查池中是否有空闲连接
        # 3. 如有:返回;如无且未达上限:创建;达上限:等待
        
        # 执行业务逻辑
        workflow = session.get(Workflow, workflow_id)  # SQLAlchemy 2.0 风格的主键查询
        workflow.status = "running"
        
        # ── 提交事务 ──
        # BEGIN; UPDATE workflow SET status='running'; COMMIT;
        session.commit()
        
        # 继续执行工作流...
        result = run_workflow(workflow)
        
        workflow.status = "completed"
        session.commit()
        
        # ── 退出 with 块 ──
        # __exit__ 自动调用 session.close()
        # 连接归还到池:QueuePool.return_connection()
    
    return result


# ========================================
# 连接池内部状态变化(可视化)
# ========================================

# 初始状态(应用启动后)
Pool: [conn1, conn2, ..., conn30]  # 30个空闲连接
Checked out: []
Overflow: 0

# ── 请求1到达 ──
db.session.add(app_model)
# 获取 conn1
Pool: [conn2, ..., conn30]
Checked out: [conn1]

# ── 请求2-30同时到达 ──
# 获取 conn2-conn30
Pool: []
Checked out: [conn1, conn2, ..., conn30]

# ── 请求31到达 ──
# 创建溢出连接 conn31
Pool: []
Checked out: [conn1, ..., conn30, conn31]
Overflow: 1

# ── 请求1完成,after_request 执行 ──
scoped_session.remove()
# 归还 conn1
Pool: [conn1]
Checked out: [conn2, ..., conn30, conn31]
Overflow: 1

# ── 1小时后,conn1 被再次使用 ──
db.session.query(...)
# POOL_RECYCLE 检查
if (now - conn1.created_at) > 3600:
    conn1.close()  # 关闭旧连接
    conn1 = create_new_connection()  # 创建新连接
# 使用 conn1 执行查询

完整生命周期时序图总览

时序图(总览):应用启动时 db.init_app() 创建连接池(size=30),循环建立 30 个 TCP 连接;API 请求与 Celery 任务各自向 QueuePool 请求连接、执行 SQL 后归还;连接回收阶段检查连接时长,关闭过期连接并创建新连接。

关键文件与函数总览

文件路径 | 关键函数/类 | 作用
api/app_factory.py | create_app() | 应用入口,初始化所有扩展
api/app_factory.py | initialize_extensions() | 按顺序加载扩展
api/extensions/ext_database.py | init_app() | 初始化数据库连接池
api/extensions/ext_database.py | _setup_gevent_compatibility() | Gevent 兼容性处理
api/models/engine.py | db = SQLAlchemy() | Flask-SQLAlchemy 实例
api/configs/middleware/__init__.py | SQLALCHEMY_ENGINE_OPTIONS | 连接池配置参数
api/extensions/ext_session_factory.py | init_app() | 配置 sessionmaker
api/core/db/session_factory.py | configure_session_factory() | 全局 sessionmaker 配置
api/core/db/session_factory.py | create_session() | 创建独立 Session
api/tasks/async_workflow_tasks.py | _execute_workflow_common() | Celery 任务使用 sessionmaker

最佳实践总结

✅ 推荐做法

  1. API 层使用 db.session
# Flask-SQLAlchemy 自动管理生命周期
db.session.add(model)
db.session.commit()
# 请求结束后自动归还连接
  2. Celery 任务使用独立 sessionmaker
session_factory = sessionmaker(bind=db.engine, expire_on_commit=False)
with session_factory() as session:
    session.add(model)
    session.commit()
# with 块结束自动关闭
  3. 设置合理的连接池参数
SQLALCHEMY_POOL_SIZE = 30  # 根据并发量调整
SQLALCHEMY_MAX_OVERFLOW = 10  # 允许临时突增
SQLALCHEMY_POOL_RECYCLE = 3600  # 1小时回收

❌ 避免的坑

  1. 不要在 Celery 任务中使用 db.session
# ❌ 错误:scoped_session 在非 Flask 上下文中行为不可预测
@shared_task
def my_task():
    db.session.add(model)  # 危险!
  2. 不要忘记归还连接
# ❌ 错误:手动创建 Session 后不关闭
session = Session(bind=db.engine)
session.query(...)
# 忘记 session.close() → 连接泄漏

# ✅ 正确:使用 with 块
with Session(bind=db.engine) as session:
    session.query(...)
# 自动关闭
  3. 不要设置过小的连接池(可用下方草图检查连接池使用率)
# ❌ 错误:pool_size=5 无法应对100并发
# 会导致大量请求等待超时

# ✅ 正确:根据实际并发设置
# 经验公式:pool_size = 预期并发数 * 1.2
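
实际调参时,可以直接读取连接池的运行时指标来验证配置是否合理(最小草图,engine 为应用持有的 Engine 对象,40 的最大容量对应上文 30 + 10 的假设):

# 连接池使用率检查草图:利用 QueuePool 暴露的运行时方法
def report_pool_usage(engine, max_capacity=40):  # max_capacity = pool_size + max_overflow
    pool = engine.pool
    checked_out = pool.checkedout()   # 当前已借出的连接数
    print(pool.status())              # 打印池的整体状态串(池大小、溢出数等)
    usage = checked_out / max_capacity
    # 经验值:长期高于 0.8 说明连接池偏小,应扩容或排查连接泄漏
    if usage > 0.8:
        print(f"连接池使用率 {usage:.0%} 过高,考虑增大 SQLALCHEMY_POOL_SIZE")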

工程实战与故障应对 (Failure Mode Analysis)

故障模拟 1:连接雪崩 (Connection Avalanche)

场景描述

凌晨 2 点,某大客户的自动化脚本突然发起 10,000 个并发请求:

for i in {1..10000}; do
    curl -X POST "https://api.dify.ai/v1/workflows/run" &
done

雪崩链路

10,000 并发请求 → 每个请求都需要数据库连接 → 应用连接池上限仅 40(30 常驻 + 10 溢出) → 后续请求排队等待 → 30 秒后抛出 TimeoutError → 请求大量堆积、接口超时,故障向上游扩散

代码中的防护机制

1. 连接池超时保护

    SQLALCHEMY_POOL_TIMEOUT: NonNegativeInt = Field(
        description="Number of seconds to wait for a connection from the pool before raising a timeout error.",
        default=30,
    )
  • 等待 30 秒后抛出 TimeoutError
  • 防止请求无限期阻塞

2. Celery 任务忽略结果

        task_ignore_result=True,
  • 不将结果存入 Redis,减轻压力
  • 适用于"fire-and-forget"场景

3. 队列分流调度

# 专业版用户走快速队列
execute_workflow_professional.delay(task_data)

# 免费版用户走慢速队列
execute_workflow_sandbox.delay(task_data)
  • 付费用户不受免费用户影响
  • 核心服务可用性优先保障

缺失的防护(建议增强):

Dify 代码中未发现以下机制,建议补充:

API 层限流

# 建议在 API 入口添加(以 Flask-Limiter 为例的草图)
from flask_limiter import Limiter

limiter = Limiter(
    key_func=lambda: request.headers.get("Authorization"),  # 按 API Key 限流
    app=app,
    default_limits=["100 per minute", "1000 per hour"],
)

@app.route("/v1/workflows/run")
@limiter.limit("10 per second")  # 单个 API Key 每秒最多10次
def run_workflow():
    ...

Celery Worker 动态扩缩容

# Kubernetes HPA 配置
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker
  minReplicas: 3
  maxReplicas: 20
  metrics:
  - type: External
    external:
      metric:
        name: redis_queue_length
      target:
        type: AverageValue
        averageValue: "100"  # 队列长度超过100自动扩容

故障模拟 2:Redis 过载与脑裂

场景 A:Redis 内存满

# Redis 内存使用率 99.9%
127.0.0.1:6379> INFO memory
used_memory_human:15.99G
maxmemory:16.00G

影响

  • 新任务无法入队(LPUSH 失败)
  • Celery Broker 不可用
  • 所有异步任务停滞

代码中的容错

        broker_connection_retry_on_startup=True,
  • 启动时自动重连,但运行时连接断开会导致任务丢失
  • 缺失:运行时重连机制
建议增强:
# 任务发布时捕获异常
try:
    execute_workflow_professional.delay(task_data)
except Exception as e:
    logger.error(f"Failed to publish task to Redis: {e}")
    # 降级:直接同步执行或存入数据库待重试
    fallback_execute_workflow(task_data)

场景 B:Redis Sentinel 脑裂

Master 与 Sentinel 网络分区:

┌─────────┐      ┌─────────┐
│ Master  │ ✗ ✗ ✗│Sentinel │
│ Redis-1 │      │ Group   │
└─────────┘      └─────────┘
     │                │
     ✓                ✓
┌─────────┐      ┌─────────┐
│ Client  │      │ Slave   │
│ (Dify)  │      │ Redis-2 │
└─────────┘      └─────────┘

后果

  • Sentinel 选举 Redis-2 为新 Master
  • Client 仍连接旧 Master(Redis-1)
  • 任务分散到两个 Master,状态不一致
代码中的保护:
    if dify_config.CELERY_USE_SENTINEL:
        broker_transport_options = {
            "master_name": dify_config.CELERY_SENTINEL_MASTER_NAME,
            "sentinel_kwargs": {
                "socket_timeout": dify_config.CELERY_SENTINEL_SOCKET_TIMEOUT,
                "password": dify_config.CELERY_SENTINEL_PASSWORD,
            },
        }
  • Celery 会自动从 Sentinel 获取当前 Master
  • 但不保证立即切换(可能有几秒延迟)
任务状态一致性校验:
    with session_factory() as session:
        trigger_log_repo = SQLAlchemyWorkflowTriggerLogRepository(session)

        # Get trigger log
        trigger_log = trigger_log_repo.get_by_id(task_data.workflow_trigger_log_id)

        if not trigger_log:
            # This should not happen, but handle gracefully
            return

        # Reconstruct execution data from trigger log
        trigger_data = TriggerData.model_validate_json(trigger_log.trigger_data)

        # Update status to running
        trigger_log.status = WorkflowTriggerStatus.RUNNING
        trigger_log_repo.update(trigger_log)
        session.commit()

        start_time = datetime.now(UTC)

        try:
  • 任务状态存储在数据库(而非 Redis),即使 Redis 脑裂,任务状态仍可靠
  • 通过 trigger_log_id 保证幂等性(一个简单的校验思路见下方草图)
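
基于这一点,可以在任务入口加一个简单的幂等性校验:触发记录已经不是待执行状态时直接返回,防止重复投递或脑裂导致同一任务被执行两次(推测性草图,假设状态枚举中包含 PENDING 与 QUEUED):

# 幂等性校验草图:同一 trigger_log 只允许从待执行状态进入 RUNNING 一次
def should_execute(trigger_log) -> bool:
    # 已在执行或已结束的任务直接跳过,避免重复执行
    return trigger_log.status in (WorkflowTriggerStatus.PENDING, WorkflowTriggerStatus.QUEUED)

# 在 _execute_workflow_common 中使用(示意):
# if not should_execute(trigger_log):
#     return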

异步 DoS 攻击 —— 虚假长任务耗尽资源

攻击手法

攻击者注册免费账号,上传恶意构造的文档:

# 攻击脚本
import requests

api_key = "免费账号的API Key"
headers = {"Authorization": f"Bearer {api_key}"}

# 反复上传同一个 10GB 的垃圾 PDF 文件
for i in range(1000):
    # 每次重新打开文件,避免文件指针读到末尾后上传空内容
    with open("10gb_junk.pdf", "rb") as f:
        requests.post(
            "https://api.dify.ai/v1/datasets/upload",
            headers=headers,
            files={"file": f},
        )

攻击效果

  • 触发 1000 个文档索引任务
  • 每个任务耗时 1 小时+
  • 占满 dataset 队列
  • 正常用户无法索引文档

代码中的防御机制

批量上传限制

    # check document limit
    features = FeatureService.get_features(dataset.tenant_id)
    try:
        if features.billing.enabled:
            vector_space = features.vector_space
            count = len(document_ids)
            batch_upload_limit = int(dify_config.BATCH_UPLOAD_LIMIT)
            if features.billing.subscription.plan == CloudPlan.SANDBOX and count > 1:
                raise ValueError("Your current plan does not support batch upload, please upgrade your plan.")
            if count > batch_upload_limit:
                raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")
            if 0 < vector_space.limit <= vector_space.size:
                raise ValueError(
                    "Your total number of documents plus the number of uploads have over the limit of "
                    "your subscription."
                )
    except Exception as e:
        for document_id in document_ids:
            document = (
                db.session.query(Document).where(Document.id == document_id, Document.dataset_id == dataset_id).first()
            )
            if document:
                document.indexing_status = "error"
                document.error = str(e)
                document.stopped_at = naive_utc_now()
                db.session.add(document)
        db.session.commit()
        db.session.close()
        return

防御要点

  • 免费版禁止批量上传:count > 1 直接拒绝
  • 批量上传上限:付费版也有 BATCH_UPLOAD_LIMIT
  • 向量空间配额:超过 vector_space.limit 拒绝

租户隔离并发控制

        tenant_isolated_task_queue = TenantIsolatedTaskQueue(tenant_id, "document_indexing")

        # Check if there are waiting tasks in the queue
        # Use rpop to get the next task from the queue (FIFO order)
        next_tasks = tenant_isolated_task_queue.pull_tasks(count=dify_config.TENANT_ISOLATED_TASK_CONCURRENCY)

        logger.info("document indexing tenant isolation queue %s next tasks: %s", tenant_id, next_tasks)

        if next_tasks:
            for next_task in next_tasks:
                document_task = DocumentTask(**next_task)
                # Process the next waiting task
                # Keep the flag set to indicate a task is running
                tenant_isolated_task_queue.set_task_waiting_time()
                task_func.delay(  # type: ignore
                    tenant_id=document_task.tenant_id,
                    dataset_id=document_task.dataset_id,
                    document_ids=document_task.document_ids,
                )
        else:
            # No more waiting tasks, clear the flag
            tenant_isolated_task_queue.delete_task_key()

防御要点

  • 单个租户最多同时执行 TENANT_ISOLATED_TASK_CONCURRENCY 个任务
  • 恶意租户只能占用自己的并发槽位
  • 不影响其他租户(并发闸门的概念草图见下)
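
租户隔离队列的核心思想可以用几行 Redis 操作概括:任务先进入租户自己的等待列表,只有当该租户正在运行的任务数低于并发上限时才真正投递给 Celery。下面是一个概念性草图(非 Dify 源码,键名、参数与 dispatch_to_celery 均为假设;真实实现需用 Lua 或 WATCH 保证计数原子性):

# 租户级并发控制的概念草图:Redis 列表做等待队列,计数器做并发闸门
import json
import redis

r = redis.Redis()
MAX_CONCURRENCY = 3  # 假设的单租户并发上限,对应 TENANT_ISOLATED_TASK_CONCURRENCY

def submit_task(tenant_id: str, payload: dict):
    running_key = f"tenant_running:{tenant_id}"
    queue_key = f"tenant_tasks:{tenant_id}"
    if int(r.get(running_key) or 0) < MAX_CONCURRENCY:
        r.incr(running_key)                      # 占用一个并发槽位
        dispatch_to_celery(payload)              # 假设的投递函数,内部调用 task.delay()
    else:
        r.rpush(queue_key, json.dumps(payload))  # 超出并发上限,先排队等待

def on_task_finished(tenant_id: str):
    running_key = f"tenant_running:{tenant_id}"
    queue_key = f"tenant_tasks:{tenant_id}"
    next_payload = r.lpop(queue_key)             # 取出下一个等待任务(FIFO)
    if next_payload:
        dispatch_to_celery(json.loads(next_payload))
    else:
        r.decr(running_key)                      # 没有等待任务,释放槽位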

优先级队列分离

@shared_task(queue="priority_dataset")
def priority_document_indexing_task(tenant_id: str, dataset_id: str, document_ids: Sequence[str]):
  • 付费用户走 priority_dataset 队列
  • 免费用户攻击只影响 dataset 队列
  • 付费用户不受影响(队列路由的配置草图见下)
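
队列与任务的绑定关系也可以集中在 Celery 配置里声明,下面是一个最小的路由配置草图(任务模块名与 broker 地址均为假设值):

# Celery task_routes 路由配置草图:按任务名把消息发往不同队列
from celery import Celery

celery_app = Celery("dify", broker="redis://localhost:6379/1")  # 假设的 broker 地址

celery_app.conf.task_routes = {
    "tasks.priority_document_indexing_task": {"queue": "priority_dataset"},  # 付费用户
    "tasks.document_indexing_task": {"queue": "dataset"},                    # 免费用户
}

# 部署时为两个队列分别启动独立 Worker(命令为示意):
#   celery -A app worker -Q priority_dataset -c 8
#   celery -A app worker -Q dataset -c 2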

缺失的防护(建议增强):

Dify 代码中未发现以下机制,建议补充:

任务执行时长限制

@shared_task(
    queue="dataset",
    time_limit=3600,  # 任务最多执行1小时
    soft_time_limit=3300  # 55分钟时发送警告信号
)
def document_indexing_task(...):
    ...

恶意文件检测

# 在任务开始前检测文件(草图)
import os
import PyPDF2

def validate_document(file_path):
    # 检查文件大小
    if os.path.getsize(file_path) > 100 * 1024 * 1024:  # 100MB
        raise ValueError("File too large")

    # 检查 PDF 页数
    with open(file_path, 'rb') as f:
        pdf = PyPDF2.PdfReader(f)
        if len(pdf.pages) > 1000:
            raise ValueError("Too many pages")

防御机制总结

攻击类型 | 防御机制 | 代码位置
批量上传 DoS | 免费版禁止批量、配额限制 | document_indexing_task.py:61
单租户霸占 | 租户隔离队列 | queue.py:25
付费用户被拖累 | 多队列分离 | entities.py:7
数据库连接耗尽 | 连接池大小限制 | middleware/__init__.py:168
Redis 过载 | task_ignore_result=True | ext_celery.py:82

标准化回答话术(300字以内)

如何优化一个大模型应用平台的系统吞吐量并防止连接雪崩?


从任务调度、连接管理、队列分流、安全防护四个维度设计高并发架构:

Celery + Redis 异步架构解耦

首先采用 Celery + Redis 实现请求响应与任务执行的解耦:

  • 同步场景:用户请求 → API 直接调用 LLM → 阻塞 30 秒 → QPS 仅 1.3
  • 异步优化:用户请求 → API 推送到 Redis 队列 → 立即返回任务 ID → Celery Worker 异步执行 → QPS 提升至 1000+

关键配置

celery_app.conf.update(
    broker_url=redis_url,                      # Broker 地址(Redis)
    task_ignore_result=True,                   # 不存结果,减轻 Redis 压力
    broker_connection_retry_on_startup=True,   # 启动时自动重连
)

为什么仅增加 CPU 不够

  • 瓶颈在 I/O 等待(LLM API、数据库查询),不是 CPU 计算
  • 需要优化任务流转效率:
    • 使用连接池复用数据库连接
    • 批量操作减少 Redis 往返
    • 合理设置 Worker 并发数(基于 I/O 密集型调优,配置草图见下)
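
针对 I/O 密集型任务,Worker 通常用协程池拉高并发,而不是继续加 CPU(调优草图,沿用上文的 celery_app,数值需按下游承载能力调整):

# I/O 密集型 Worker 调优草图:gevent 协程池 + 较高并发数 + 降低预取
celery_app.conf.update(
    worker_pool="gevent",          # 协程池,适合等待 LLM / 数据库的 I/O 密集任务
    worker_concurrency=100,        # 单个 Worker 进程内的协程数
    worker_prefetch_multiplier=1,  # 长任务场景降低预取,避免任务囤积在单个 Worker
)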

PgBouncer 连接池优化防止雪崩

问题场景:10,000 并发请求瞬间涌入,应用需要 10,000 个数据库连接,但 PostgreSQL 默认最大连接数仅 100,导致连接耗尽。

解决方案 - 两层连接池

应用层(SQLAlchemy)

SQLALCHEMY_POOL_SIZE = 30  # 核心连接数
SQLALCHEMY_MAX_OVERFLOW = 10  # 临时连接数(总计40)
SQLALCHEMY_POOL_RECYCLE = 3600  # 1小时回收,防止 MySQL wait_timeout
SQLALCHEMY_POOL_TIMEOUT = 30  # 等待连接超时

中间层(PgBouncer)

[databases]
dify = host=postgres port=5432 dbname=dify

[pgbouncer]
pool_mode = transaction  # 事务级池化,利用率最高
max_client_conn = 10000  # 接受应用的10000个连接
default_pool_size = 20  # 但只给 PostgreSQL 建立20个真实连接
reserve_pool_size = 5  # 紧急备用连接

工作原理

  • 应用 40 个连接 → PgBouncer 映射到 20 个真实连接
  • 事务结束后立即归还连接,避免长时间占用
  • 防止连接雪崩:即使应用请求 10,000 连接,数据库只承受 20 个(应用侧的改动见下方草图)
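
应用侧的改动只是把数据库地址从直连 PostgreSQL 改为指向 PgBouncer(默认端口 6432),连接池其余配置保持不变(草图,主机名与密码为假设值):

# 连接串示例:从直连 PostgreSQL 改为经过 PgBouncer
# 原来: postgresql+psycopg2://dify:pass@postgres:5432/dify
SQLALCHEMY_DATABASE_URI = "postgresql+psycopg2://dify:pass@pgbouncer:6432/dify"

# 注意:pool_mode = transaction 下不能依赖会话级特性
# (如 SET search_path、advisory lock、跨事务复用 prepared statement)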

监控指标

-- 监控 PgBouncer 等待队列
SHOW POOLS;
-- 若 cl_waiting 持续大于 100,说明连接池过小,需扩容

轻重任务队列分离策略

单队列问题:免费用户上传 1GB 文档索引(耗时 1 小时),阻塞付费用户的 100KB 文档(耗时 10 秒)。

多队列架构

# 按租户等级分流
if user.plan == "professional":
    execute_workflow_professional.delay(task)  # workflow_professional 队列
elif user.plan == "team":
    execute_workflow_team.delay(task)  # workflow_team 队列
else:
    execute_workflow_sandbox.delay(task)  # workflow_sandbox 队列

# 按任务类型分流
if task_type == "quick_query":
    quick_task.delay()  # 快速队列,秒级响应
elif task_type == "document_indexing":
    indexing_task.delay()  # dataset 队列,分钟级

Worker 部署策略

# 专业版队列:8 个 Worker,高优先级
celery -A celery worker -Q workflow_professional -c 8 --prefetch-multiplier 1

# 团队版队列:4 个 Worker
celery -A celery worker -Q workflow_team -c 4

# 免费版队列:2 个 Worker,低优先级
celery -A celery worker -Q workflow_sandbox -c 2 --max-tasks-per-child 100

租户隔离队列(防止单租户霸占):

class TenantIsolatedTaskQueue:
    # 每个租户独立队列
    queue_key = f"tenant_tasks:{tenant_id}"
    
    # 并发控制
    max_concurrent_tasks = 3  # 单租户最多3个任务同时执行

异步 DoS 防御思路

攻击场景:恶意用户发送 1000 个"上传 1GB 文档"请求,耗尽队列资源。

入口限流

# API 层限流(基于 API Key)
@limiter.limit("10 per second")
@limiter.limit("100 per hour")
def upload_document():
    ...

配额校验

# 免费版禁止批量上传
if user.plan == "sandbox" and len(documents) > 1:
    raise PermissionError("Upgrade to batch upload")

# 向量空间配额
if used_vectors >= vector_quota:
    raise QuotaExceededError()

任务超时保护

@shared_task(
    time_limit=3600,  # 硬超时:1小时
    soft_time_limit=3300  # 软超时:55分钟发警告
)
def long_running_task():
    ...

恶意文件检测

# 任务执行前验证
import os

def validate_file(file_path):
    size = os.path.getsize(file_path)
    if size > 100 * 1024 * 1024:  # 100MB
        raise ValueError("File too large")

监控告警

# 队列长度超过阈值告警
if redis.llen("workflow_sandbox") > 1000:
    alert_ops_team("Queue backlog detected")


进一步优化:性能监控与调优

关键指标

# 队列积压
redis.llen("workflow_professional")  # < 100 正常,> 1000 需扩容

# 连接池使用率
pool.checkedout() / (pool.size() + pool.overflow())  # < 80% 正常

# Worker 任务处理统计
celery inspect stats  # 各 Worker 的累计处理任务数、预取数等

# 数据库慢查询
SELECT * FROM pg_stat_statements WHERE mean_exec_time > 1000;

自动扩缩容(Kubernetes HPA):

apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: celery-worker-pro
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: celery-worker-pro
  minReplicas: 8
  maxReplicas: 50
  metrics:
  - type: External
    external:
      metric:
        name: redis_queue_length
      target:
        type: AverageValue
        averageValue: "50"  # 队列长度 > 50 自动扩容

总结:高并发架构的核心原则

  1. 异步解耦:Celery + Redis 让 API 快速响应
  2. 连接复用:PgBouncer + SQLAlchemy 连接池防止雪崩
  3. 队列分流:轻重任务隔离、租户隔离、优先级分离
  4. 安全防护:限流、配额、超时、恶意检测
  5. 弹性扩展:基于队列长度自动扩缩容 Worker