🚀 本文内容:深入讲解Dify插件开发的进阶技巧,涵盖Manifest文件配置、通用数据结构(I18nObject、ProviderConfig等)、模型API接口调用、持久化存储机制、工具返回消息类型、多语言README支持以及核心的"反向调用"功能——让插件能够调用Dify主平台的App、Model、Tool和Node服务,实现与Dify生态的深度集成。

画板

进阶指南 1 - 插件常规

Manifest 文件 - 属性详解

Manifest 文件是一个 YAML 格式的文件,定义了插件的最基本信息。如果该文件格式不正确,则插件的解析和打包都会失败。

# 插件版本
version: 0.0.1
# 插件类型,目前仅支持plugin,未来将支持bundle
type: "plugin"
# 插件作者
author: "Yeuoly"
# 插件名称
name: "neko"
# 插件标签,多语言
label:
  en_US: "Neko"
# 插件创建时间,Marketplace要求不能晚于当前时间
created_at: "2024-07-12T08:03:44.658609186Z"
# 插件图标
icon: "icon.svg"
# 插件申请的资源
resource:
  # 最大内存使用量,主要与SaaS上的AWS Lambda资源申请有关,单位字节
  memory: 1048576
  # 权限申请
  permission:
    # 工具反向调用权限
    tool:
      enabled: true
    # 模型反向调用权限
    model:
      enabled: true
      # 是否启用LLM权限
      llm: true
    # 注册endpoint权限
    endpoint:
      enabled: true
    # app反向调用权限
    app:
      enabled: true
    # 申请持久化存储权限
    storage:
      enabled: true
      size: 1048576
# 插件扩展的具体能力的yaml文件列表
## 不允许没有扩展
## 不允许同时扩展工具和模型
## 不允许同时扩展模型和Endpoints
plugins:
  endpoints:
    - "provider/neko.yaml"
  tools:
    - "xxx/xxx.yaml"
  models:
    - "xxx/xxx.yaml"
  agent_strategies:
    - "xxx/xxx.yaml"

# 插件元数据
meta:
  # 插件的manifest版本
  version: 0.0.1
  # 插件支持的架构
  arch:
    - "amd64"
    - "arm64"
  # 插件运行时配置
  runner:
    # 插件编程语言
    language: "python"
    # 插件编程语言版本
    version: "3.10"
    # 插件程序入口
    entrypoint: "main"
# 隐私政策文件路径
privacy: "./privacy.md"

通用规范 & 数据结构

路径规范

在 Manifest 或者任何 yaml 文件中填写文件路径时,根据文件类型遵循以下两种规范:

  • 多媒体文件(比如图片、视频):将文件放在插件根目录的 _assets文件夹中。
  • 普通文件(如 .py.yaml):写该文件在插件项目中的绝对路径

数据结构

通用结构

在定义插件时,有些数据结构可以在工具、模型和 Endpoints 之间复用。【了解这些数据结构,便于后续开发插件时,能够方便的使用】

  • I18nObject :符合 IETF BCP 47 标准的国际化结构,目前支持 en_US、zh_Hans、ja_JP、pt_BR(葡萄牙语(巴西)) 四种语言。
  • ProviderConfig 通用的提供者表单结构,适用于 ToolEndpoint

I18nObject 为例,其源代码如下:

模型结构

Dify 模型插件开发时,会涉及到供应商(Provider)、AI 模型实体(AIModelEntity)、模型类型(ModelType)、配置方法(ConfigurateMethod)、模型特性(ModelFeature)、参数规则(ParameterRule)、价格配置(PriceConfig)以及各种凭据模式的详细数据结构。


以模型类型为例,其源代码如下:

@docs(
    description="The model type",
)
class ModelType(Enum):
    """
    Enum class for model type.
    """

    LLM = "llm"
    TEXT_EMBEDDING = "text-embedding"
    RERANK = "rerank"
    SPEECH2TEXT = "speech2text"
    MODERATION = "moderation"
    TTS = "tts"
    TEXT2IMG = "text2img"

模型插件 API 接口

模型 API 接口 - Dify Docs 中介绍了模型的 API 接口(就是 Python 中的抽象方法)及数据结构。

以大模型调用为例,及核心 API 接口如下:提供的是抽象方法,由具体的模型来实现此抽象方法。在 openai_compatiable\llm.py 中实现了一个兼容 OpenAI API 的大模型调用方法,

其它的模型类型,还有很多方法,在此不再赘述。

持久化存储

Dify 插件中支持调用方法实现持久化存储,便于在多次交互中维护状态,实现有状态应用和记忆功能。常用于用户偏好、对话历史、API 令牌、缓存数据、文件存储功等功能。

# 访问存储接口
storage = self.session.storage

