一题击破:为什么“双工人+技能+停机窗口”的排程题,是人和大模型的共同噩梦?(附Python自动求解器)

引子
你可能见过“把任务排排坐”的调度题,但当题目写着“只输出 JSON、必须最小化工期、带技能约束、还夹着工人停机窗口”时,很多人(包括大模型)都会心头一紧。表面上,它像是把几个任务接龙;实际上,它是资源受限项目调度问题(RCPSP)的一个强变体:有工人,有技能,有先后依赖,还要躲开不可用时段;更“毒”的是,任务不可切片,时间又是整数格点,任何一个看似合理的局部选择,都可能在后面把你“锁死”。本文围绕参考题的“任务A”(忽略任务B),系统讲清楚这类题为什么极度考验多步骤推理,为什么靠“思维链”式人类或LLM的直觉很难稳拿最优;并给出一份可复用的Python自动求解器(CP-SAT)和下界计算方法,让你既能产出可行解,又能给出严谨的最优性证据路径。


1. 题型拆解:这不是普通的排程,而是强耦合的RCPSP

题面关键词:双工人、技能约束、停机窗口、不可切片(non-preemptive)、整数时间、先序依赖、目标是最小化总工期(makespan)。将其形式化:

  • 资源与技能
    • 工人 W1: 技能 A、B;不可用窗口 [8,9)。
    • 工人 W2: 技能 B、C;不可用窗口 [13,14)。
    • 技能约束:A 任务只能给 W1;C 只能给 W2;B 可以给 W1 或 W2。
  • 时间与执行
    • 所有时间均为非负整数,任务不可切片。
    • 同一工人上的任务区间不得重叠;允许相邻(finish = next start)。
    • 任何任务不得跨越该工人的不可用时间窗(即执行区间不可与[8,9)、[13,14)相交)。
  • 图结构
    • 有向无环图(DAG)的先序依赖:如 T6 依赖 T3、T4,等等。
  • 目标
    • 最小化 makespan = max_i finish(T_i)。

这与经典RCPSP相比,多了“技能-工人-黑窗”的三重耦合;与单机或并行机调度比,又多了完整的DAG约束和不可切片。这种结构会导致决策变量“横向”(在两名工人之间分配B类任务)和“纵向”(在依赖链上安排先后)相互牵制。任何一边的局部贪心,都会改变另一边的可行域和最优性。


2. 难点剖面:四种耦合让搜索空间指数爆炸

  • 依赖耦合(图拓扑)
    • 不是所有任务都能立刻排,必须等前驱完成。最早开始时间(EST)来自图的拓扑序;但资源冲突会让你无法在EST开工,从而产生二次延迟。
  • 资源耦合(同工人不可重叠)
    • 一个工人一次只能干一个任务;两个工人就像两条轨道,你要把任务拼成两条不交叠的链,同时还得满足跨链的先后约束。
  • 技能耦合(A/W1专属,C/W2专属,B可选)
    • A、C任务的归属是固定的;B任务的选择会影响两条轨道的拥塞与等待,错误的分配可能把关键路径压“更紧”,导致工期膨胀。
  • 停机窗口耦合(黑窗)
    • NoOverlap + 黑窗相当于在轨道中间塞了一个“不可跨越的障碍”。任务不可切片意味着无法穿越断点——要么在断点前完成,要么在断点后开始。这会将连续时间切成多个“可用片段”,增加碎片化和填充难度。

这些因素叠加,导致:

  • 搜索空间指数级:B类任务的二元分配 × 工人内的任务排序 × 断点处的分块选择 × 依赖传播导致的可行域缩紧。
  • 强非凸性与离散性:没有“平滑”调整的空间,微小的决策变化(比如把某个B任务从W1换给W2)会触发“多米诺骨牌效应”。

3. 为什么“思维链”很难直接给出最优?

