crewai进一步功能的实现

Memory功能部署

crewAI Memory 开启功能需要crew里进行定义:

Crew(memory=True)

这个时候会开启一整套记忆系统

  • Short-term Memory: 记住当前执行过程中的上下文。
  • Long-term Memory: 只要数据库存在,甚至可以记住几天前运行的结果(基于 Vector Store)。
  • Entity Memory: 记住特定实体(如“项目A”的主题是什么)。
  • Context Memory:它和短期记忆很像,但关注的是:语境一致性,而不是内容本身

长期记忆通过向量数据库(Vector Store)以文本 → embedding → 向量的形式进行存储

这套记忆系统中,有entity memory和long-term memory是需要进行embedder设置的,其他都可以通过只开启memory来解决

衍生问题

如果你需要开启memory功能的话,你就需要使用内嵌的embedder功能

对于embedding provider,crewai的默认行为定义为:

embedder = {
  "provider": "openai",
  "config": {
    "model": "text-embedding-3-small"
  }
}

当然你也可以选择其他发行商的内嵌ai,但是问题也就出在这。我远控的机子是无法访问外网的,所以这里理所当然我第一时间想到的是使用huggingface的本地embedding库。可是只要你是使用他给的内嵌方法填入的话,crewai都会默认你使用的是线上的transformer库,所以会想你索要api_key,于是我们又想到了用猴子补丁来试一试能不能绕过这个判断。

我们设置了一个假的环境变量key,希望他在判断之后再去使用本地的库模型。但是依旧行不通,它认死了必须要走线上的链接方式。后面我下载了proxychain去访问外网,然后去调用线上的key看看会不会好一点,又由于找的节点都是一些免费的节点,所以链接不是很稳定,且llm的模型又要连接到上交的大模型那边,我怀疑他们会封ip,所以实际下来效果并不好,可能才跑一会链接就会断开来。

解决方案

最后在不断的尝试之下,我发现crewai的memory其实并不是只能通过

memory=ture

来实现的,我们也可以通过

    memory=True,
    memory_config={
        "use_short_term": True,
        "use_long_term": True,
        "use_entity": False, 
    }

这样来单独控制每个功能的开关,这也就意味着我们能对entity_memory使用的embedder进行自定义设置,所以我们自己封装了一个embedder,放在本地,然后再开启entity_memory的时候对其进行调用,实测下来终于没有问题了,且本地的数据会放在同一目录下的另一个文件下,方便ai后续的阅读记忆,修改代码如下

import os
import numpy as np
from pathlib import Path
from typing import Any, Optional
from uuid import uuid4

import chromadb
from chromadb.config import Settings
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity

from crewai import Agent, Task, Crew, Process, LLM
from crewai.memory.entity.entity_memory import EntityMemory
from crewai.memory.short_term.short_term_memory import ShortTermMemory
from crewai.memory.storage.rag_storage import RAGStorage
from crewai.utilities.paths import db_storage_path

# =====================================================
# 1. 基础配置
# =====================================================

EMBEDDING_MODEL = os.getenv(
    "LOCAL_EMBEDDING_MODEL",
    "/path/to/local/sentence-transformers/all-MiniLM-L6-v2"
)

# CrewAI Memory 存储目录
os.environ["CREWAI_STORAGE_DIR"] = "./crewai_memory_data"
Path(os.environ["CREWAI_STORAGE_DIR"]).expanduser().mkdir(parents=True, exist_ok=True)

def storage_snapshot(title: str):
    base = Path(os.environ["CREWAI_STORAGE_DIR"]).resolve()
    print(f"\n[{title}] CREWAI_STORAGE_DIR={base}")
    if not base.exists():
        print("- (目录尚未创建)")
        return
    items = list(base.iterdir())
    print(f"- 共 {len(items)} 项")

# =====================================================
# 2. 自定义本地 Chroma Storage
# =====================================================

