前文“OpenAI Assistants API架构”中的两个 listthreads.runs.listthreads.messages.list),本文重点讨论它俩的应用范式:
先通过 threads.runs.list 轮询确认先前任务完成 → 提交新任务 → 轮询 runs.retrieve 确认新 Run 完成 → threads.messages.list 提取“新增的助手回复”(即新任务结果)。

一、两个轮询( threads.runs.listruns.retrieve

完整可运行代码

import openai
import time

# 1. 初始化配置(替换为你的API Key)
OPENAI_API_KEY = "你的OpenAI API Key"
client = openai.OpenAI(api_key=OPENAI_API_KEY)

def main():
    # ===================== 流程1:通过threads.runs.list轮询,确认先前任务完成 =====================
    # 步骤1.1:创建/获取线程(示例用新建线程,实际可替换为已有线程ID)
    thread = client.beta.threads.create()
    thread_id = thread.id
    print(f"创建线程成功,线程ID:{thread_id}")

    # 步骤1.2:向线程添加用户消息(新任务的输入)
    user_prompt = "请计算200到300的整数和,并只返回计算结果"
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=user_prompt
    )
    print(f"添加用户消息:{user_prompt}")

    # 步骤1.3:轮询threads.runs.list,检查先前任务是否完成
    print("\n【流程1】轮询检查线程先前任务是否完成...")
    while True:
        # 核心:调用threads.runs.list获取线程下所有Run
        runs = client.beta.threads.runs.list(thread_id=thread_id).data
        # 判断是否有未完成的Run(in_progress/cancelling等状态)
        has_unfinished_run = any([run.status in ["in_progress", "cancelling", "queued"] for run in runs])
        
        if not has_unfinished_run:
            print("先前任务已全部完成,可执行新任务")
            break
        print("先前任务未完成,继续等待...")
        time.sleep(2)  # 每2秒轮询一次

    # ===================== 流程2:通过threads.runs.create提交新任务 =====================
    print("\n【流程2】提交新任务(创建Run)...")
    # 先创建一个极简的助手(仅处理计算任务)
    assistant = client.beta.assistants.create(
        name="计算助手",
        instructions="仅返回数学计算结果,无多余文字",
        model="gpt-3.5-turbo"
    )
    # 提交新任务(核心:threads.runs.create)
    new_run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant.id
    )
    new_run_id = new_run.id
    print(f"新任务提交成功,Run ID:{new_run_id}")

    # ===================== 流程3:通过threads.messages.list,获取新任务结果 =====================
    print("\n【流程3】轮询获取新任务结果...")
    assistant_reply = None
    while True:
        # 步骤3.1:先确认新Run已完成(这是获取结果的前提)
        current_run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=new_run_id)
        if current_run.status != "succeeded":
            print("新任务未完成,继续等待...")
            time.sleep(2)
            continue
        
        # 步骤3.2:调用threads.messages.list获取线程所有消息
        messages = client.beta.threads.messages.list(thread_id=thread_id).data
        
        # 步骤3.3:筛选新任务的助手回复(role=assistant且关联当前Run)
        for msg in messages:
            # 助手消息的run_id会关联到生成它的Run
            if msg.role == "assistant" and msg.run_id == new_run_id:
                assistant_reply = msg.content[0].text.value
                break
        
        if assistant_reply:
            break
        print("暂未获取到助手回复,继续轮询...")
        time.sleep(1)

    # 输出最终结果
    print("\n===================== 任务结果 =====================")
    print(f"用户提问:{user_prompt}")
    print(f"助手回复(新任务结果):{assistant_reply}")

    # 清理测试资源(可选)
    client.beta.assistants.delete(assistant.id)
    client.beta.threads.delete(thread_id)

if __name__ == "__main__":
    main()

运行结果示例

创建线程成功,线程ID:thread_xxxxxxx
添加用户消息:请计算200到300的整数和,并只返回计算结果

【流程1】轮询检查线程先前任务是否完成...
先前任务已全部完成,可执行新任务

【流程2】提交新任务(创建Run)...
新任务提交成功,Run ID:run_xxxxxxx

【流程3】轮询获取新任务结果...
新任务未完成,继续等待...
新任务未完成,继续等待...
暂未获取到助手回复,继续轮询...

===================== 任务结果 =====================
用户提问:请计算200到300的整数和,并只返回计算结果
助手回复(新任务结果):25550