许多“思维链”式推理会沿着“先安排最早能做的任务”“把B任务尽量塞给空闲的工人”之类的启发式前行。这在小规模、无黑窗的平滑场景尚可,但在本题中会频繁失效,原因包括:

  • 局部贪心的盲点:给W1塞入短B任务,看似“填缝”,却可能推迟后续A任务的开始,使关键路径延长。
  • 黑窗的“隐形代价”:在[8,9)与[13,14)附近,一个看似合适的安排可能刚好跨窗而不可行;为了绕开黑窗产生的强制空闲,会令后续链路整体右移。
  • 关键路径的动态性:忽略资源时的关键路径是31(后文给出计算方法),但任何资源/黑窗冲突都可能让新的路径变成“真·关键”。单纯凭思维链很难对这种路径切换有全局感知。
  • 证明最优性的困难:就算凑出一个不错的工期,也很难“口头”证明它是最优。你至少需要一个下界,与可行解的上界相遇,才构成严谨的最优性证据。

因此,“解释性推理”适合帮助人理解和设计启发式,但很难保证全局最优与可复现。要想稳拿最优,还是要倚重系统性的搜索(分支定界、约束传播)和可计算的下界。


4. 科学的路径:下界+精确求解器的“双证据法”

  • 下界(Lower Bounds)
    • 关键路径下界(忽略资源/黑窗):DAG上最长路径之和。对本题数据,可计算为31(示例计算见下一节)。
    • 工人资源下界(仅计“只能由该工人完成”的工时+黑窗):把W1必须做的A类总时长、W2必须做的C类总时长,放在带黑窗的单机上,计算完成这些工时所需的最小时间跨度。这给出 makespan 的资源侧下界。
  • 上界(Upper Bound)
    • 由一个可行排程产生。若上界=下界,即证最优;否则,继续搜索或加强下界。

这种策略特别适合有“只准输出JSON、且需回填下界”的评测框架:你既给出可行解,也提供算得严谨的下界,二者一对照,最优与否一目了然。


5. 关键下界的计算:严谨又可复用

5.1 关键路径下界(忽略资源与黑窗)
在DAG中做最长路(每条边权为0,节点权为任务工期),即可得到“最早完工时间(EF)”。以题中数据为例(只展示结论的形成路径,不展开冗长推导):

  • T1=4, T2=5
  • T3= T1+3=7;T4= T1+6=10;T5= T2+4=9
  • T6= max(T3,T4)+7 = max(7,10)+7 = 17
  • T7= max(T3,T5)+5 = max(7,9)+5 = 14
  • T8= max(T4,T5)+6 = max(10,9)+6 = 16
  • T9= T6+4 = 21
  • T10= max(T6,T7)+6 = max(17,14)+6 = 23
  • T11= max(T7,T8)+3 = max(14,16)+3 = 19
  • T12= max(T9,T10,T11)+8 = max(21,23,19)+8 = 31

因此,critical_path 下界为 31。无论如何调资源与黑窗,makespan 不可能小于 31。

5.2 工人资源下界(“只能由该工人完成”的工时 + 黑窗)
思想:对每个工人 w,先把“必须由 w 完成”的任务时长相加为 P_w(如W1的A类任务、W2的C类任务)。考虑时间区间 [0, T) 上工人可用时间长度 avail(T) = T − blackout_overlap([0,T))。要在 [0, T) 内完成 P_w,必须有 avail(T) ≥ P_w。最小满足的 T 即为该工人的资源下界。对单一黑窗 [u,v),这个 T 可解析;若有多窗,用“迭代定点”即可:

  • 初值 T = P_w
  • 迭代更新 T ← P_w + overlap([0,T)),直至收敛(两三步即可)。

题中:

  • W1 必须做的 A 任务:T1=4, T3=3, T6=7, T9=4,合计 P_W1 = 18;黑窗 [8,9) 长度1。
    • 若 T ≤ 8,可用时间=T,但 T=18>8;若 T ≥ 9,可用时间=T−1,故 T−1 ≥ 18 → T ≥ 19。因此 resource_bound_W1 = 19。
  • W2 必须做的 C 任务:T8=6, T12=8,合计 P_W2=14;黑窗 [13,14)。
    • 若 T ≥ 14,可用时间=T−1,故 T−1 ≥14 → T≥15。因此 resource_bound_W2=15。