class LocalChromaStorage(RAGStorage):
    def __init__(self, type: str, model_path: str, **kwargs):
        self._model_path = model_path
        self._collection = None
        super().__init__(type=type, **kwargs)

    def _initialize_app(self):
        from chromadb.utils.embedding_functions import SentenceTransformerEmbeddingFunction

        base_path = Path(os.environ["CREWAI_STORAGE_DIR"]).resolve()
        persist_path = str((base_path / self.type).resolve())

        client = chromadb.PersistentClient(
            path=persist_path,
            settings=Settings(allow_reset=True),
        )

        embedding_fn = SentenceTransformerEmbeddingFunction(
            model_name=self._model_path
        )

        self._collection = client.get_or_create_collection(
            name="memory",
            embedding_function=embedding_fn,
            metadata={"hnsw:space": "cosine"},
        )
        self._client = self._collection

    def save(self, data: Any, metadata: Optional[dict] = None, **kwargs):
        if self._collection is None:
            self._initialize_app()

        ids = [str(uuid4())]
        documents = [str(data)]
        metadatas = [metadata or {}]

        self._collection.add(
            ids=ids,
            documents=documents,
            metadatas=metadatas
        )

    def search(self, query: str, limit: int = 3, **kwargs):
        if self._collection is None:
            self._initialize_app()

        results = self._collection.query(
            query_texts=[query],
            n_results=limit,
            include=["documents", "distances"],
        )

        docs = results["documents"][0]
        dists = results["distances"][0]

        return [
            {"content": d, "score": 1 - dist}
            for d, dist in zip(docs, dists)
        ]

# =====================================================
# 3. LLM 配置
# =====================================================

LLM_API_KEY = os.getenv("LLM_API_KEY", "YOUR_API_KEY")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "https://example-llm-api/v1")
LLM_MODEL = os.getenv("LLM_MODEL", "your-llm-model")

llm = LLM(
    model=f"openai/{LLM_MODEL}",
    base_url=LLM_BASE_URL,
    api_key=LLM_API_KEY
)

# =====================================================
# 4. 本地 RAG 示例
# =====================================================

embed_model = SentenceTransformer(EMBEDDING_MODEL)

documents = [
    "工业控制系统安全的核心是可用性。",
    "冬季安全演练重点包括应急响应和日志分析。",
    "PLC 固件漏洞是常见风险之一。",
]

doc_embeddings = embed_model.encode(documents)

def rag_search(query, top_k=2):
    q_emb = embed_model.encode([query])
    sims = cosine_similarity(q_emb, doc_embeddings)[0]
    idx = sims.argsort()[-top_k:][::-1]
    return [documents[i] for i in idx]

# =====================================================
# 5. Agent + Memory 测试
# =====================================================

agent = Agent(
    role="记忆测试 Agent",
    goal="基于资料回答问题并记住关键信息",
    llm=llm,
    memory=True,
    verbose=True
)

query = "冬季安全演练的重点是什么?"
context = rag_search(query)

task_1 = Task(
    description="根据资料回答问题并记住结论。\n" + "\n".join(context),
    agent=agent
)

task_2 = Task(
    description="不提供资料,直接回答:冬季安全演练的重点是什么?",
    agent=agent
)

short_term = ShortTermMemory(
    storage=LocalChromaStorage("short_term", EMBEDDING_MODEL)
)
entity_mem = EntityMemory(
    storage=LocalChromaStorage("entities", EMBEDDING_MODEL)
)

crew = Crew(
    agents=[agent],
    tasks=[task_1, task_2],
    process=Process.sequential,
    short_term_memory=short_term,
    entity_memory=entity_mem,
    mem

主要就是通过自定义了一个 LocalChromaStorage来帮助我们在本地设置embedder

1) 为什么要写 LocalChromaStorage这个类?

CrewAI 的短期记忆和实体记忆本质都依赖一个“RAG 存储层”:

  • save(…) :把文本(以及元数据)写到向量库
  • search(query, …) :把 query 做 embedding,然后相似度检索出相关记忆条目
    之前报错的根源就是: 默认 storage 在初始化 embedding function 时走了远程 provider(OpenAI 或 HuggingFace API),所以要求对应的环境变量 key 。
    解决办法:把这层 storage 换成“完全本地”的实现。