注意事项

  1. 轮询间隔:示例中用 2秒/1秒 间隔,可根据任务复杂度调整(复杂任务如文件解析用5秒,简单计算用1秒),避免高频调用触发API速率限制;
  2. 状态判断threads.runs.list 返回的Run状态中,in_progress(进行中)、queued(排队中)、cancelling(取消中)都属于“未完成”状态,需全部排除;
  3. 结果筛选:通过 msg.run_id == new_run_id 精准匹配新任务的助手回复,避免获取历史消息;
  4. 环境要求:确保安装最新版OpenAI SDK(执行 pip install openai>=1.0.0)。

二、拆分为两个异步程序

将流程拆分为两个独立程序

  • 程序1(任务提交端):负责「通过 threads.runs.list 轮询确认先前任务完成 + threads.runs.create 提交新任务」,并将核心ID(线程ID/Run ID)保存到本地文件,供程序2读取;
  • 程序2(结果获取端):异步运行,负责「通过 threads.messages.list 轮询获取新任务结果」,从本地文件读取ID后独立轮询。

两个程序通过本地JSON文件实现简单通信(适合演示异步场景)。


程序1:任务提交程序(流程1+流程2)

文件名:submit_task.py

import openai
import time
import json

# 1. 配置初始化
OPENAI_API_KEY = "你的OpenAI API Key"
client = openai.OpenAI(api_key=OPENAI_API_KEY)
# 用于程序间通信的文件路径
ID_FILE_PATH = "task_ids.json"

def main():
    # ===================== 流程1:threads.runs.list轮询,确认先前任务完成 =====================
    # 步骤1:创建线程(实际场景可替换为已有线程ID)
    thread = client.beta.threads.create()
    thread_id = thread.id
    print(f"【程序1】创建线程成功,线程ID:{thread_id}")

    # 步骤2:添加用户任务消息
    user_prompt = "请计算1到500的整数和,仅返回数字结果"
    client.beta.threads.messages.create(
        thread_id=thread_id,
        role="user",
        content=user_prompt
    )
    print(f"【程序1】添加用户任务:{user_prompt}")

    # 步骤3:轮询threads.runs.list,检查先前任务是否完成
    print("\n【程序1】流程1:轮询检查先前任务状态...")
    while True:
        # 核心:调用threads.runs.list获取所有Run
        runs = client.beta.threads.runs.list(thread_id=thread_id).data
        # 判断是否有未完成的Run
        has_unfinished = any([run.status in ["in_progress", "queued"] for run in runs])
        
        if not has_unfinished:
            print("【程序1】先前任务已完成,可提交新任务")
            break
        print("【程序1】仍有未完成任务,继续等待...")
        time.sleep(2)

    # ===================== 流程2:threads.runs.create提交新任务 =====================
    # 步骤1:创建极简助手(仅处理计算任务)
    assistant = client.beta.assistants.create(
        name="异步计算助手",
        instructions="仅返回数学计算的数字结果,无任何多余文字",
        model="gpt-3.5-turbo"
    )

    # 步骤2:提交新Run(核心:threads.runs.create)
    new_run = client.beta.threads.runs.create(
        thread_id=thread_id,
        assistant_id=assistant.id
    )
    run_id = new_run.id
    print(f"\n【程序1】流程2:新任务提交成功,Run ID:{run_id}")

    # 步骤3:保存线程ID/Run ID到文件,供程序2读取
    task_ids = {
        "thread_id": thread_id,
        "run_id": run_id,
        "assistant_id": assistant.id  # 可选,用于清理资源
    }
    with open(ID_FILE_PATH, "w", encoding="utf-8") as f:
        json.dump(task_ids, f, ensure_ascii=False)
    print(f"【程序1】任务ID已保存到 {ID_FILE_PATH}")

    # 可选:清理助手(若无需复用)
    # client.beta.assistants.delete(assistant.id)

if __name__ == "__main__":
    main()

程序2:异步结果获取程序

文件名:get_result.py

import openai
import time
import json
import os

# 1. 配置初始化
OPENAI_API_KEY = "你的OpenAI API Key"
client = openai.OpenAI(api_key=OPENAI_API_KEY)
ID_FILE_PATH = "task_ids.json"  # 和程序1保持一致
POLL_INTERVAL = 2  # 轮询间隔(秒)