# 存储数据(必须转换为字节)
storage.set("user_name", "John Doe".encode('utf-8'))
# 检索数据
name_bytes = storage.get("user_name")
if name_bytes:
    name = name_bytes.decode('utf-8')
    print(f"Retrieved name: {name}")

# 存储JSON数据
import json
user_data = {"name": "John", "age": 30, "preferences": ["AI", "NLP"]}
storage.set("user_data", json.dumps(user_data).encode('utf-8'))

# 检索JSON数据
user_data_bytes = storage.get("user_data")
if user_data_bytes:
    user_data = json.loads(user_data_bytes.decode('utf-8'))
    print(f"User preferences: {user_data['preferences']}")

self.session.storage的实现类为 StorageInvocation,其源码如下:

class StorageInvocation(BackwardsInvocation[dict]):
    def set(self, key: str, val: bytes) -> None:
        """
        set a value into persistence storage.

        :raises:
            StorageInvocationError: If the invocation returns an invalid data.
        """
        for data in self._backwards_invoke(
            InvokeType.Storage,
            dict,
            {"opt": "set", "key": key, "value": hexlify(val).decode()},
        ):
            if data["data"] == "ok":
                return

            raise StorageInvocationError(f"unexpected data: {data['data']}")

    def get(self, key: str) -> bytes:
        """get a key from persistence storage.

        :raises:
            NotFoundError: If the caller gets a key that does not exist.
        """

        for data in self._backwards_invoke(
            InvokeType.Storage,
            dict,
            {
                "opt": "get",
                "key": key,
            },
        ):
            return unhexlify(data["data"])

        raise StorageInvocationError("no data found")

    def delete(self, key: str) -> None:
        """delete a key from persistence storage.

        :raises:
            StorageInvocationError: If the invocation returns an invalid data.
        """
        for data in self._backwards_invoke(
            InvokeType.Storage,
            dict,
            {
                "opt": "del",
                "key": key,
            },
        ):
            if data["data"] == "ok":
                return

            raise StorageInvocationError(f"unexpected data: {data['data']}")

        raise StorageInvocationError("no data found")

    def exist(self, key: str) -> bool:
        """Check for the existence of a key in persistence storage.

        :raises:
            StorageInvocationError: If the invocation does not return any data.
        """
        for data in self._backwards_invoke(
            InvokeType.Storage,
            dict,
            {
                "opt": "exist",
                "key": key,
            },
        ):
            return data["data"]

        raise StorageInvocationError("no data found")

工具返回

Dify 工具插件,支持返回各种不同类型的消息(图片 URL、链接、文本、文件、JSON),支持创建变量和流式变量消息。

在前面的例子中,我们返回了 JSON 消息,JSON 消息的本质就是返回一个 json_object 对象,然后其中的内容是我们自定义的 json 字段。

# 返回json消息
yield self.create_json_message({
    "current_time": current_time.strftime(time_format),
    "time_zone": str(current_time.astimezone().tzinfo),
    "timestamp": current_time.timestamp()
})

# json消息创建的实现方法
def create_json_message(self, json: Mapping | list) -> T:
    return self.response_type(
        type=InvokeMessage.MessageType.JSON,
        message=InvokeMessage.JsonMessage(json_object=json),
    )

class JsonMessage(BaseModel):
    json_object: Mapping | list

    def to_dict(self):
        return {"json_object": self.json_object}       

其它的消息支持:文本、JSON、图片、链接、BLOB、变量、流式变量、日志、知识库。

def create_text_message(self, text: str) -> T:
    return self.response_type(
        type=InvokeMessage.MessageType.TEXT,
        message=InvokeMessage.TextMessage(text=text),
    )

def create_json_message(self, json: Mapping | list) -> T:
    return self.response_type(
        type=InvokeMessage.MessageType.JSON,
        message=InvokeMessage.JsonMessage(json_object=json),
    )

def create_image_message(self, image_url: str) -> T:
    """
    create an image message

    :param image: the url of the image
    :return: the image message
    """
    return self.response_type(
        type=InvokeMessage.MessageType.IMAGE,
        message=InvokeMessage.TextMessage(text=image_url),
    )

def create_link_message(self, link: str) -> T:
    """
    create a link message

    :param link: the url of the link
    :return: the link message
    """
    return self.response_type(
        type=InvokeMessage.MessageType.LINK,
        message=InvokeMessage.TextMessage(text=link),
    )

def create_blob_message(self, blob: bytes, meta: dict | None = None) -> T:
    """
    create a blob message

    :param blob: the blob
    :return: the blob message
    """
    return self.response_type(
        type=InvokeMessage.MessageType.BLOB,
        message=InvokeMessage.BlobMessage(blob=blob),
        meta=meta,
    )

