我们可以把Open WebUI想象成一个管道系统,数据通过管道和阀门流动。管道作为open webui的插件,可以为数据构建新的通路,可以自定义逻辑和处理数据;阀门是管道的可配置部件,控制数据流过管道时的行为。管道可以理解成用户自定义的模型,并且被追加到用户可见的模型列表中。用户的对话式,可以跟选择大模型一样选择管道。

        管道的引入方式通前面所说的Action和Filter一样,下面以一个使用langchain的管道为例,该管道在发送请求到大模型之前把{'role':'system', 'cotent':'You are a helpful bot'}增加到messages中,具体代码如:

"""
title: LangChain Pipe Function
author: Colby Sawyer @ Attollo LLC (mailto:colby.sawyer@attollodefense.com)
author_url: https://github.com/ColbySawyer7
version: 0.1.0

This module defines a Pipe class that utilizes LangChain
"""

from typing import Optional, Callable, Awaitable
from pydantic import BaseModel, Field
import os
import time

# import LangChain dependencies
from langchain_core.prompts import ChatPromptTemplate
from langchain.schema import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_community.llms import Ollama
# Uncomment to use OpenAI and FAISS
#from langchain_openai import ChatOpenAI
#from langchain_community.vectorstores import FAISS

class Pipe:
    class Valves(BaseModel):
        base_url: str = Field(default="http://localhost:11434")
        ollama_embed_model: str = Field(default="nomic-embed-text")
        ollama_model: str = Field(default="llama3.1")
        openai_api_key: str = Field(default="...")
        openai_model: str = Field(default="gpt3.5-turbo")
        emit_interval: float = Field(
            default=2.0, description="Interval in seconds between status emissions"
        )
        enable_status_indicator: bool = Field(
            default=True, description="Enable or disable status indicator emissions"
        )

    def __init__(self):
        self.type = "pipe"
        self.id = "langchain_pipe"
        self.name = "LangChain Pipe"
        self.valves = self.Valves()
        self.last_emit_time = 0
        pass

    async def emit_status(
        self,
        __event_emitter__: Callable[[dict], Awaitable[None]],
        level: str,
        message: str,
        done: bool,
    ):
        current_time = time.time()
        if (
            __event_emitter__
            and self.valves.enable_status_indicator
            and (
                current_time - self.last_emit_time >= self.valves.emit_interval or done
            )
        ):
            await __event_emitter__(
                {
                    "type": "status",
                    "data": {
                        "status": "complete" if done else "in_progress",
                        "level": level,
                        "description": message,
                        "done": done,
                    },
                }
            )
            self.last_emit_time = current_time

    async def pipe(self, body: dict, 
             __user__: Optional[dict] = None,
        __event_emitter__: Callable[[dict], Awaitable[None]] = None,
        __event_call__: Callable[[dict], Awaitable[dict]] = None,
        ) -> Optional[dict]:
        await self.emit_status(
            __event_emitter__, "info", "/initiating Chain", False
        )

        # ======================================================================================================================================
        # MODEL SETUP
        # ======================================================================================================================================
        # Setup the model for generating responses
        # ==========================================================================
        # Ollama Usage
        _model = Ollama(
            model=self.valves.ollama_model,
            base_url=self.valves.base_url
        )
        # ==========================================================================
        # OpenAI Usage
        # _model = ChatOpenAI(
        #     openai_api_key=self.valves.openai_api_key,
        #     model=self.valves.openai_model
        # )
        # ==========================================================================

        # Example usage of FAISS for retreival
        # vectorstore = FAISS.from_texts(
        #     texts, embedding=OpenAIEmbeddings(openai_api_key=self.valves.openai_api_key)
        # )

        # ======================================================================================================================================
        # PROMPTS SETUP
        # ==========================================================================
        _prompt = ChatPromptTemplate.from_messages([
            ("system", "You are a helpful bot"),
            ("human", "{question}")
        ])
        # ======================================================================================================================================
        # CHAIN SETUP
        # ==========================================================================
        # Basic Chain
        chain = (
            {"question": RunnablePassthrough()}
            | _prompt
            | _model
            | StrOutputParser()
        )
        # ======================================================================================================================================
        # Langchain Calling
        # ======================================================================================================================================
        await self.emit_status(
            __event_emitter__, "info", "Starting Chain", False
        )
        messages = body.get("messages", [])
        # Verify a message is available
        if messages:
            question = messages[-1]["content"]
            try:
                # Invoke Chain
                response = chain.invoke(question)
                # 把调用管道的应答数据追加到表单的messages中
                body["messages"].append({"role": "assistant", "content": response})
            except Exception as e:
                await self.emit_status(__event_emitter__, "error", f"Error during sequence execution: {str(e)}", True)
                return {"error": str(e)}
        # If no message is available alert user
        else:
            await self.emit_status(__event_emitter__, "error", "No messages found in the request body", True)
            body["messages"].append({"role": "assistant", "content": "No messages found in the request body"})

        await self.emit_status(__event_emitter__, "info", "Complete", True)
        return response

        管道增加成功后,出现在聊天页面的模型列表中,用户可以在聊天页面选择使用,效果如下图:

         一、请求报文

        在请求数据中的model就是当前新增的管道名LangChain Pipe,具体如下:

{
    "stream": true,
    "model": "langchain_pipe",
    "messages": [
        {
            "role": "user",
            "content": "床前明月光的下一句"
        }
    ],
    "params": {},
    "tool_servers": [],
    "features": {
        "image_generation": false,
        "code_interpreter": false,
        "web_search": false,
        "memory": false
    },
    "variables": {
        "{{USER_NAME}}": "acaluis",
        "{{USER_LOCATION}}": "Unknown",
        "{{CURRENT_DATETIME}}": "2025-08-22 18:27:04",
        "{{CURRENT_DATE}}": "2025-08-22",
        "{{CURRENT_TIME}}": "18:27:04",
        "{{CURRENT_WEEKDAY}}": "Friday",
        "{{CURRENT_TIMEZONE}}": "Etc/GMT-8",
        "{{USER_LANGUAGE}}": "zh-CN"
    },
    "model_item": {
        "id": "langchain_pipe",
        "name": "LangChain Pipe",
        "object": "model",
        "created": 1755857251,
        "owned_by": "openai",
        "pipe": {
            "type": "pipe"
        },
        "actions": [],
        "filters": [],
        "tags": []
    },
    "session_id": "Hh2Tyy7FYSQz-I1SAAAg",
    "chat_id": "b80cadeb-0389-4464-9fd9-270193f4f3a0",
    "id": "618d361e-d70c-4fec-b945-a4be549bd699",
    "background_tasks": {
        "title_generation": true,
        "tags_generation": true,
        "follow_up_generation": true
    }
}

        二、源码分析

        在会话主流程中,关于pipe的处理入口在generate_chat_completion方法中,相关代码如下:

async def generate_chat_completion(
    request: Request,
    form_data: dict,
    user: Any,
    bypass_filter: bool = False,
):

    ……

    if model.get("pipe"): #看这里。请求报文中有pipe属性,所以进入这个分支。
            return await generate_function_chat_completion(#下面重点分析该方法代码
                request, form_data, user=user, models=models
            )

    ……  

        generate_function_chat_completion代码如下:

本方法处理流程如下:

1)确定使用的模型,也就是现在的langchain_pipe

2)设置pipe方法的部分参数到extra_params中

3)从模块中获取管道Function的pipe方法及参数

4)异步执行pipe方法

5)把调用pipe方法的应答包装成openai兼容格式,并以流式返回