def wait_for_id_file():
    """等待程序1生成ID文件"""
    print("【程序2】等待程序1提交任务并生成ID文件...")
    while not os.path.exists(ID_FILE_PATH):
        time.sleep(POLL_INTERVAL)
    print("【程序2】已获取ID文件,开始轮询结果...")

def main():
    # 步骤1:等待程序1生成ID文件
    wait_for_id_file()

    # 步骤2:读取线程ID/Run ID
    with open(ID_FILE_PATH, "r", encoding="utf-8") as f:
        task_ids = json.load(f)
    thread_id = task_ids["thread_id"]
    run_id = task_ids["run_id"]
    print(f"【程序2】读取到任务ID:线程ID={thread_id},Run ID={run_id}")

    # ===================== 流程3:threads.messages.list轮询获取新任务结果 =====================
    assistant_reply = None
    while True:
        # 子步骤1:先确认Run已完成(这是获取结果的前提)
        current_run = client.beta.threads.runs.retrieve(
            thread_id=thread_id,
            run_id=run_id
        )
        if current_run.status != "succeeded":
            print(f"【程序2】新任务未完成,当前状态:{current_run.status},继续等待...")
            time.sleep(POLL_INTERVAL)
            continue

        # 子步骤2:调用threads.messages.list获取所有消息
        messages = client.beta.threads.messages.list(thread_id=thread_id).data

        # 子步骤3:筛选当前Run对应的助手回复
        for msg in messages:
            if msg.role == "assistant" and msg.run_id == run_id:
                assistant_reply = msg.content[0].text.value
                break

        if assistant_reply:
            break
        print("【程序2】暂未获取到助手回复,继续轮询...")
        time.sleep(POLL_INTERVAL)

    # 输出最终结果
    print("\n===================== 异步任务结果 =====================")
    print(f"【程序2】新任务结果:{assistant_reply}")

    # 可选:删除ID文件(清理资源)
    os.remove(ID_FILE_PATH)

if __name__ == "__main__":
    main()

运行步骤与结果示例

步骤1:运行程序1(提交任务)
python submit_task.py

输出示例:

【程序1】创建线程成功,线程ID:thread_xxxxxxx
【程序1】添加用户任务:请计算1到500的整数和,仅返回数字结果

【程序1】流程1:轮询检查先前任务状态...
【程序1】先前任务已完成,可提交新任务

【程序1】流程2:新任务提交成功,Run ID:run_xxxxxxx
【程序1】任务ID已保存到 task_ids.json
步骤2:运行程序2(异步获取结果)
python get_result.py

输出示例:

【程序2】等待程序1提交任务并生成ID文件...
【程序2】已获取ID文件,开始轮询结果...
【程序2】读取到任务ID:线程ID=thread_xxxxxxx,Run ID=run_xxxxxxx
【程序2】新任务未完成,当前状态:in_progress,继续等待...
【程序2】新任务未完成,当前状态:in_progress,继续等待...

===================== 异步任务结果 =====================
【程序2】新任务结果:125250

  1. 异步核心:程序1提交任务后立即结束,程序2独立轮询,完全模拟“提交-异步获取”的生产场景;
  2. 轮询逻辑threads.messages.list 本身无法判断任务是否完成,需先通过 runs.retrieve 确认Run状态为 succeeded,再提取结果;
  3. 异常处理:实际生产中可添加文件读写异常捕获、ID合法性校验等;
  4. 扩展场景:若需多实例/分布式部署,可将JSON文件替换为Redis/MQ等中间件,核心逻辑不变。

异步方式的小结

程序 核心职责 关键API 通信方式
程序1(submit_task.py) 确认先前任务完成 + 提交新任务 threads.runs.listthreads.runs.create 将thread_id/run_id写入JSON文件
程序2(get_result.py) 异步轮询获取结果 threads.runs.retrievethreads.messages.list 从JSON文件读取ID,轮询结果
  1. 程序1聚焦「任务提交前的状态检查 + 提交任务」,是异步流程的“发起端”;
  2. 程序2聚焦「异步轮询结果」,是异步流程的“消费端”;
  3. 两个程序解耦(如,通过本地文件),完美体现 OpenAI Assistants API 异步处理的核心设计。

三、 threads.runs.listthreads.runs.retrieve 的区别