def create_variable_message(self, variable_name: str, variable_value: Any) -> T:
    """
    create a variable message

    :param variable_name: the name of the variable
    :param variable_value: the value of the variable
    :return: the variable message
    """
    return self.response_type(
        type=InvokeMessage.MessageType.VARIABLE,
        message=InvokeMessage.VariableMessage(variable_name=variable_name, variable_value=variable_value),
    )

def create_stream_variable_message(self, variable_name: str, variable_value: str) -> T:
    """
    create a variable message that will be streamed to the frontend

    NOTE: variable value should be a string, only string is streaming supported now

    :param variable_name: the name of the variable
    :param variable_value: the value of the variable
    :return: the variable message
    """
    return self.response_type(
        type=InvokeMessage.MessageType.VARIABLE,
        message=InvokeMessage.VariableMessage(
            variable_name=variable_name,
            variable_value=variable_value,
            stream=True,
        ),
    )

def create_log_message(
    self,
    label: str,
    data: Mapping[str, Any],
    status: InvokeMessage.LogMessage.LogStatus = InvokeMessage.LogMessage.LogStatus.SUCCESS,
    parent: T | None = None,
    metadata: Mapping[LogMetadata, Any] | None = None,
) -> T:
    """
    create a log message with status "start"
    """
    return self.response_type(
        type=InvokeMessage.MessageType.LOG,
        message=InvokeMessage.LogMessage(
            label=label,
            data=data,
            status=status,
            parent_id=parent.message.id
            if parent and isinstance(parent.message, InvokeMessage.LogMessage)
            else None,
            metadata=metadata,
        ),
    )

def create_retriever_resource_message(
    self,
    retriever_resources: list[InvokeMessage.RetrieverResourceMessage.RetrieverResource],
    context: str,
) -> T:
    """
    create a retriever resource message
    """
    return self.response_type(
        type=InvokeMessage.MessageType.RETRIEVER_RESOURCES,
        message=InvokeMessage.RetrieverResourceMessage(
            retriever_resources=retriever_resources,
            context=context,
        ),
    )

def finish_log_message(
    self,
    log: T,
    status: InvokeMessage.LogMessage.LogStatus = InvokeMessage.LogMessage.LogStatus.SUCCESS,
    error: str | None = None,
    data: Mapping[str, Any] | None = None,
    metadata: Mapping[LogMetadata, Any] | None = None,
) -> T:
    """
    mark log as finished
    """
    assert isinstance(log.message, InvokeMessage.LogMessage)
    return self.response_type(
        type=InvokeMessage.MessageType.LOG,
        message=InvokeMessage.LogMessage(
            id=log.message.id,
            label=log.message.label,
            data=data or log.message.data,
            status=status,
            parent_id=log.message.parent_id,
            error=error,
            metadata=metadata or log.message.metadata,
        ),
    )

自定义输出变量:

  • 先修改 yaml 配置,告知返回哪些自定义变量。name、age、profile
  • 修改 py 代码,创建自定义变量消息。
identity:
  author: example_author
  name: example_tool
  label:
    en_US: Example Tool
    zh_Hans: 示例工具
    ja_JP: ツール例
    pt_BR: Ferramenta de exemplo
description:
  human:
    en_US: A simple tool that returns a name
    zh_Hans: 返回名称的简单工具
    ja_JP: 名前を返す簡単なツール
    pt_BR: Uma ferramenta simples que retorna um nome
  llm: A simple tool that returns a name variable
output_schema:
  type: object
  properties:
    name:
      type: string
      description: "The name returned by the tool"
    age:
      type: integer
      description: "The age returned by the tool"
    profile:
      type: object
      properties:
        interests:
          type: array
          items:
            type: string
        location:
          type: string

Python 代码如下:

def run(self, inputs):
    # Generate complex structured data
    user_data = {
        "name": "Bob",
        "age": 30,
        "profile": {
            "interests": ["coding", "reading", "hiking"],
            "location": "San Francisco"
        }
    }
    
    # Return individual variables
    self.create_variable_message("name", user_data["name"])
    self.create_variable_message("age", user_data["age"])
    self.create_variable_message("profile", user_data["profile"])
    
    # Also return a text message for display
    return self.create_text_message(f"User {user_data['name']} processed successfully")

README 多语言支持

Dify 插件支持 README 的多语言,默认情况下,README 对应的是英语,可以扩展其它语言。

扩展的 README 语言的目录结构如下:【在 readme 目录下创建日语、葡萄牙语、简体中文对应的 README 文件】

...
├── main.py
├── manifest.yaml
├── readme
│   ├── README_ja_JP.md
│   ├── README_pt_BR.md
│   └── README_zh_Hans.md
├── README.md
...

输出插件日志

在插件开发及运行过程中,支持输出日志到远程调试时的标准输出以及插件守护进程的容器日志(仅社区版)中。直接使用插件 SDK 实现的一个适用于 Python 标准库的 logging的处理器。