2) init
  • type :区分是 short_term 还是 entities ,用于落盘路径隔离。
  • model_path :本地 SentenceTransformer 模型路径。
  • self._collection = None :延迟初始化(第一次 save/search 才真正打开/创建 collection)。
  • super().init(…) :让 CrewAI 的 RAGStorage 父类把必要字段初始化好(保持兼容)。
3) _initialize_app
  • SentenceTransformerEmbeddingFunction :这是 Chroma 自带的 本地 embedding function ,直接加载 sentence-transformers 模型在本机算向量。
  • base_path = Path(os.environ[“CREWAI_STORAGE_DIR”])… :明确落盘到你设置的目录里(不依赖外部默认规则)。
  • persist_path = base_path / self.type / agents :每种 memory 一个目录,比如:
    • …/short_term/…
    • …/entities/…
  • chromadb.PersistentClient(path=persist_path, …) :使用本地持久化 Chroma。
  • get_or_create_collection(… embedding_function=embedding_fn …) :绑定本地 embedding function,这一步决定了后续 query/add 全都离线。
  • self._client = self._collection :兼容 CrewAI 父类内部可能访问 _client 的地方。
4) save
  • 把 CrewAI 传进来的各种结构(list/dict/其它)统一成 (text, metadata) 列表。
  • 用 uuid4() 生成 ids。
  • self._collection.add(…) :写入 Chroma,collection 会自动调用本地 embedding function 算向量并落盘。
5) search
  • self._collection.query(query_texts=[query], …) :Chroma 会用本地 embedding function 对 query 编码后检索。
  • 返回结构里我同时提供了:
    • content :CrewAI 的 contextual memory 里会用 result[“content”] 拼上下文(你之前 KeyError 就是这里)。
    • text :保留一个别名,避免不同版本/不同调用路径读取 text 时丢字段。

RAG功能部署

RAG功能也是基于在embedder的基础上,将我们的prompt或者是其他一些有价值的信息写入到本地文件中,在使用crewai的时候直接让ai利用argtools或者是我们自己封装的工具来进行文件的阅读,从而快速方便的获取重要的前置消息。

衍生问题

在实际部署中,我是先解决rag的部署才通过相同的办法去部署memory的功能的,所以这里部署的解决方案和memory功能的思路是相对一致的。略有不同的是,虽然rag依旧是建立在embedder模型的基础之上。但是相对于memory功能,他的embedder内嵌功能是更加灵活的,因为他不必要在明显crew的时候写入到config里,他可以自己进行自定义封装。

在部署中我发现如果在使用官方的文件读取工具,即readtxttools的时候,crewai也会强制要求你进行线上api的调用,所以这里我们继续使用basetools的方法来将本地文件通过embedder切片成相应的向量之中

解决方案

embedding的核心功能就是通过将文本转成向量的方式来存储,在寻找的时候通过对比querry和本地向量的相似度来寻找答案,所以这里我们可以自己封装一个建议的rag_tools

import os
import re
from sentence_transformers import SentenceTransformer
import chromadb
from chromadb.utils import embedding_functions
from crewai import Agent, Task, Crew, Process, LLM
from crewai.tools import BaseTool
from pydantic import BaseModel, Field
from typing import Type, List

# ==============================
# 1. 基础配置
# ==============================

# 本地 embedding 模型(HuggingFace 缓存路径)
EMBEDDING_MODEL_PATH = (
    "/path/to/.cache/huggingface/hub/"
    "models--sentence-transformers--all-MiniLM-L6-v2/"
    "snapshots/xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
)

# 本地文档路径
DOC_PATH = "/path/to/project/ICS_Security_Plan.txt"

# LLM 配置(示例占位)
LLM_API_KEY = "YOUR_API_KEY_HERE"
LLM_BASE_URL = "https://api.example.com/v1"
LLM_MODEL = "your-llm-model-name"

