flow是什么

flow能够创建结构化、事件驱动的工作流,两大核心特点:

1、实现条件、循环、分支的逻辑

2、实现事件驱动(类似于中断机制)

有啥用呢?

不同任务之间共享状态,任务与任务之间实现无缝编排,输入输出高度可控,协作形式、输入输出格式定义都能更精确控制。

核心要素

@start

@listen

你可以监听一个方法或者某个状态

当方法/状态就绪的时候,会触发监听函数被动执行

@router[可选]

可以为一个结果的输出创建非常复杂的处理(路由)机制

状态管理

所有状态全部存入state属性

非结构化状态

用户手动修改状态,提供了高度的灵活性

结构化状态

利用pydantic定义更清晰严格的state属性

listen控制

or、and

在监听过程中,进行布尔逻辑运算,实现更复杂的事件触发逻辑【这个太好理解了,多说一个字都是浪费生命】

语法:

@listen(and_(start_method, second_method))

router

@router()装饰器允许我们根据上一个flow的状态设置一个状态路由,你可以自主定义这个路由规则,进而更好控制后续的执行流。

有点绕?

假设上一个flow的过程是吃饭,那么吃完饭我该干嘛呢?我可以设置一个路由规则(你爱怎么定就怎么定):

  • 吃饱了,去看电视

  • 没吃饱,去喝牛奶

  • 。。。

下文给了一个实际例子,看了就明白了。

绘制流程图

图形化展示AI工作流程,展示各种任务、相互连接、数据流交互细节。

如何生成

方法一:

flow.plot("my_flow_plot")

你会得到一个“my_flow_plot.html”

方法二:

$ crewai flow plot

应用示例

flow单独使用

from crewai.flow.flow import Flow, listen, start
from pydantic import BaseModel


class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    @start()
    def first_method(self):
        self.state.message = "Hello from structured flow"
        print(f"State after first_method: {self.state}")

    @listen(first_method)
    def second_method(self):
        self.state.counter += 1
        self.state.message += " - updated"
        print(f"State after second_method: {self.state}")

    @listen(second_method)
    def third_method(self):
        self.state.counter += 1
        self.state.message += " - updated again"

        print(f"State after third_method: {self.state}")


flow = StructuredExampleFlow()
flow.kickoff()
flow.plot("test")

flow+crew联合使用

总体实现与上面类似,

1、把crew的实现用单独的类实现

2、在flow的listen里面调用crew的kickoff,获取实际输入输出内容

民以食为天,举个吃饭的例子吧 ^_^

test_flow_app

利用flow框架调用crew用一个agent写一首诗,另一个agent判断作者吃饱了没。

利用flow路由机制对不同的case进行不同处理。

from crewai.flow.flow import Flow, listen, start, router
from pydantic import BaseModel

from test_poetry import poetry_crew
from test_distinguish import distinguish_crew

class ExampleState(BaseModel):
    counter: int = 0
    message: str = ""
    eating_state: str = ""


class StructuredExampleFlow(Flow[ExampleState]):

    def __init__(self, input_str=""):
        super().__init__(ExampleState())
        self.input_str = input_str
        
    @start()
    def eating(self):      
        result = poetry_crew.kickoff(inputs={'input_str': self.input_str})  # 以参数形式调用crew
        print(result.raw)  # 打印实际的诗歌内容
        self.state.message = result.raw  # 只保存实际的文本内容
        print(f"State after first_method: {self.state}")
        
    @router(eating)
    def next_step(self): 
        result = distinguish_crew.kickoff(inputs={'input_str': self.state.message}) 
        print(f"State after next_step: {self.state}")
        self.state.eating_state = result.raw.strip()  # 获取原始输出并去除空白字符
        print(result.raw)  # 打印判断结果
        if self.state.eating_state == "full":
            return "watch_TV"
        elif self.state.eating_state == "hungry":
            return "drink milk"
        else:
            return "something wrong"

    @listen("watch_TV")
    def method_watch_tv(self):
        self.state.counter += 1
        self.state.message = "吃饱了撑得慌,去看看电影吧"
        print(f"State after method_watch_tv: {self.state}")

    @listen("drink milk")
    def method_drink_milk(self):
        self.state.counter += 1
        self.state.message = "没吃饱咋整?喝点牛奶对付一下好了"
        print(f"State after method_drink_milk: {self.state}")

    @listen("something wrong")
    def method_something_wrong(self):
        self.state.counter += 1
        self.state.message = "出了点问题"
        print(f"State after method_something_wrong: {self.state}")


if __name__ == "__main__":
    input_str = '请就吃饭的场景写一首诗,要突出很美味,很满足'
    input_str = '请就吃饭的场景写一首诗,要突出很美味,不够吃'
    flow = StructuredExampleFlow(input_str)
    flow.kickoff()
    flow.plot("test")

test_poetry

import os
from crewai import Agent, Task, Crew, LLM

LLM_QW = LLM(
    model='dashscope/qwen-plus', 
    base_url='https://dashscope.aliyuncs.com/compatible-mode/v1', 
    api_key=os.environ.get("QWEN_API_KEY"), 
    stream=False, # 设置为同步模式
    temperature=0.7, # 控制模型输出的随机性
    request_timeout=240, # 增加请求超时时间到240秒
    max_retries=2,
)

poetry_agent = Agent(
    role='You are a poet. You can write poetry.',
    goal='Write beautiful poems based on the given themes',
    backstory='你是一位精通五言、七言绝句,严格遵循平仄对仗的诗人',
    llm=LLM_QW,
)

poetry_task = Task(
    name='poetry_task',
    description='Write a poem based on the following theme: {input_str}',
    expected_output='一首对仗工整的诗歌',
    agent=poetry_agent,
)

poetry_crew = Crew(
    agents=[poetry_agent],
    tasks=[poetry_task],
    llm=LLM_QW)

if __name__ == '__main__':

    input_str = '请就吃饭的场景写一首诗,要突出我吃得很饱'
    input_str = '请就吃饭的场景写一首诗,要突出我没吃饱,肚子还很饿'
    result = poetry_crew.kickoff(inputs={'input_str': input_str})
    print(result)

test_distinguish

import os
from crewai import Agent, Task, Crew, LLM

LLM_QW = LLM(
    model='dashscope/qwen-plus', 
    base_url='https://dashscope.aliyuncs.com/compatible-mode/v1', 
    api_key=os.environ.get("QWEN_API_KEY"), 
    stream=False, # 设置为同步模式
    temperature=0.7, # 控制模型输出的随机性
    request_timeout=240, # 增加请求超时时间到240秒
    max_retries=2,
)

poetry_agent = Agent(
    role='你是一位精通诗歌的文人,你能分辨出诗歌的意境,并抓住要点',
    goal='明确判断一首诗的真实含义,并给出简明扼要的结论',
    backstory='你会读出一首诗,并给出对它的理解',
    llm=LLM_QW,
)

poetry_task = Task(
    name='poetry_task',
    description='根据输入的诗歌内容: {input_str},准确判断诗歌实际的意思,并直接输出判断',
    expected_output='只允许两种输出:full,hungry',
    agent=poetry_agent,
)

distinguish_crew = Crew(
    agents=[poetry_agent],
    tasks=[poetry_task],
    llm=LLM_QW)

if __name__ == '__main__':

    input_str = '碗尽粒无存,腹圆衣带紧。箸停犹抚腹,笑语饱中真。'
    input_str = "饭尽匙空碗底凉,腹鸣如鼓尚彷徨。盘中残渍犹堪恋,未饱饥肠恨日长。"
    result = distinguish_crew.kickoff(inputs={'input_str': input_str})
    print(result)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