crewai进一步功能的实现
齁齁齁齁齁齁齁齁齁齁齁齁齁
crewai进一步功能的实现
Memory功能部署
crewAI Memory 开启功能需要crew里进行定义:
Crew(memory=True)
这个时候会开启一整套记忆系统
- Short-term Memory: 记住当前执行过程中的上下文。
- Long-term Memory: 只要数据库存在,甚至可以记住几天前运行的结果(基于 Vector Store)。
- Entity Memory: 记住特定实体(如“项目A”的主题是什么)。
- Context Memory:它和短期记忆很像,但关注的是:语境一致性,而不是内容本身
长期记忆通过向量数据库(Vector Store)以文本 → embedding → 向量的形式进行存储
这套记忆系统中,有entity memory和long-term memory是需要进行embedder设置的,其他都可以通过只开启memory来解决
衍生问题
如果你需要开启memory功能的话,你就需要使用内嵌的embedder功能
对于embedding provider,crewai的默认行为定义为:
embedder = {
"provider": "openai",
"config": {
"model": "text-embedding-3-small"
}
}
当然你也可以选择其他发行商的内嵌ai,但是问题也就出在这。我远控的机子是无法访问外网的,所以这里理所当然我第一时间想到的是使用huggingface的本地embedding库。可是只要你是使用他给的内嵌方法填入的话,crewai都会默认你使用的是线上的transformer库,所以会想你索要api_key,于是我们又想到了用猴子补丁来试一试能不能绕过这个判断。
我们设置了一个假的环境变量key,希望他在判断之后再去使用本地的库模型。但是依旧行不通,它认死了必须要走线上的链接方式。后面我下载了proxychain去访问外网,然后去调用线上的key看看会不会好一点,又由于找的节点都是一些免费的节点,所以链接不是很稳定,且llm的模型又要连接到上交的大模型那边,我怀疑他们会封ip,所以实际下来效果并不好,可能才跑一会链接就会断开来。
解决方案
最后在不断的尝试之下,我发现crewai的memory其实并不是只能通过
memory=ture
来实现的,我们也可以通过
memory=True,
memory_config={
"use_short_term": True,
"use_long_term": True,
"use_entity": False,
}
这样来单独控制每个功能的开关,这也就意味着我们能对entity_memory使用的embedder进行自定义设置,所以我们自己封装了一个embedder,放在本地,然后再开启entity_memory的时候对其进行调用,实测下来终于没有问题了,且本地的数据会放在同一目录下的另一个文件下,方便ai后续的阅读记忆,修改代码如下
import os
import numpy as np
from pathlib import Path
from typing import Any, Optional
from uuid import uuid4
import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from crewai import Agent, Task, Crew, Process, LLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.paths import db_storage_path
# =====================================================
# 1. 基础配置
# =====================================================
EMBEDDING_MODEL = os.getenv(
"LOCAL_EMBEDDING_MODEL",
"/path/to/local/sentence-transformers/all-MiniLM-L6-v2"
)
# CrewAI Memory 存储目录
os.environ["CREWAI_STORAGE_DIR"] = "./crewai_memory_data"
Path(os.environ["CREWAI_STORAGE_DIR"]).expanduser().mkdir(parents=True, exist_ok=True)
def storage_snapshot(title: str):
base = Path(os.environ["CREWAI_STORAGE_DIR"]).resolve()
print(f"\n[{title}] CREWAI_STORAGE_DIR={base}")
if not base.exists():
print("- (目录尚未创建)")
return
items = list(base.iterdir())
print(f"- 共 {len(items)} 项")
# =====================================================
# 2. 自定义本地 Chroma Storage
# =====================================================
class LocalChromaStorage(RAGStorage):
def __init__(self, type: str, model_path: str, **kwargs):
self._model_path = model_path
self._collection = None
super().__init__(type=type, **kwargs)
def _initialize_app(self):
from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction
base_path = Path(os.environ["CREWAI_STORAGE_DIR"]).resolve()
persist_path = str((base_path / self.type).resolve())
client = chromadb.PersistentClient(
path=persist_path,
settings=Settings(allow_reset=True),
)
embedding_fn = SentenceTransformerEmbeddingFunction(
model_name=self._model_path
)
self._collection = client.get_or_create_collection(
name="memory",
embedding_function=embedding_fn,
metadata={"hnsw:space": "cosine"},
)
self._client = self._collection
def save(self, data: Any, metadata: Optional[dict] = None, **kwargs):
if self._collection is None:
self._initialize_app()
ids = [str(uuid4())]
documents = [str(data)]
metadatas = [metadata or {}]
self._collection.add(
ids=ids,
documents=documents,
metadatas=metadatas
)
def search(self, query: str, limit: int = 3, **kwargs):
if self._collection is None:
self._initialize_app()
results = self._collection.query(
query_texts=[query],
n_results=limit,
include=["documents", "distances"],
)
docs = results["documents"][0]
dists = results["distances"][0]
return [
{"content": d, "score": 1 - dist}
for d, dist in zip(docs, dists)
]
# =====================================================
# 3. LLM 配置
# =====================================================
LLM_API_KEY = os.getenv("LLM_API_KEY", "YOUR_API_KEY")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://example-llm-api/v1")
LLM_MODEL = os.getenv("LLM_MODEL", "your-llm-model")
llm = LLM(
model=f"openai/{LLM_MODEL}",
base_url=LLM_BASE_URL,
api_key=LLM_API_KEY
)
# =====================================================
# 4. 本地 RAG 示例
# =====================================================
embed_model = SentenceTransformer(EMBEDDING_MODEL)
documents = [
"工业控制系统安全的核心是可用性。",
"冬季安全演练重点包括应急响应和日志分析。",
"PLC 固件漏洞是常见风险之一。",
]
doc_embeddings = embed_model.encode(documents)
def rag_search(query, top_k=2):
q_emb = embed_model.encode([query])
sims = cosine_similarity(q_emb, doc_embeddings)[0]
idx = sims.argsort()[-top_k:][::-1]
return [documents[i] for i in idx]
# =====================================================
# 5. Agent + Memory 测试
# =====================================================
agent = Agent(
role="记忆测试 Agent",
goal="基于资料回答问题并记住关键信息",
llm=llm,
memory=True,
verbose=True
)
query = "冬季安全演练的重点是什么?"
context = rag_search(query)
task_1 = Task(
description="根据资料回答问题并记住结论。\n" + "\n".join(context),
agent=agent
)
task_2 = Task(
description="不提供资料,直接回答:冬季安全演练的重点是什么?",
agent=agent
)
short_term = ShortTermMemory(
storage=LocalChromaStorage("short_term", EMBEDDING_MODEL)
)
entity_mem = EntityMemory(
storage=LocalChromaStorage("entities", EMBEDDING_MODEL)
)
crew = Crew(
agents=[agent],
tasks=[task_1, task_2],
process=Process.sequential,
short_term_memory=short_term,
entity_memory=entity_mem,
mem
主要就是通过自定义了一个 LocalChromaStorage来帮助我们在本地设置embedder
1) 为什么要写 LocalChromaStorage这个类?
CrewAI 的短期记忆和实体记忆本质都依赖一个“RAG 存储层”:
- save(…) :把文本(以及元数据)写到向量库
- search(query, …) :把 query 做 embedding,然后相似度检索出相关记忆条目
之前报错的根源就是: 默认 storage 在初始化 embedding function 时走了远程 provider(OpenAI 或 HuggingFace API),所以要求对应的环境变量 key 。
解决办法:把这层 storage 换成“完全本地”的实现。
2) init
- type :区分是 short_term 还是 entities ,用于落盘路径隔离。
- model_path :本地 SentenceTransformer 模型路径。
- self._collection = None :延迟初始化(第一次 save/search 才真正打开/创建 collection)。
- super().init(…) :让 CrewAI 的 RAGStorage 父类把必要字段初始化好(保持兼容)。
3) _initialize_app
- SentenceTransformerEmbeddingFunction :这是 Chroma 自带的 本地 embedding function ,直接加载 sentence-transformers 模型在本机算向量。
- base_path = Path(os.environ[“CREWAI_STORAGE_DIR”])… :明确落盘到你设置的目录里(不依赖外部默认规则)。
- persist_path = base_path / self.type / agents :每种 memory 一个目录,比如:
- …/short_term/…
- …/entities/…
- chromadb.PersistentClient(path=persist_path, …) :使用本地持久化 Chroma。
- get_or_create_collection(… embedding_function=embedding_fn …) :绑定本地 embedding function,这一步决定了后续 query/add 全都离线。
- self._client = self._collection :兼容 CrewAI 父类内部可能访问 _client 的地方。
4) save
- 把 CrewAI 传进来的各种结构(list/dict/其它)统一成 (text, metadata) 列表。
- 用 uuid4() 生成 ids。
- self._collection.add(…) :写入 Chroma,collection 会自动调用本地 embedding function 算向量并落盘。
5) search
- self._collection.query(query_texts=[query], …) :Chroma 会用本地 embedding function 对 query 编码后检索。
- 返回结构里我同时提供了:
- content :CrewAI 的 contextual memory 里会用 result[“content”] 拼上下文(你之前 KeyError 就是这里)。
- text :保留一个别名,避免不同版本/不同调用路径读取 text 时丢字段。
RAG功能部署
RAG功能也是基于在embedder的基础上,将我们的prompt或者是其他一些有价值的信息写入到本地文件中,在使用crewai的时候直接让ai利用argtools或者是我们自己封装的工具来进行文件的阅读,从而快速方便的获取重要的前置消息。
衍生问题
在实际部署中,我是先解决rag的部署才通过相同的办法去部署memory的功能的,所以这里部署的解决方案和memory功能的思路是相对一致的。略有不同的是,虽然rag依旧是建立在embedder模型的基础之上。但是相对于memory功能,他的embedder内嵌功能是更加灵活的,因为他不必要在明显crew的时候写入到config里,他可以自己进行自定义封装。
在部署中我发现如果在使用官方的文件读取工具,即readtxttools的时候,crewai也会强制要求你进行线上api的调用,所以这里我们继续使用basetools的方法来将本地文件通过embedder切片成相应的向量之中
解决方案
embedding的核心功能就是通过将文本转成向量的方式来存储,在寻找的时候通过对比querry和本地向量的相似度来寻找答案,所以这里我们可以自己封装一个建议的rag_tools
import os
import re
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.utils import embedding_functions
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type, List
# ==============================
# 1. 基础配置
# ==============================
# 本地 embedding 模型(HuggingFace 缓存路径)
EMBEDDING_MODEL_PATH = (
"/path/to/.cache/huggingface/hub/"
"models--sentence-transformers--all-MiniLM-L6-v2/"
"snapshots/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
)
# 本地文档路径
DOC_PATH = "/path/to/project/ICS_Security_Plan.txt"
# LLM 配置(示例占位)
LLM_API_KEY = "YOUR_API_KEY_HERE"
LLM_BASE_URL = "https://api.example.com/v1"
LLM_MODEL = "your-llm-model-name"
# Chroma 集合名
COLLECTION_NAME = "ics_rag_demo_collection"
# ==============================
# 2. 初始化 Embedding & 向量库
# ==============================
print("✅ [Init] 加载本地 embedding 模型...")
embedding_model = SentenceTransformer(EMBEDDING_MODEL_PATH)
embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
model_name=EMBEDDING_MODEL_PATH
)
print("✅ [Init] 初始化 Chroma 向量库...")
client = chromadb.Client()
# 若存在旧集合则删除
try:
client.delete_collection(COLLECTION_NAME)
except:
pass
collection = client.create_collection(
name=COLLECTION_NAME,
embedding_function=embedding_function
)
# ==============================
# 3. 文档语义切分
# ==============================
def semantic_split(text: str) -> List[str]:
"""
按文档结构进行语义切分(例如:按“第X周”)
"""
chunks = []
lines = text.split("\n")
current_chunk = []
for line in lines:
line = line.strip()
if not line:
continue
if line.startswith("【") and "周】" in line:
if current_chunk:
chunks.append("\n".join(current_chunk))
current_chunk = [line]
else:
current_chunk.append(line)
if current_chunk:
chunks.append("\n".join(current_chunk))
return chunks
print("✅ [Index] 读取文档并写入向量库...")
if os.path.exists(DOC_PATH):
with open(DOC_PATH, "r", encoding="utf-8") as f:
text = f.read()
chunks = semantic_split(text)
collection.add(
documents=chunks,
ids=[f"chunk-{i}" for i in range(len(chunks))]
)
print(f"✅ 已入库 {len(chunks)} 个语义片段")
else:
raise FileNotFoundError("文档不存在")
# ==============================
# 4. 自定义本地 RAG Tool
# ==============================
class SearchInput(BaseModel):
query: str = Field(..., description="检索问题")
class LocalRAGTool(BaseTool):
name: str = "Local ICS Plan Search Tool"
description: str = "用于从本地工控安全学习计划中检索相关内容"
args_schema: Type[BaseModel] = SearchInput
def _run(self, query: str) -> str:
results = collection.query(
query_texts=[query],
n_results=5
)
docs = results.get("documents", [[]])[0]
ids = results.get("ids", [[]])[0]
if not docs:
return "未找到相关内容"
# 按 chunk 顺序重排
combined = list(zip(ids, docs))
def extract_num(x):
m = re.search(r"chunk-(\d+)", x)
return int(m.group(1)) if m else -1
combined.sort(key=lambda x: extract_num(x[0]))
ordered_docs = [doc for _, doc in combined]
return "\n\n---\n\n".join(ordered_docs)
# ==============================
# 5. CrewAI Agent & Task
# ==============================
llm = LLM(
model=f"openai/{LLM_MODEL}",
base_url=LLM_BASE_URL,
api_key=LLM_API_KEY
)
rag_tool = LocalRAGTool()
agent = Agent(
role="ICS Security Teaching Assistant",
goal="基于学习计划准确回答问题",
backstory="你熟悉完整的工控安全学习计划,请基于文档回答。",
tools=[rag_tool],
llm=llm,
verbose=True
)
task = Task(
description="请回答:第二周的核心目标和关键任务是什么?",
expected_output="第二周的目标与任务列表",
agent=agent
)
crew = Crew(
agents=[agent],
tasks=[task],
process=Process.sequential,
verbose=True
)
# ==============================
# 6. 运行
# ==============================
result = crew.kickoff()
print("\n====== 最终回答 ======\n")
print(result)
此外,由于是使用的自定义ragtools,所以工具在对文本进行切片的时候是按照权重来拍顺序的,这会导致我们文件最后还原的时候是乱序的,所以我们又加了一个id序列号,让切片还原的时候按照原来的顺序进行排序还原,这样就不会乱序了
更多推荐



所有评论(0)