from collections.abc import Generator
from typing import Any
from dify_plugin import Tool
from dify_plugin.entities.tool import ToolInvokeMessage


# 导入 logging 和自定义处理器
import logging
from dify_plugin.config.logger_format import plugin_logger_handler

# 使用自定义处理器设置日志
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
logger.addHandler(plugin_logger_handler)


class LoggerDemoTool(Tool):
    def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:

        # 以不同级别输出日志信息
        logger.info("This is an INFO log message.")
        logger.warning("This is a WARNING log message.")
        logger.error("This is an ERROR log message.")

        yield self.create_text_message("Hello, Dify!")

进阶指南 2 - 反向调用

Dify 插件正常情况下是被调用方,但是也可以反向调用,即插件调用 Dify 主平台上的服务

  • App(访问应用数据)
  • Model(调用平台内的模型)
  • Tool(调用平台内的其它工具插件)
  • Node(调用工作流应用中的节点)

本质上来说,就是提供了一套 Python 的 API,能够直接调用到 Dify 中的服务

App

插件支持调用 Dify 中的应用服务。使用 self.session.app调用。

包括三种接口类型:

  • chat 接口:用于 Chatbot、Agent、Chatflow 应用
  • workflow 接口:用于 Workflow 应用
  • completion 接口:用于文本生成应用

Model

插件反向调用 Dify 上的 Model,利用 self.session.model来调用模型。包括调用 LLM、Summary、TextEmbedding、Rerank、TTS、Speech2Text 和 Moderation 模型的具体方法。

注意:如果调用的 LLM 不支持工具调用,那么传递的工具调用参数就会失效。

下面是直接调用 llm 的例子:

from collections.abc import Generator
from typing import Any

from dify_plugin import Tool
from dify_plugin.entities.model.llm import LLMModelConfig
from dify_plugin.entities.tool import ToolInvokeMessage
from dify_plugin.entities.model.message import SystemPromptMessage, UserPromptMessage

class LLMTool(Tool):
    def _invoke(self, tool_parameters: dict[str, Any]) -> Generator[ToolInvokeMessage]:
        response = self.session.model.llm.invoke(
            model_config=LLMModelConfig(
                provider='openai',
                model='gpt-4o-mini',
                mode='chat',
                completion_params={}
            ),
            prompt_messages=[
                SystemPromptMessage(
                    content='you are a helpful assistant'
                ),
                UserPromptMessage(
                    content=tool_parameters.get('query')
                )
            ],
            stream=True
        )

        for chunk in response:
            if chunk.delta.message:
                assert isinstance(chunk.delta.message.content, str)
                yield self.create_text_message(text=chunk.delta.message.content)

Tool

插件可以调用 Dify 中的其他插件。比如某个场景下,A 工具调用后,还需要调用 B 工具来完善。利用 self.session.tool方法即可解决。支持调用 buildin 工具(内置的工具)、api(自定义工具)、workflow(把工作流当做工具)。

Node

插件可以调用 Dify 中的工作流中的节点(就两个)。Workflow 中的 ParameterExtractorQuestionClassifier 节点封装了复杂的提示词和代码逻辑,能够通过 LLM 完成难以用硬编码解决的任务。使用 self.session.workflow_node来调用。

参考

1.多语言 README - Dify Docs

相关博文

1.第 1 篇 Linux 下部署 Dify 1.7.1
2.第 2 篇 Dify 插件离线安装
3.第 3 篇 Dify 入门示例 - 聊天助手
4.第 4 篇 Dify 示例:数据库执行Agent
5.第 5 篇 Dify 报错解决:The length of output variable xxx must be less than 30 elements
6.第 6 篇 Dify 接入大模型并使用
7.第 7 篇 Dify 应用介绍 + 聊天助手&Agent 应用关键点说明
8.第 8 篇 RAG 必知概念及原理详解
9.第 9 篇 Dify 知识库原理详解
10.第 10 篇 Dify 知识库手把手案例
11.第 11 篇 Dify 入坑记录:插件安装报错,[ERROR]init environment failed_ failed to install dependencies
12.第 12 篇 Dify 入坑记录:database插件连接未关闭
13.第 13 篇 Dify 工作流 详解
14.第 14 篇 Dify 知识库检索,如何返回完整文档?
15.第 15 篇 Dify 应用发布与被集成的7种方式
16.第 16 篇 Dify 记忆 & mem0 插件实战
17.第 17 篇 Dify 1.7.1 → 1.11.1 完整升级指南:避开那些必踩的坑

18.第 18 篇 Dify 工具调用报错:tool invoke error validation error for VariableMessage_n Value error, Only basic
19.第 19 篇 手把手教你开发 Dify 插件:从零实现一个时间工具
20.第 20 篇 Dify插件开发进阶:常规开发 & 反向调用

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