综合下界应取多项的最大值:LB = max(critical_path, resource_bound_W1, resource_bound_W2) = max(31, 19, 15) = 31。

这意味着:除非你找到 makespan=31 的可行解,否则最优值>31;如果恰好找到31,那就“下上界相遇”,可判最优。


6. 自动求解:用 OR-Tools CP-SAT 建模,一次性吃下技能+黑窗+依赖

模型要点:

  • 变量
    • 每个任务 t:共享 start_t, end_t(整数),并为每个允许的工人 w 建一个可选区间 interval(t,w)(presence 由布尔 assign[t,w] 控制),所有允许的 w 恰有一个被选择。
  • 约束
    • 先序:start_v ≥ end_u(对每条依赖边 u→v)。
    • 工人不重叠:对每个工人 w,AddNoOverlap(所有 interval(_,w) + 黑窗固定区间)。
    • 技能:允许的 w 仅来自技能规则(A→W1,C→W2,B→W1|W2)。
    • 黑窗:把 [8,9)、[13,14) 作为固定区间加入对应工人的 NoOverlap,即可确保“任务不得跨越不可用窗”。
  • 目标
    • Minimize makespan,其中 makespan = max_t end_t。

另外,我们提供下界计算函数,求 critical_path、resource_bound_W1/W2,用于输出或验证。

下面是完整、可运行的Python代码(需要安装 ortools)。你可以直接粘贴运行,得到一份可行解和下界;若解与下界相等,即证最优。

# pip install ortools

from ortools.sat.python import cp_model

def merge_intervals(intervals):
    if not intervals:
        return []
    itv = sorted(intervals)
    merged = [list(itv[0])]
    for s, e in itv[1:]:
        if s <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], e)
        else:
            merged.append([s, e])
    return [(s, e) for s, e in merged]

def overlap_with_prefix(T, windows):
    # windows: list of [s,e), assumed non-overlapping and sorted
    if T <= 0:
        return 0
    ov = 0
    for s, e in windows:
        if T <= s:
            break
        ov += max(0, min(T, e) - s)
    return ov

def resource_bound_required_span(P, blackout_windows):
    # Given mandatory processing P (sum of durations) on a single machine
    # with downtime windows (union), compute minimal T with available_time(T) >= P.
    # Solve T = P + overlap([0,T)) by fixed-point iteration.
    W = merge_intervals(blackout_windows)
    T = P
    for _ in range(10):
        ov = overlap_with_prefix(T, W)
        new_T = P + ov
        if new_T == T:
            break
        T = new_T
    return T

def critical_path_length(tasks):
    # tasks: dict name -> {'dur':int, 'deps':[names]}
    # DAG longest path (node weights = durations)
    from collections import defaultdict, deque
    indeg = {t: 0 for t in tasks}
    graph = defaultdict(list)
    for v, data in tasks.items():
        for u in data['deps']:
            graph[u].append(v)
            indeg[v] += 1
    # topological order
    q = deque([t for t in tasks if indeg[t] == 0])
    ef = {t: tasks[t]['dur'] for t in tasks}  # earliest finish ignoring resources
    while q:
        u = q.popleft()
        for v in graph[u]:
            ef[v] = max(ef[v], ef[u] + tasks[v]['dur'])
            indeg[v] -= 1
            if indeg[v] == 0:
                q.append(v)
    return max(ef.values())

