AI代码开发宝库系列:Dify本地化部署和应用
Dify平台以"开源+低代码+高灵活"为核心,通过Workflow和Chatflow两种编排方式,结合RAG知识库与外部工具集成,大大降低了LLM应用的开发门槛。无论是简单的联网搜索、文生图任务,还是复杂的智能客服、文档分析系统,都可通过拖拽节点、配置参数快速实现。同时,Dify支持本地化部署和API集成,兼顾了企业数据安全与外部系统对接需求,适用于办公自动化、智能客服、垂直领域问答等多种场景。赶
Dify本地化部署和应用详解:从入门到精通的AI应用开发指南

在AI技术飞速发展的今天,如何快速构建自己的AI应用成为了许多开发者和企业关注的焦点。今天我要给大家介绍一个强大的开源平台——Dify,它能够让你无需复杂编码就能轻松构建生成式AI原生应用,而且支持本地化部署,保障数据安全!
一、Dify平台是什么?为什么选择它?
Dify是一个开源的LLM(大语言模型)应用开发平台,它的定位非常明确:让开发者无需复杂编码即可构建生成式AI原生应用,上手难度甚至低于LangChain。它支持Agent构建、AI工作流编排、RAG检索、模型管理,兼容主流大模型(OpenAI、Qwen、Anthropic等)。
Dify的五大应用类型
Dify支持五种不同类型的应用,满足各种场景需求:
-
聊天助手:对话式交互助手,适合构建客服机器人
-
文本生成应用:专注文本创作、分类、翻译等单任务
-
Agent:可分解任务、调用工具的智能助手
-
Chatflow(对话流):多轮对话场景,支持上下文记忆与动态编排
-
Workflow(工作流):自动化、批处理类单轮任务,单向生成结果
Chatflow vs Workflow:如何选择?
| 维度 | Chatflow(对话流) | Workflow(工作流) |
|---|---|---|
| 核心场景 | 多轮交互式对话(如智能客服、AI助教) | 自动化批处理(如文档分析、联网搜索) |
| 交互特性 | 支持上下文记忆,动态响应用户输入 | 单轮输入输出,无需实时交互 |
| 专属功能 | 含Answer节点,支持流式输出、用户引导建议 | 侧重节点串联,支持批量数据处理 |
| 适用场景 | 智能客服、语义搜索、任务引导类应用 | 文档解析、数据处理、自动化生成类任务 |
二、Dify本地化部署:三步搞定私有化部署
对于企业用户来说,数据安全是最重要的考量因素之一。Dify支持本地化部署,让你的数据完全掌握在自己手中。
推荐部署方式:Docker Compose
前提条件:安装Docker、Docker Compose与Git。
核心步骤:
-
克隆代码仓库:
git clone https://github.com/langgenius/dify.git -
配置环境变量:进入
dify/docker目录,复制.env.example为.env,配置访问地址(APP_URL)、数据库信息、模型API Key -
启动服务:
docker compose up -d(后台运行) -
初始化访问:首次访问
http://你的IP地址/install设置管理员账户,后续通过http://你的IP地址登录
维护操作:更新用git pull + docker compose pull,重启服务用docker compose up -d。
三、四大实战案例:从简单到复杂的应用构建
案例1:LLM联网搜索(Workflow)
目标:用户输入问题,提取关键字后通过Tavily Search检索,最终总结结果。
核心流程:
-
开始节点:接收用户问题(如"黄金价格和哪些因素有关")
-
LLM节点1:提取关键字(如"黄金价格 影响因素")
-
Tavily Search节点:用关键字检索网页内容(需提前配置API Key并授权)
-
LLM节点2:总结检索结果,输出结构化回答
案例2:古诗词文生图(Workflow)
目标:输入古诗,生成对应风格图像。
核心流程:
-
开始节点:接收古诗词(如"离离原上草")
-
LLM节点1:根据诗句描绘画面(如"辽阔草原,绿草如茵,微风吹拂形成绿浪")
-
LLM节点2:将画面描述译为英文,添加风格前缀(如"ancient china")
-
Flux工具节点:调用SiliconFlow提供的Flux模型,按英文提示词生成图像
-
结束节点:返回图像URL
案例3:智能客服(Chatflow)
目标:按"营销咨询/投诉处理/其他"分类响应,自动解答证券相关问题或处理登录故障等投诉。
核心流程:
-
开始节点:接收用户查询(如"什么是竞价盘?""我登录失败了")
-
问题分类器(LLM):将问题分为三类
-
分支处理:根据分类调用不同知识库或处理逻辑
-
共情回复:提供解决方案并给出建议
案例4:智能文档分析助手(Workflow+MinerU)
目标:用户上传含图表、公式的PDF(如科研论文),可针对内容提问。
核心流程:
-
开始节点:接收上传的PDF文件和用户问题
-
MinerU插件节点:解析PDF内容(支持复杂排版、图表提取)
-
LLM节点:基于解析后的文本,回答用户问题
四、Dify API集成:让应用开发更简单
Dify API调用(Python)
Dify为每种应用类型提供了专属的API端点:
-
聊天应用API(/chat-messages):支持多轮对话,通过conversation_id维持上下文
-
完成应用API(/completion-messages):适用于单轮文本生成,不保留对话状态
-
工作流应用API(/workflows/run):触发已发布的工作流,输入参数通过inputs传递
我们还开发了一个智能客户端,可以自动检测应用类型并选择合适的端点:
# 创建客户端 client = DifyAgentClient(BASE_URL, API_KEY) # 智能调用(自动检测应用类型) result = client.chat_completion( user_input="你的问题", user_id="用户ID", stream=False, app_type="auto" # 自动检测 )
五、Dify vs Coze:哪个更适合你?
| 对比维度 | Dify | Coze |
|---|---|---|
| 部署方式 | 支持本地化部署 | 云端为主,私有化部署较新 |
| 应用编排 | Chatflow + Workflow双模式 | 单一流程编排 |
| 知识库功能 | 支持RAG检索增强 | 支持知识库管理 |
| 插件生态 | HTTP请求自定义插件 | 丰富的官方插件 |
| API集成 | 多端点细粒度控制 | 统一API接口 |
| 前端界面 | 功能全面但稍复杂 | 界面简洁易用 |
六、Dify使用最佳实践
知识库优化技巧
-
文档预处理:上传文档前合理分段(如用###分隔)
-
检索策略:选择混合检索提升准确性
-
参数调优:根据内容类型调整Top K值(问答类设3-5)
工具配置要点
-
API Key管理:第三方工具需提前获取API Key并完成授权
-
网络连通性:确保Dify服务能访问外部工具API
-
错误处理:配置合理的超时和重试机制
调试技巧
-
分步试运行:工作流/对话流需分步试运行,查看节点输出日志
-
API调试:调用时打印请求和响应详情,便于问题排查
-
Key保护:API调用时避免明文暴露Key
七、未来应用场景展望
企业级应用
-
智能客服:自动回答产品问题、处理工单
-
知识管理:企业文档智能检索、知识推送
-
办公自动化:会议纪要生成、日报自动撰写
内容创作
-
社交媒体运营:自动发布、内容优化
-
电商营销:商品描述生成、评论分析
-
教育培训:个性化学习计划、答疑解惑
生活助手
-
个人助理:日程管理、信息查询
-
健康管理:饮食建议、运动计划
-
旅行规划:路线推荐、景点介绍
八、完整代码示例
以下是一个完整的Dify Agent客户端实现,支持聊天、完成和工作流三种应用类型:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Dify Agent 客户端
用于调用Dify平台的Agent API
支持聊天、完成和工作流应用类型
"""
import requests
import json
import time
from typing import Dict, Any, Optional
class DifyAgentClient:
"""Dify Agent API 客户端类"""
def __init__(self, base_url: str, api_key: str):
"""
初始化Dify Agent客户端
Args:
base_url (str): Dify API的基础URL
api_key (str): 应用的API密钥
"""
self.base_url = base_url.rstrip('/')
self.api_key = api_key
self.headers = {
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
'Accept': 'application/json'
}
def chat_completion(self,
user_input: str,
user_id: str = "default_user",
conversation_id: Optional[str] = None,
stream: bool = False,
app_type: str = "auto") -> Dict[str, Any]:
"""
调用Dify Agent进行对话
Args:
user_input (str): 用户输入的消息
user_id (str): 用户ID,默认为"default_user"
conversation_id (str, optional): 会话ID,用于维持对话上下文
stream (bool): 是否使用流式响应,默认为False
app_type (str): 应用类型,"chat"、"completion"、"workflow"或"auto"(自动检测)
Returns:
Dict[str, Any]: API响应结果
"""
if app_type == "auto":
# 先尝试chat端点,如果失败则尝试completion端点,最后尝试workflow端点
result = self._try_chat_endpoint(user_input, user_id, conversation_id, stream)
if result.get("error") and "not_chat_app" in str(result.get("message", "")):
print("检测到非聊天应用,切换到completion端点...")
result = self._try_completion_endpoint(user_input, user_id, stream)
if result.get("error") and "app_unavailable" in str(result.get("message", "")):
print("检测到非完成应用,切换到工作流端点...")
return self._try_workflow_endpoint(user_input, user_id, stream)
return result
elif app_type == "chat":
return self._try_chat_endpoint(user_input, user_id, conversation_id, stream)
elif app_type == "completion":
return self._try_completion_endpoint(user_input, user_id, stream)
elif app_type == "workflow":
return self._try_workflow_endpoint(user_input, user_id, stream)
else:
return {
"error": True,
"message": f"不支持的应用类型: {app_type}"
}
def _try_chat_endpoint(self, user_input: str, user_id: str, conversation_id: Optional[str], stream: bool) -> Dict[str, Any]:
"""尝试使用chat端点"""
url = f"{self.base_url}/chat-messages"
# 构建基础payload
payload = {
"inputs": {},
"query": user_input,
"response_mode": "streaming" if stream else "blocking",
"user": user_id
}
# 只有当conversation_id不为None且不为空字符串时才添加
if conversation_id:
payload["conversation_id"] = conversation_id
try:
if stream:
return self._handle_streaming_response(url, payload)
else:
return self._handle_blocking_response(url, payload)
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"请求失败: {str(e)}"
}
except Exception as e:
return {
"error": True,
"message": f"未知错误: {str(e)}"
}
def _try_completion_endpoint(self, user_input: str, user_id: str, stream: bool) -> Dict[str, Any]:
"""尝试使用completion端点"""
url = f"{self.base_url}/completion-messages"
# 根据官方文档,completion端点的正确格式
payload = {
"inputs": {}, # 空对象,不是包含query的对象
"response_mode": "streaming" if stream else "blocking",
"user": user_id
}
try:
if stream:
return self._handle_streaming_response(url, payload)
else:
return self._handle_blocking_response(url, payload)
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"请求失败: {str(e)}"
}
except Exception as e:
return {
"error": True,
"message": f"未知错误: {str(e)}"
}
def _try_workflow_endpoint(self, user_input: str, user_id: str, stream: bool) -> Dict[str, Any]:
"""尝试使用workflow端点"""
url = f"{self.base_url}/workflows/run"
# 工作流端点的payload格式
payload = {
"inputs": {"query": user_input}, # 工作流通常需要在inputs中传递参数
"response_mode": "streaming" if stream else "blocking",
"user": user_id
}
try:
if stream:
return self._handle_workflow_streaming_response(url, payload)
else:
return self._handle_workflow_blocking_response(url, payload)
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"请求失败: {str(e)}"
}
except Exception as e:
return {
"error": True,
"message": f"未知错误: {str(e)}"
}
def run_workflow(self,
inputs: Dict[str, Any],
user_id: str = "default_user",
stream: bool = False) -> Dict[str, Any]:
"""
直接运行工作流
Args:
inputs (dict): 工作流输入参数
user_id (str): 用户ID,默认为"default_user"
stream (bool): 是否使用流式响应,默认为False
Returns:
Dict[str, Any]: API响应结果
"""
url = f"{self.base_url}/workflows/run"
payload = {
"inputs": inputs,
"response_mode": "streaming" if stream else "blocking",
"user": user_id
}
try:
if stream:
return self._handle_workflow_streaming_response(url, payload)
else:
return self._handle_workflow_blocking_response(url, payload)
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"请求失败: {str(e)}"
}
except Exception as e:
return {
"error": True,
"message": f"未知错误: {str(e)}"
}
def completion_message(self,
user_input: str,
user_id: str = "default_user",
stream: bool = False,
inputs: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
直接调用completion端点,支持自定义inputs
Args:
user_input (str): 用户输入的消息
user_id (str): 用户ID,默认为"default_user"
stream (bool): 是否使用流式响应,默认为False
inputs (dict, optional): 自定义输入参数
Returns:
Dict[str, Any]: API响应结果
"""
url = f"{self.base_url}/completion-messages"
# 如果没有提供inputs,尝试不同的格式
if inputs is None:
# 尝试多种可能的inputs格式
possible_inputs = [
{}, # 空对象(官方文档格式)
{"text": user_input}, # 文本格式
{"query": user_input}, # 查询格式
{"input": user_input}, # 输入格式
{"prompt": user_input} # 提示格式
]
else:
possible_inputs = [inputs]
for input_format in possible_inputs:
payload = {
"inputs": input_format,
"response_mode": "streaming" if stream else "blocking",
"user": user_id
}
try:
if stream:
result = self._handle_streaming_response(url, payload)
else:
result = self._handle_blocking_response(url, payload)
# 如果成功,返回结果
if not result.get("error"):
return result
# 如果是app_unavailable错误,尝试下一种格式
if "app_unavailable" in str(result.get("message", "")):
continue
else:
# 其他错误直接返回
return result
except Exception as e:
continue
# 所有格式都失败了
return {
"error": True,
"message": "所有输入格式都失败了,请检查应用配置和API密钥"
}
def _handle_blocking_response(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
"""处理阻塞式响应"""
try:
response = requests.post(url, headers=self.headers, json=payload, timeout=60)
# 打印调试信息
print(f"请求URL: {url}")
print(f"请求头: {self.headers}")
print(f"请求体: {json.dumps(payload, ensure_ascii=False, indent=2)}")
print(f"响应状态码: {response.status_code}")
if response.status_code != 200:
print(f"响应内容: {response.text}")
return {
"error": True,
"message": f"HTTP {response.status_code}: {response.text}"
}
result = response.json()
return {
"error": False,
"data": result,
"answer": result.get("answer", ""),
"conversation_id": result.get("conversation_id", ""),
"message_id": result.get("message_id", "")
}
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"网络请求异常: {str(e)}"
}
except json.JSONDecodeError as e:
return {
"error": True,
"message": f"JSON解析错误: {str(e)}"
}
def _handle_workflow_blocking_response(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
"""处理工作流阻塞式响应"""
try:
response = requests.post(url, headers=self.headers, json=payload, timeout=60)
# 打印调试信息
print(f"请求URL: {url}")
print(f"请求头: {self.headers}")
print(f"请求体: {json.dumps(payload, ensure_ascii=False, indent=2)}")
print(f"响应状态码: {response.status_code}")
if response.status_code != 200:
print(f"响应内容: {response.text}")
return {
"error": True,
"message": f"HTTP {response.status_code}: {response.text}"
}
result = response.json()
# 工作流响应格式可能不同
outputs = result.get("data", {}).get("outputs", {})
# 尝试从outputs中提取答案
answer = ""
if isinstance(outputs, dict):
# 常见的输出字段名
for key in ["answer", "result", "output", "text", "response"]:
if key in outputs:
answer = str(outputs[key])
break
# 如果没有找到,使用第一个值
if not answer and outputs:
answer = str(list(outputs.values())[0])
return {
"error": False,
"data": result,
"answer": answer,
"outputs": outputs,
"workflow_run_id": result.get("workflow_run_id", ""),
"task_id": result.get("task_id", "")
}
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"网络请求异常: {str(e)}"
}
except json.JSONDecodeError as e:
return {
"error": True,
"message": f"JSON解析错误: {str(e)}"
}
def _handle_streaming_response(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
"""处理流式响应"""
try:
response = requests.post(url, headers=self.headers, json=payload, stream=True, timeout=60)
# 打印调试信息
print(f"请求URL: {url}")
print(f"请求头: {self.headers}")
print(f"请求体: {json.dumps(payload, ensure_ascii=False, indent=2)}")
print(f"响应状态码: {response.status_code}")
if response.status_code != 200:
print(f"响应内容: {response.text}")
return {
"error": True,
"message": f"HTTP {response.status_code}: {response.text}"
}
full_answer = ""
conversation_id = ""
message_id = ""
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
try:
data = json.loads(line[6:])
if data.get('event') == 'message':
full_answer += data.get('answer', '')
elif data.get('event') == 'message_end':
conversation_id = data.get('conversation_id', '')
message_id = data.get('id', '')
except json.JSONDecodeError:
continue
return {
"error": False,
"answer": full_answer,
"conversation_id": conversation_id,
"message_id": message_id
}
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"网络请求异常: {str(e)}"
}
def _handle_workflow_streaming_response(self, url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
"""处理工作流流式响应"""
try:
response = requests.post(url, headers=self.headers, json=payload, stream=True, timeout=60)
# 打印调试信息
print(f"请求URL: {url}")
print(f"请求头: {self.headers}")
print(f"请求体: {json.dumps(payload, ensure_ascii=False, indent=2)}")
print(f"响应状态码: {response.status_code}")
if response.status_code != 200:
print(f"响应内容: {response.text}")
return {
"error": True,
"message": f"HTTP {response.status_code}: {response.text}"
}
full_answer = ""
workflow_run_id = ""
task_id = ""
final_outputs = {}
for line in response.iter_lines():
if line:
line = line.decode('utf-8')
if line.startswith('data: '):
try:
data = json.loads(line[6:])
event = data.get('event', '')
if event == 'workflow_started':
workflow_run_id = data.get('workflow_run_id', '')
task_id = data.get('task_id', '')
elif event == 'workflow_finished':
final_outputs = data.get('data', {}).get('outputs', {})
# 尝试从outputs中提取答案
if isinstance(final_outputs, dict):
for key in ["answer", "result", "output", "text", "response"]:
if key in final_outputs:
full_answer = str(final_outputs[key])
break
if not full_answer and final_outputs:
full_answer = str(list(final_outputs.values())[0])
elif event == 'node_finished':
# 节点完成,可能包含中间结果
node_outputs = data.get('data', {}).get('outputs', {})
if isinstance(node_outputs, dict):
for key in ["answer", "result", "output", "text", "response"]:
if key in node_outputs:
full_answer += str(node_outputs[key])
break
except json.JSONDecodeError:
continue
return {
"error": False,
"answer": full_answer,
"outputs": final_outputs,
"workflow_run_id": workflow_run_id,
"task_id": task_id
}
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"网络请求异常: {str(e)}"
}
def get_conversation_messages(self, conversation_id: str, user_id: str = "default_user") -> Dict[str, Any]:
"""
获取会话历史消息
Args:
conversation_id (str): 会话ID
user_id (str): 用户ID
Returns:
Dict[str, Any]: 会话消息列表
"""
url = f"{self.base_url}/messages"
params = {
"conversation_id": conversation_id,
"user": user_id
}
try:
response = requests.get(url, headers=self.headers, params=params, timeout=30)
response.raise_for_status()
result = response.json()
return {
"error": False,
"data": result
}
except requests.exceptions.RequestException as e:
return {
"error": True,
"message": f"获取会话消息失败: {str(e)}"
}
# 使用示例
if __name__ == "__main__":
# 配置你的Dify服务地址和API Key
BASE_URL = "你的dify_base_url"
API_KEY = "你的dify_api_key"
# 创建客户端
client = DifyAgentClient(BASE_URL, API_KEY)
# 智能调用示例(自动检测应用类型)
result = client.chat_completion(
user_input="你好,介绍一下你自己",
user_id="test_user",
stream=False,
app_type="auto"
)
if result.get("error"):
print(f"调用失败: {result.get('message')}")
else:
print(f"回复: {result.get('answer')}")
结语
Dify平台以"开源+低代码+高灵活"为核心,通过Workflow和Chatflow两种编排方式,结合RAG知识库与外部工具集成,大大降低了LLM应用的开发门槛。无论是简单的联网搜索、文生图任务,还是复杂的智能客服、文档分析系统,都可通过拖拽节点、配置参数快速实现。
同时,Dify支持本地化部署和API集成,兼顾了企业数据安全与外部系统对接需求,适用于办公自动化、智能客服、垂直领域问答等多种场景。
赶紧试试Dify平台吧,让你的AI创意快速变成现实!
更多推荐


所有评论(0)