1、核心定位(一句话总结)

方法 核心定位 裁判员类比
threads.runs.list 批量查询:获取指定线程下所有 Run 的列表(仅返回核心信息) 裁判员查看某运动员的「所有比赛记录清单」(只看比赛ID、状态、时间)
threads.runs.retrieve 单条查询:根据 thread_id + run_id 获取单个 Run 的完整详情 裁判员调取某一场比赛的「详细判罚记录」(含比赛状态、失败原因、工具调用日志等)

2、全维度对比表

对比维度 client.beta.threads.runs.list client.beta.threads.runs.retrieve
核心作用 批量获取线程下所有 Run 的概要信息 获取单个 Run 的完整、实时的详细信息
必选参数 仅需 thread_id(指定线程) thread_id + run_id(指定线程+指定Run)
返回值 包含多个 Run 对象的列表(data 字段),每个 Run 仅返回核心字段(id、status、created_at、assistant_id 等) 单个 Run 对象,返回全量字段(含 status、last_error、tools、usage、expires_at 等)
使用场景 1. 检查线程下是否有未完成的 Run(如异步提交前的批量校验);
2. 复盘线程的所有 Run 历史;
3. 统计线程的 Run 数量/状态分布
1. 轮询单个 Run 的实时状态(如异步等待任务完成);
2. 排查单个 Run 失败的原因(查看 last_error);
3. 获取单个 Run 的资源使用情况(token 消耗)
调用频次 低频调用(如提交新任务前调用1次) 高频调用(如轮询 Run 状态时,每2-5秒调用1次)
返回数据量 数据量随 Run 数量增加而增加(批量) 数据量固定(仅单个 Run 的详情)

3、各自核心使用场景 + 代码示例

1. threads.runs.list:批量查询(流程1:检查所有先前 Run)

核心场景:提交新任务前,批量检查线程下是否有未完成的 Run(避免并行执行)。

import openai
client = openai.OpenAI(api_key="你的API Key")

def check_all_runs_in_thread(thread_id):
    # 调用list:获取线程下所有Run的列表
    runs = client.beta.threads.runs.list(thread_id=thread_id).data
    
    # 批量判断是否有未完成的Run
    unfinished_runs = [run for run in runs if run.status in ["in_progress", "queued"]]
    if unfinished_runs:
        print(f"发现 {len(unfinished_runs)} 个未完成的Run:{[r.id for r in unfinished_runs]}")
        return False
    else:
        print("线程下所有Run均已完成")
        return True

# 调用示例
thread_id = "你的线程ID"
check_all_runs_in_thread(thread_id)
2. threads.runs.retrieve:单条查询(流程3:轮询单个 Run 状态)

核心场景:提交新任务后,轮询单个 Run 的实时状态,直到完成。

import openai
import time
client = openai.OpenAI(api_key="你的API Key")

def poll_single_run_status(thread_id, run_id):
    # 循环调用retrieve:获取单个Run的实时状态
    while True:
        run = client.beta.threads.runs.retrieve(thread_id=thread_id, run_id=run_id)
        if run.status in ["succeeded", "failed", "cancelled"]:
            # 返回完整的Run详情(含失败原因等)
            return run
        print(f"Run {run_id} 状态:{run.status},继续等待...")
        time.sleep(2)

# 调用示例
thread_id = "你的线程ID"
run_id = "你的Run ID"
final_run = poll_single_run_status(thread_id, run_id)
print(f"Run最终状态:{final_run.status}")
# 若失败,查看详细原因(仅retrieve能获取)
if final_run.status == "failed":
    print(f"失败原因:{final_run.last_error.message}")

要点

  1. 查询范围list 是“批量查所有”,retrieve 是“单条查详情”;
  2. 使用场景
    • 批量校验/历史复盘 → 用 list
    • 轮询单个 Run 状态/排查失败原因 → 用 retrieve
  3. 数据完整性retrieve 返回全量字段,是异步轮询的核心;list 仅返回核心字段,适合批量筛选。

结合之前的异步流程:

  • 流程1(检查先前任务完成)→ 用 threads.runs.list(批量查所有 Run,判断是否有未完成的);
  • 流程3(轮询新任务状态)→ 用 threads.runs.retrieve(单条查新 Run 的实时状态);
    两者配合是 OpenAI Assistants API 异步处理的标准范式。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