async def generate_function_chat_completion(
    request, form_data, user, models: dict = {}
):
    async def execute_pipe(pipe, params):#该方法异步调用管道中的pipe方法
        if inspect.iscoroutinefunction(pipe):
            return await pipe(**params)
        else:
            return pipe(**params)

    async def get_message_content(res: str | Generator | AsyncGenerator) -> str:
        if isinstance(res, str):
            return res
        if isinstance(res, Generator):
            return "".join(map(str, res))
        if isinstance(res, AsyncGenerator):
            return "".join([str(stream) async for stream in res])

    def process_line(form_data: dict, line):
        if isinstance(line, BaseModel):
            line = line.model_dump_json()
            line = f"data: {line}"
        if isinstance(line, dict):
            line = f"data: {json.dumps(line)}"

        try:
            line = line.decode("utf-8")
        except Exception:
            pass

        if line.startswith("data:"):
            return f"{line}\n\n"
        else:
            line = openai_chat_chunk_message_template(form_data["model"], line)
            return f"data: {json.dumps(line)}\n\n"

    def get_pipe_id(form_data: dict) -> str:
        pipe_id = form_data["model"]
        if "." in pipe_id:
            pipe_id, _ = pipe_id.split(".", 1)
        return pipe_id

    #本方法用于从form_data,user和extra_params中获取管道的pipe函数参数

    def get_function_params(function_module, form_data, user, extra_params=None):
        if extra_params is None:
            extra_params = {}

        pipe_id = get_pipe_id(form_data)

        # Get the signature of the function
        sig = inspect.signature(function_module.pipe) #获取函数签名

        '''

            body设置为表单,__event_emitter__、__event_caller__和__user__来自

            extra_params

        '''
        params = {"body": form_data} | {
            k: v for k, v in extra_params.items() if k in sig.parameters
        }

        '''

            如果参数中有__user__并且管道函数中有UserValves,则需要根据pipe_id和user_id

            找到用户阀门值,并设置到params["__user__"]["valves"]中

        '''

        if "__user__" in params and hasattr(function_module, "UserValves"):
            user_valves = Functions.get_user_valves_by_id_and_user_id(pipe_id, user.id)
            try:
                params["__user__"]["valves"] = function_module.UserValves(**user_valves)
            except Exception as e:
                log.exception(e)
                params["__user__"]["valves"] = function_module.UserValves()

        return params

    #以下两行代码根据请求中的model确定使用的模型,这里对应就是前面定位的管道

    model_id = form_data.get("model")
    model_info = Models.get_model_by_id(model_id)

    '''

        当前model_info信息如下:

                {
                    "id": "langchain_pipe",
                    "name": "LangChain Pipe",
                    "object": "model",
                    "created": 1755827527,
                    "owned_by": "openai",
                    "pipe":{"type": "pipe"},
                }

    '''

    metadata = form_data.pop("metadata", {})

    files = metadata.get("files", [])
    tool_ids = metadata.get("tool_ids", [])
    # tool_ids处理,暂不关注
    if tool_ids is None:
        tool_ids = []

    __event_emitter__ = None
    __event_call__ = None
    __task__ = None
    __task_body__ = None

    if metadata:#如果元数据不为空,判断其中是否包含了session_id,chat_id和message_id

        #如果上面的判断成立,则创建__event_emitter__和__event_call__
        if all(k in metadata for k in ("session_id", "chat_id", "message_id")):
            __event_emitter__ = get_event_emitter(metadata)
            __event_call__ = get_event_call(metadata)
        __task__ = metadata.get("task", None)
        __task_body__ = metadata.get("task_body", None)

    extra_params = { #t填充extra_params,用于提供调用pipe方法时的参数
        "__event_emitter__": __event_emitter__,
        "__event_call__": __event_call__,
        "__chat_id__": metadata.get("chat_id", None),
        "__session_id__": metadata.get("session_id", None),
        "__message_id__": metadata.get("message_id", None),
        "__task__": __task__,
        "__task_body__": __task_body__,
        "__files__": files,
        "__user__": user.model_dump() if isinstance(user, UserModel) else {},
        "__metadata__": metadata,
        "__request__": request,
    }
    extra_params["__tools__"] = get_tools(#根据tool_ids设置extra_params的__tools__
        request,
        tool_ids,
        user,
        {
            **extra_params,
            "__model__": models.get(form_data["model"], None),
            "__messages__": form_data["messages"],
            "__files__": files,
        },
    )

    if model_info:
        if model_info.base_model_id:#暂不考虑
            form_data["model"] = model_info.base_model_id

        params = model_info.params.model_dump()

        if params:#暂不考虑
            system = params.pop("system", None)
            form_data = apply_model_params_to_body_openai(params, form_data)
            form_data = apply_model_system_prompt_to_body(
                system, form_data, metadata, user
            )

    pipe_id = get_pipe_id(form_data)#从表单中获取管道对应的函数ID
    function_module = get_function_module_by_id(request, pipe_id) #根据ID获取模块

    pipe = function_module.pipe#得到该管道的pipe方法

    #从表单数据、用户和extra_params中找到并组织好调用pipe方法时所有的参数
    params = get_function_params(function_module, form_data, user, extra_params)

    if form_data.get("stream", False):#一般是流式应答

        async def stream_content(): #执行管道的pipe方法,并处理流式应答
            try:

                 '''

                   重要:管道中调用ollama返回的数据是ndjson

                   ndjson是一种数据交换格式,其中每行包含一个独立的json对象,

                   需要把所有的行合并成一个大的json对象,langchain完成了合并,

                   所以调用管道后,返回的内容就是应答内容

                '''
                res = await execute_pipe(pipe, params)

                # Directly return if the response is a StreamingResponse
                if isinstance(res, StreamingResponse):
                    async for data in res.body_iterator:
                        yield data
                    return
                if isinstance(res, dict):
                    yield f"data: {json.dumps(res)}\n\n"
                    return

            except Exception as e:
                log.error(f"Error: {e}")
                yield f"data: {json.dumps({'error': {'detail':str(e)}})}\n\n"
                return

            if isinstance(res, str): #正常情况进入本分支

                ''' --------把大模型应答内容转换成标准openai格式------------

                   返回数据格式如下:

                    {"data":

                             {

                                   "id", "langchain_pipe-{uuid}",

                                   "created": "当前时间",

                                   "model": "langchain_pipe",    

                                    choices:[

                                                     {

                                                            "index":0,

                                                            "logprobs":None,

                                                            "finish_reason":None

                                                             "delta":{

                                                                 "content": "管道返回内容"

                                                              }

                                                      }

                                         ]

                                }

                          }         

                '''
                message = openai_chat_chunk_message_template(form_data["model"], res)
                yield f"data: {json.dumps(message)}\n\n"

            if isinstance(res, Iterator):
                for line in res:
                    yield process_line(form_data, line)

            if isinstance(res, AsyncGenerator):
                async for line in res:
                    yield process_line(form_data, line)

            if isinstance(res, str) or isinstance(res, Generator):
                finish_message = openai_chat_chunk_message_template(
                    form_data["model"], ""
                )
                finish_message["choices"][0]["finish_reason"] = "stop"
                yield f"data: {json.dumps(finish_message)}\n\n"
                yield "data: [DONE]"

        #调用stream_content()后,以流式应答返回供process_chat_response处理

        return StreamingResponse(stream_content(), media_type="text/event-stream")
    else:#非流式请求,不须考虑
        try:
            res = await execute_pipe(pipe, params) #执行管道的pipe方法

        except Exception as e:
            log.error(f"Error: {e}")
            return {"error": {"detail": str(e)}}

        if isinstance(res, StreamingResponse) or isinstance(res, dict):
            return res
        if isinstance(res, BaseModel):
            return res.model_dump()

        message = await get_message_content(res)
        return openai_chat_completion_message_template(form_data["model"], message)
 

        process_chat_response的处理逻辑在系列3已经分析,在此不做赘述。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