def solve_rcpsp_with_skills_and_blackouts():
    # Problem data (Task A)
    tasks = {
        "T1":  {"dur": 4, "skill": "A", "deps": []},
        "T2":  {"dur": 5, "skill": "B", "deps": []},
        "T3":  {"dur": 3, "skill": "A", "deps": ["T1"]},
        "T4":  {"dur": 6, "skill": "B", "deps": ["T1"]},
        "T5":  {"dur": 4, "skill": "B", "deps": ["T2"]},
        "T6":  {"dur": 7, "skill": "A", "deps": ["T3", "T4"]},
        "T7":  {"dur": 5, "skill": "B", "deps": ["T3", "T5"]},
        "T8":  {"dur": 6, "skill": "C", "deps": ["T4", "T5"]},
        "T9":  {"dur": 4, "skill": "A", "deps": ["T6"]},
        "T10": {"dur": 6, "skill": "B", "deps": ["T6", "T7"]},
        "T11": {"dur": 3, "skill": "B", "deps": ["T7", "T8"]},
        "T12": {"dur": 8, "skill": "C", "deps": ["T9", "T10", "T11"]},
    }
    workers = ["W1", "W2"]
    skills_of_worker = {
        "W1": {"A", "B"},
        "W2": {"B", "C"},
    }
    # Blackouts: half-open [s,e)
    blackout = {
        "W1": [(8, 9)],
        "W2": [(13, 14)],
    }

    # Allowed workers per task by skill rule
    allowed = {}
    for t, data in tasks.items():
        s = data["skill"]
        if s == "A":
            allowed[t] = ["W1"]
        elif s == "C":
            allowed[t] = ["W2"]
        else:  # "B"
            allowed[t] = ["W1", "W2"]

    # Lower bounds
    cp_lb = critical_path_length(tasks)  # 31 for this instance
    # Mandatory durations per worker (A->W1, C->W2)
    P_W1 = sum(tasks[t]["dur"] for t in tasks if tasks[t]["skill"] == "A")
    P_W2 = sum(tasks[t]["dur"] for t in tasks if tasks[t]["skill"] == "C")
    rb_W1 = resource_bound_required_span(P_W1, blackout["W1"])
    rb_W2 = resource_bound_required_span(P_W2, blackout["W2"])

    # CP-SAT model
    model = cp_model.CpModel()
    horizon = sum(tasks[t]["dur"] for t in tasks) + 50  # generous

    # Variables
    start = {}
    end = {}
    assign = {t: {} for t in tasks}  # assign[t][w] -> BoolVar
    intervals_by_worker = {w: [] for w in workers}

    for t, data in tasks.items():
        dur = data["dur"]
        start[t] = model.NewIntVar(0, horizon, f"start_{t}")
        end[t] = model.NewIntVar(0, horizon, f"end_{t}")

        # Create optional intervals per allowed worker sharing the same start/end
        allowed_ws = allowed[t]
        if len(allowed_ws) == 1:
            w = allowed_ws[0]
            assign[t][w] = model.NewBoolVar(f"assign_{t}_{w}")
            model.Add(assign[t][w] == 1)  # forced
            itv = model.NewOptionalIntervalVar(start[t], dur, end[t], assign[t][w], f"intv_{t}_{w}")
            intervals_by_worker[w].append(itv)
        else:
            # exactly one
            lits = []
            for w in allowed_ws:
                b = model.NewBoolVar(f"assign_{t}_{w}")
                assign[t][w] = b
                itv = model.NewOptionalIntervalVar(start[t], dur, end[t], b, f"intv_{t}_{w}")
                intervals_by_worker[w].append(itv)
                lits.append(b)
            model.AddExactlyOne(lits)

    # Precedence constraints
    for v, data in tasks.items():
        for u in data["deps"]:
            model.Add(start[v] >= end[u])

    # NoOverlap per worker, with blackout intervals as fixed intervals
    for w in workers:
        # Add blackout intervals
        for idx, (s, e) in enumerate(merge_intervals(blackout[w])):
            bs = model.NewIntVar(s, s, f"{w}_blk_{idx}_s")
            be = model.NewIntVar(e, e, f"{w}_blk_{idx}_e")
            blk = model.NewIntervalVar(bs, e - s, be, f"{w}_blk_{idx}")
            intervals_by_worker[w].append(blk)
        model.AddNoOverlap(intervals_by_worker[w])

    # Makespan
    makespan = model.NewIntVar(0, horizon, "makespan")
    model.AddMaxEquality(makespan, [end[t] for t in tasks])
    model.Minimize(makespan)

    # Optional: improve search by branching on assignment first
    decision_bools = []
    for t in tasks:
        for w in assign[t]:
            decision_bools.append(assign[t][w])
    if decision_bools:
        model.AddDecisionStrategy(decision_bools, cp_model.CHOOSE_FIRST, cp_model.SELECT_MAX_VALUE)

    solver = cp_model.CpSolver()
    solver.parameters.max_time_in_seconds = 30.0
    solver.parameters.num_search_workers = 8
    solver.parameters.log_search_progress = False

    status = solver.Solve(model)

    result = {
        "status": solver.StatusName(status),
        "objective": solver.ObjectiveValue() if status in (cp_model.OPTIMAL, cp_model.FEASIBLE) else None,
        "critical_path_lb": cp_lb,
        "resource_bound_W1": rb_W1,
        "resource_bound_W2": rb_W2,
        "assignments": [],
    }

    if status in (cp_model.OPTIMAL, cp_model.FEASIBLE):
        # Extract assignments
        for t in sorted(tasks.keys(), key=lambda x: int(x[1:])):  # T1..T12
            dur = tasks[t]["dur"]
            s = int(solver.Value(start[t]))
            f = int(solver.Value(end[t]))
            chosen_w = None
            for w in assign[t]:
                if solver.Value(assign[t][w]) == 1:
                    chosen_w = w
                    break
            if chosen_w is None:
                # for forced ones (A/C), there is only one key
                chosen_w = list(assign[t].keys())[0]
            result["assignments"].append({
                "task": t, "worker": chosen_w, "start": s, "finish": f, "dur": dur
            })

    return result