# Chroma 集合名
COLLECTION_NAME = "ics_rag_demo_collection"

# ==============================
# 2. 初始化 Embedding & 向量库
# ==============================

print("✅ [Init] 加载本地 embedding 模型...")
embedding_model = SentenceTransformer(EMBEDDING_MODEL_PATH)

embedding_function = embedding_functions.SentenceTransformerEmbeddingFunction(
    model_name=EMBEDDING_MODEL_PATH
)

print("✅ [Init] 初始化 Chroma 向量库...")
client = chromadb.Client()

# 若存在旧集合则删除
try:
    client.delete_collection(COLLECTION_NAME)
except:
    pass

collection = client.create_collection(
    name=COLLECTION_NAME,
    embedding_function=embedding_function
)

# ==============================
# 3. 文档语义切分
# ==============================

def semantic_split(text: str) -> List[str]:
    """
    按文档结构进行语义切分(例如:按“第X周”)
    """
    chunks = []
    lines = text.split("\n")
    current_chunk = []

    for line in lines:
        line = line.strip()
        if not line:
            continue

        if line.startswith("【") and "周】" in line:
            if current_chunk:
                chunks.append("\n".join(current_chunk))
            current_chunk = [line]
        else:
            current_chunk.append(line)

    if current_chunk:
        chunks.append("\n".join(current_chunk))

    return chunks


print("✅ [Index] 读取文档并写入向量库...")
if os.path.exists(DOC_PATH):
    with open(DOC_PATH, "r", encoding="utf-8") as f:
        text = f.read()

    chunks = semantic_split(text)

    collection.add(
        documents=chunks,
        ids=[f"chunk-{i}" for i in range(len(chunks))]
    )

    print(f"✅ 已入库 {len(chunks)} 个语义片段")
else:
    raise FileNotFoundError("文档不存在")

# ==============================
# 4. 自定义本地 RAG Tool
# ==============================

class SearchInput(BaseModel):
    query: str = Field(..., description="检索问题")

class LocalRAGTool(BaseTool):
    name: str = "Local ICS Plan Search Tool"
    description: str = "用于从本地工控安全学习计划中检索相关内容"
    args_schema: Type[BaseModel] = SearchInput

    def _run(self, query: str) -> str:
        results = collection.query(
            query_texts=[query],
            n_results=5
        )

        docs = results.get("documents", [[]])[0]
        ids = results.get("ids", [[]])[0]

        if not docs:
            return "未找到相关内容"

        # 按 chunk 顺序重排
        combined = list(zip(ids, docs))

        def extract_num(x):
            m = re.search(r"chunk-(\d+)", x)
            return int(m.group(1)) if m else -1

        combined.sort(key=lambda x: extract_num(x[0]))
        ordered_docs = [doc for _, doc in combined]

        return "\n\n---\n\n".join(ordered_docs)

# ==============================
# 5. CrewAI Agent & Task
# ==============================

llm = LLM(
    model=f"openai/{LLM_MODEL}",
    base_url=LLM_BASE_URL,
    api_key=LLM_API_KEY
)

rag_tool = LocalRAGTool()

agent = Agent(
    role="ICS Security Teaching Assistant",
    goal="基于学习计划准确回答问题",
    backstory="你熟悉完整的工控安全学习计划,请基于文档回答。",
    tools=[rag_tool],
    llm=llm,
    verbose=True
)

task = Task(
    description="请回答:第二周的核心目标和关键任务是什么?",
    expected_output="第二周的目标与任务列表",
    agent=agent
)

crew = Crew(
    agents=[agent],
    tasks=[task],
    process=Process.sequential,
    verbose=True
)

# ==============================
# 6. 运行
# ==============================

result = crew.kickoff()
print("\n====== 最终回答 ======\n")
print(result)

此外,由于是使用的自定义ragtools,所以工具在对文本进行切片的时候是按照权重来拍顺序的,这会导致我们文件最后还原的时候是乱序的,所以我们又加了一个id序列号,让切片还原的时候按照原来的顺序进行排序还原,这样就不会乱序了

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