def validate_solution(result, tasks, blackout, skills_of_worker):
    if result["objective"] is None:
        return False, ["No solution."]
    assigns = result["assignments"]
    # Build map
    by_task = {a["task"]: a for a in assigns}
    # 1) Skill feasibility
    violations = []
    for a in assigns:
        t, w = a["task"], a["worker"]
        skill = tasks[t]["skill"]
        if skill not in skills_of_worker[w]:
            violations.append(f"Skill: {t} assigned to {w} but requires {skill}")
    # 2) Precedence
    for v, data in tasks.items():
        sv = by_task[v]["start"]
        for u in data["deps"]:
            fu = by_task[u]["finish"]
            if sv < fu:
                violations.append(f"Precedence: {v} starts {sv} before {u} finishes {fu}")
    # 3) No overlap per worker and no crossing blackouts
    from collections import defaultdict
    by_worker = defaultdict(list)
    for a in assigns:
        by_worker[a["worker"]].append((a["task"], a["start"], a["finish"]))
    def intervals_overlap(i, j):
        # [s,f) overlap if s_i < f_j and s_j < f_i
        return (i[1] < j[2]) and (j[1] < i[2])

    for w, arr in by_worker.items():
        arr.sort(key=lambda x: (x[1], x[2]))
        # pairwise overlap
        for i in range(len(arr)):
            for j in range(i+1, len(arr)):
                if intervals_overlap(arr[i], arr[j]):
                    violations.append(f"Overlap: {w} overlaps {arr[i][0]}[{arr[i][1]},{arr[i][2]}) with {arr[j][0]}[{arr[j][1]},{arr[j][2]})")
        # blackout crossing
        for t, s, f in arr:
            for bs, be in blackout[w]:
                if not (f <= bs or s >= be):
                    violations.append(f"Blackout: {w} runs {t} across [{bs},{be})")
    return (len(violations) == 0), violations

if __name__ == "__main__":
    res = solve_rcpsp_with_skills_and_blackouts()
    print("Status:", res["status"])
    print("Makespan:", res["objective"])
    print("Lower bounds -> CP:", res["critical_path_lb"],
          " W1:", res["resource_bound_W1"],
          " W2:", res["resource_bound_W2"])
    for a in res["assignments"]:
        print(a)

使用说明与期望:

  • 运行后会输出状态(OPTIMAL/FEASIBLE等)、makespan、三类下界,以及每个任务的分配与时间窗。
  • 若 makespan 等于 critical_path_lb(本例31),你即可宣告“全局最优”;若更大,则仍是可行上界,可继续调整参数/启发式或等待更久搜索。
  • 代码中还包含验证器函数 validate_solution(上面主函数未调用,你可根据需要在末尾调用它),对技能、先序、重叠与黑窗做机器校验,便于生成符合评测Schema的JSON。

7. 为什么这段建模能“一次性”压住全部约束?

  • 统一的 start/end + 可选区间(Optional Interval)
    我们为每个任务只设一个 start_t、end_t,再为每个允许的工人建一个可选区间 interval(t,w),这使得:
    • 先序只需写在(start/end)层面,不关心具体分配;
    • 不同工人上的 NoOverlap 自动生效;未被选择的区间不产生约束;
    • 黑窗被当作固定区间塞进 NoOverlap,从而“不可跨越”。
  • 布尔分配 + ExactlyOne
    B类任务只需“ExactlyOne(assign[T, W1], assign[T, W2])”,A/C任务固定为1。无须额外“技能兼容”的子句。
  • 目标函数使用 AddMaxEquality
    makespan 作为所有任务结束时间的最大值,CP-SAT能做很强的传播,帮助剪枝。

这正是现代CP-SAT在调度问题中的杀手锏:用间隔变量统一表示时间、分配与占用,配合NoOverlap和MaxEquality,传播强且简洁。


8. 性能与稳健性:让搜索更聪明

  • 决策顺序
    让求解器先决定 assign 布尔变量(哪位工人做B任务),再收缩时间域,实际常常能更快地切断不良分支。
  • 下界喂给人/系统
    在“只可输出JSON”的评测中,把 critical_path 与 resource_bound 一并输出,可以辅助判定“目前最优性”。若上界>下界,继续求解;若相等,即证最优。
  • 视野扩展
    若任务规模扩大或黑窗增多,可考虑:
    • 使用更多“累积资源”或“时间窗传播”的全局约束;
    • 引入松弛解(如MILP的LP松弛)作为更强的下界;
    • 对称性破坏(例如在同技能同前驱的B类任务之间强加一个指数序,以减少同构分支)。

9. 超越“思维链”:对人类与LLM的启示

  • 多步骤推理的瓶颈
    这类题的难,不在于“看不懂规则”,而在于“组合空间的全局权衡”。人和LLM都容易被局部模式诱导,做出短视决策。即便给出一段详细“思维链”,也很可能只是在证明一个次优解“看起来合理”,并不能提供“等于下界”的最优性证据。
  • 工具化思维
    把复杂约束交给通用求解器(CP-SAT/MILP),让其在严谨的数学框架中做系统搜索;把“解释”留给高层——比如为什么这个解可行、下界如何计算、还可以在哪些方向增强。这种“解释+求解分离”的范式,往往胜过在自然语言中穷尽路径。
  • 面向生产的可验证性
    有了验证器与下界,你可以将答案自动转成评测所需的JSON Schema:给出 assignments、makespan、lower_bounds,并让 violations 为空。即便未必找到最优,你也能提供“充分可检”的可行解与严谨下界,提升结果的可信度与可复现性。

10. 小结与行动

  • 这类“带技能与停机窗口的RCPSP”之所以难,是因为依赖、资源、技能、黑窗四重耦合造成的强非凸离散优化;局部直觉容易误导,思维链难以保全局最优。
  • 科学的解法是“下界+精确求解”的双证据路径:关键路径下界+资源下界给出硬下限;CP-SAT 给出可行上界;二者相遇即证最优。
  • 文中给出的Python代码,采用OR-Tools的可选间隔+NoOverlap+MaxEquality建模,能一揽子处理技能、黑窗、先序与不可切片约束;同时提供可复用的下界计算与可行性校验思路。

如果你在自己的数据上跑出了更好的上界、或发现了更紧的下界构造,或者想把模型拓展到多技能多工人、多黑窗、甚至多资源的工业级RCPSP,欢迎在评论区留言:你遇到的最大瓶颈是什么?更偏好CP-SAT还是MILP?你会怎样为这类题目设计更强的下界与剪枝?一起交流。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