CppCon 2021 学习:From Eager Futures/Promises to Lazy Continuations
https://github.com/CppCon/CppCon2021/blob/main/Presentations/Benjamin_Hindman_CppCon_2021.pdfhttps://github.com/3rdparty/eventualshttps://github.com/3rdparty/eventuals-grpc因为上面的依赖全是github的项目下载到本地特别慢,所
这个ppt看不懂记录一下后面再看
https://github.com/CppCon/CppCon2021/blob/main/Presentations/Benjamin_Hindman_CppCon_2021.pdf
https://github.com/3rdparty/eventuals
https://github.com/3rdparty/eventuals-grpc
因为上面的依赖全是github的项目下载到本地特别慢,所有我们打算用本地的vscode连接到github上已经配置好的codespaces环境进行学习,环境上一个博客已经介绍
https://blog.csdn.net/TM1695648164/article/details/150452163?fromshare=blogdetail&sharetype=blogdetail&sharerId=150452163&sharerefer=PC&sharesource=TM1695648164&sharefrom=from_link
在之前的https://github.com/xiaqiu-xz/eventuals_example.git 进行学习
#include "eventuals/eventual.h" // 引入 Eventual 类,用于异步操作
#include "eventuals/promisify.h" // 引入 Promisify 功能(可选,当前示例未使用)
#include <glog/logging.h> // 引入 Google glog 日志库,提供 CHECK_EQ 宏
#include <iostream>
#include <string>
#include <thread> // std::thread
#include <unistd.h> // sleep 函数
using namespace eventuals;
int main(int argc, char **argv) {
std::cout << "Starting program..." << std::endl;
// 定义一个 Eventual 对象 e,返回 std::string 类型
auto e = []() {
// Eventual 构造函数接收一个 lambda,lambda 接收一个 Continuation k
return Eventual<std::string>([](auto &k) {
// 在新线程中异步执行
auto thread = std::thread([&k]() mutable {
sleep(2); // 模拟耗时操作,睡眠 2 秒
std::cout << "Thread finished sleeping, calling k.Start()..." << std::endl;
k.Start("awake!"); // 触发 continuation,将结果传递出去
});
thread.detach(); // 分离线程,主线程无需 join
std::cout << "thread will sleep for 2 seconds ..." << std::endl;
});
};
std::cout << "Waiting for result..." << std::endl;
// 执行 Eventual,获取结果(阻塞直到 k.Start() 被调用)
auto result = *e();
std::cout << "Got result: " << result << std::endl;
// 检查返回值是否符合预期
CHECK_EQ("awake!", result);
std::cout << "CHECK_EQ passed!" << std::endl;
return 0;
}
1⃣ 引入头文件
#include "eventuals/eventual.h" // Eventual 类,用于封装异步操作
#include "eventuals/promisify.h" // Promisify 功能,可把普通函数封装为 Eventuals (未用)
#include <glog/logging.h> // Google 日志库,提供 CHECK_EQ 宏
#include <iostream>
#include <string>
#include <thread> // std::thread
#include <unistd.h> // sleep 函数
eventuals/eventual.h
是核心,提供了Eventual<T>
类型,用于异步操作和链式调用。promisify.h
可将普通函数转换成 Eventual,但这个示例未使用。glog
的CHECK_EQ(a,b)
用于断言,如果 a ≠ b 会终止程序。thread
和sleep
用于模拟异步操作。
2⃣ 使用命名空间
using namespace eventuals;
- 为了直接使用
Eventual<T>
等类型而不写完整命名空间。
3⃣ 主函数
int main(int argc, char **argv) {
std::cout << "Starting program..." << std::endl;
- 程序入口,输出启动信息。
4⃣ 定义异步 Eventual
auto e = []() {
return Eventual<std::string>([](auto &k) {
auto thread = std::thread([&k]() mutable {
sleep(2);
std::cout << "Thread finished sleeping, calling k.Start()..." << std::endl;
k.Start("awake!");
});
thread.detach();
std::cout << "thread will sleep for 2 seconds ..." << std::endl;
});
};
分析:
e
是一个 lambda,返回Eventual<std::string>
。Eventual<std::string>
构造函数接收一个 lambda,参数k
是 Continuation:异步操作完成后调用k.Start(value)
传回结果。- 在
Eventual
内部,我们启动一个新线程执行耗时操作:sleep(2)
模拟延迟- 打印提示信息
k.Start("awake!")
将结果传给Eventual
thread.detach()
表示线程独立运行,不阻塞主线程。
注意:&k
捕获 continuation 的引用,使用mutable
允许 lambda 修改其捕获变量。
5⃣ 执行 Eventual 并获取结果
std::cout << "Waiting for result..." << std::endl;
auto result = *e();
*e()
会 启动 Eventual 的执行 并阻塞,直到k.Start()
被调用返回结果。- 这里
Eventual
内的线程会在 2 秒后触发k.Start("awake!")
,所以result
最终是"awake!"
。
6⃣ 输出结果并检查
std::cout << "Got result: " << result << std::endl;
CHECK_EQ("awake!", result);
std::cout << "CHECK_EQ passed!" << std::endl;
- 输出返回的字符串
CHECK_EQ
检查返回值是否正确,如果失败程序会中止- 成功的话,打印
"CHECK_EQ passed!"
7⃣ 程序结束
return 0;
- 程序正常退出。
总结逻辑
- 定义一个异步操作
Eventual<std::string>
- 内部启动独立线程,等待 2 秒后返回
"awake!"
- 调用
*e()
阻塞等待结果 - 获取结果并验证
核心概念:
- Eventual:异步操作封装
- Continuation (
k
):事件完成后的回调 *e()
:执行 Eventual 并获取结果(阻塞)
如果你愿意,我可以画一张线程与 Eventual 执行流程图,帮助你更直观地理解它的异步行为。
结果输出
Starting program...
Waiting for result...
thread will sleep for 2 seconds ...
Thread finished sleeping, calling k.Start()...
Got result: awake!
CHECK_EQ passed!
添加test2.cpp
#include "eventuals/compose.h" // 提供 >> 操作符,用于组合 Eventuals
#include "eventuals/just.h" // 提供 Just<T>,直接返回一个值的 Eventual
#include "eventuals/promisify.h" // 将普通函数转为 Eventual(此示例未用)
#include "eventuals/raise.h" // 提供 Raise<T>,触发异常的 Eventual
using namespace eventuals;
int main(int argc, char** argv) {
// 定义一个 Eventual 链 e
auto e = []() {
return Just("hello") // 生成一个 Eventual,直接返回 "hello"
>> Raise(RuntimeError("Oh no!")) // 抛出异常 RuntimeError,Eventual 链中断
>> Just("world"); // 如果没有异常,这里会返回 "world",但本例不会执行
};
try {
*e(); // 执行 Eventual 链,阻塞直到完成或者抛异常
std::abort(); // 如果没有抛异常,程序终止(理论上不会执行到这里)
} catch (const TypeErasedError& e) { // 捕获 Eventuals 中抛出的异常
CHECK_STREQ("Oh no!", e.what()); // 检查异常信息是否符合预期
}
return 0;
}
核心逻辑说明
Just("hello")
:创建一个返回"hello"
的 Eventual。Raise(RuntimeError("Oh no!"))
:抛出异常,链条中止,后续的Just("world")
不会执行。*e()
:执行 Eventual 链,会在Raise
处触发异常。TypeErasedError
:Eventuals 内部将异常类型擦除(type-erased),统一捕获。CHECK_STREQ
:断言异常信息是否正确。
BUILD.bazel 添加
# 定义一个 C++ 可执行文件 target
cc_binary(
name = "test2", # 可执行文件的名字
srcs = ["test2.cpp"], # 源文件列表
deps = [
# 依赖的外部库,这里依赖 eventuals 库
"@com_github_3rdparty_eventuals//eventuals",
],
# 注意:至少在 macOS 上不能静态链接,因为某些系统库需要动态加载
linkstatic = False,
)
构建程序
bazel build -c dbg :test2
WARNING: Logging before InitGoogleLogging() is written to STDERR
W20250816 14:42:23.097555 95659 promisify.h:166] WARNING: exception thrown while dereferencing eventual: Oh no!
terminate called after throwing an instance of 'eventuals::RuntimeError'
what(): Oh no!
.github/workflows/build.yml 添加Run test2
on:
push:
branches:
- "main"
pull_request:
branches:
- "**"
workflow_dispatch:
jobs:
build:
name: Build
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Build all examples
run: bazel build ...
shell: bash
# 运行 main 可执行文件
- name: Run main
run: bazel run :main
shell: bash
# 运行 main 可执行文件
- name: Run test2
run: bazel run :test2
shell: bash
test3.cpp
这段使用 eventuals::expected
和 Then
的代码添加详细注释,帮助理解错误处理与链式调用逻辑。
#include "eventuals/expected.h" // 提供 expected<T> 类型,用于包装返回值或错误
#include "eventuals/promisify.h" // 将普通函数封装为 Eventual(可选,示例未用)
#include "eventuals/then.h" // 提供 Then 操作符,用于链式处理结果
using namespace eventuals;
// 定义一个返回 expected<int> 的函数 SomeFunction
expected<int> SomeFunction(int i) {
if (i > 100) {
// 如果 i 大于 100,返回一个错误(unexpected)
return make_unexpected("> 100");
} else {
// 否则返回正常值 i(expected)
return i; // 也可以写作 return expected(i);
}
}
int main(int argc, char** argv) {
// 使用链式调用将 SomeFunction(42) 的结果转换为字符串
// Then 会接收 expected 的值,如果是正常值就调用 lambda
// 最终返回 "42"
CHECK_EQ("42", *(SomeFunction(42) >> Then([](int i) {
return std::to_string(i); // 将整数转换为字符串
})));
return 0;
}
核心逻辑说明
expected<int>
:类似std::optional
,但是可以携带错误信息。make_unexpected("> 100")
:生成一个错误对象,用于标记操作失败。Then([](int i){ ... })
:如果expected
内有正常值,则执行 lambda,将值传递下去;如果是错误,则跳过 lambda。SomeFunction(42) >> Then(...)
:通过>>
链式操作将 expected 的值传入下一步处理。*
运算符解包 expected 的结果,供CHECK_EQ
比较。
test4
这段使用 Eventuals Generator + Map + Reduce 的 C++ 代码添加详细注释:
#include "eventuals/compose.h" // 提供 compose 操作符,用于组合 Eventuals
#include "eventuals/generator.h" // 提供 Generator 类型,用于生成序列
#include "eventuals/iterate.h" // 提供 Iterate,用于遍历容器
#include "eventuals/map.h" // 提供 Map 操作符,用于对每个元素做转换
#include "eventuals/promisify.h" // 可将普通函数包装为 Eventual(示例未用)
#include "eventuals/reduce.h" // 提供 Reduce 操作符,用于累积结果
#include "eventuals/then.h" // 提供 Then,用于链式处理值
using namespace eventuals;
// 定义一个生成器函数,返回 Generator 对象,类型为 std::string
Generator::Of<std::string> SomeFunction() {
return []() {
// 使用 Iterate 遍历字符串列表
return Iterate({"hello", " ", "world", "!"})
>> Map([](std::string&& s) { // 对每个字符串应用 Map
s[0] = std::toupper(s[0]); // 将首字母大写
return std::move(s); // 返回转换后的字符串
});
};
}
int main(int argc, char** argv) {
auto e = []() {
// 使用 Generator 生成的序列,并通过 Reduce 累积成一个完整的字符串
return SomeFunction()
>> Reduce(
/* 初始结果 = */ std::string(), // Reduce 的初始值为空字符串
[](auto& result) {
// 对生成器每个元素执行 Then
return Then([&](auto&& value) {
result += value; // 将元素拼接到结果中
return true; // 返回 true 表示继续累积
});
});
};
// 执行 Eventual 并获取结果
CHECK_EQ("Hello World!", *e());
return 0;
}
核心逻辑说明
- Generator + Iterate:
Iterate({"hello", " ", "world", "!"})
生成一个字符串序列,每次提供一个元素。 - Map:对每个字符串的首字母进行大写处理。
- Reduce:将生成器产生的每个字符串累加到初始结果(空字符串)中。
- Then:在 Reduce 内处理每个元素,将处理结果加入最终字符串。
- 最终结果:
"Hello World!"
test5.cpp
这段使用 Eventuals::Just 的 C++ 代码可以加上注释如下:
#include "eventuals/just.h" // 引入 Just,用于创建立即返回固定值的 Eventual
#include "eventuals/promisify.h" // 可将普通函数包装为 Eventual(示例未用)
// 使用命名空间简化写法,实际项目中也可只引入所需符号
using namespace eventuals;
int main(int argc, char** argv) {
// 定义一个 Eventual 对象 e,返回一个整数 42
auto e = []() {
return Just(42); // Just 创建一个立即返回值的 Eventual
};
// 执行 Eventual 并获取结果,阻塞直到结果准备好
int i = *e(); // BLOCKING! 阻塞等待 Eventual 计算完成
// 检查结果是否符合预期
CHECK_EQ(42, i);
return 0;
}
核心逻辑说明
- Just(42):创建一个 Eventual,执行时立即返回固定值
42
。 *e()
:阻塞执行 Eventual,并获取最终值。- CHECK_EQ(42, i):使用 Google glog 的检查宏,确保返回值正确。
Eventuals
Eventuals 是一个 C++ 库,用于编写异步计算(asynchronous computations),核心概念是 continuations(延续/续体)。
核心特点对比
- 与常用回调(Callbacks)对比
- 回调是最常用的异步方法,但:
- 不易组合(组合多个异步操作复杂)
- 很少支持取消操作
- 逻辑难以推理
- Eventuals 解决了这些问题,提供了更易组合和可取消的异步机制。
- 回调是最常用的异步方法,但:
- 与 Futures/Promises 对比
- Futures/Promises 支持组合和取消,但:
- 性能可能差(锁开销、动态内存分配)
- 多数实现是急切求值(eager),导致引用透明性丢失
(即相同输入不保证返回相同输出,因为可能已经执行过副作用)
- Eventuals 的不同:
- 懒惰求值(lazy):必须显式调用
Start()
才会执行 - 延续(Continuation)不是类型擦除的,可以直接使用、保存等
- 允许编译器做更多优化,性能更好
- 代价是需要更多头文件实现,可能导致编译时间增加,但可以通过
Task
类型来缓解
- 懒惰求值(lazy):必须显式调用
- Futures/Promises 支持组合和取消,但:
其他特点
- 提供多种抽象(abstractions),简化异步计算:
- 例如
Stream
用于异步流处理 - 使用 builder 模式 构建
- 例如
- 灵感来源:
- 基于构建和使用 libprocess 的经验
- libprocess 被用于 Apache Mesos 大规模生产环境(超过 8 万台主机)
总结理解
- Eventuals 是懒执行的异步框架
- 必须调用
Start()
才会执行 - 与 Future/Promise 不同,避免默认锁开销和类型擦除带来的性能损失
- 必须调用
- 延续(Continuation)可直接操作
- 可以保存、传递、组合
- 支持取消操作
- 高性能、可组合
- 避免回调地狱(callback hell)
- 可用于复杂异步流、管道(pipeline)
- 适合大规模系统
- 灵感来源于 Mesos 的生产环境经验
这一段 Eventuals 的理论部分 和理解一下,并解释关键概念。
理解
1. Eventuals 的组合理论(Theory)
- 大部分情况下,你会使用 高阶组合器(combinators) 来组合 Eventuals。
- 本指南会从基础的 Eventuals 开始,逐步讲到如何创建自己的 Eventuals。
组合 Eventuals
- 使用重载的
operator>>()
来组合 Eventuals:- 类似于 Bash 的管道
|
,但 Eventuals 使用>>
- 我们也称组合的链条为 pipeline
- 类似于 Bash 的管道
- 为什么不用
operator|()
:operator>>()
在 C++17 及以上版本可以提供更安全的表达式求值顺序- 参考:C++ Paper P0145R3
- 因为组合管道的结果类型 不是类型擦除 的,所以通常使用
auto
来声明返回类型。
启动 Eventuals
- Eventuals 是懒执行的,必须显式调用
Start()
才会执行。 - 通常只在代码的“边缘”(edges)启动 Eventuals,例如
int main()
。 - 在启动前,必须先通过
Terminal()
终结(terminate) Eventual:
auto e = AsynchronousFunction()
>> Terminal()
.start([](auto&& result) {
// Eventual pipeline 成功
})
.fail([](auto&& result) {
// Eventual pipeline 失败
})
.stop([](auto&& result) {
// Eventual pipeline 停止
});
- 终结后的 Eventual 可以使用
Build()
构建为 Continuation(延续)形式:
auto k = Build(std::move(e));
k.Start(); // 启动
- 注意:
- 一旦启动,Eventual 不能被析构/移动,必须等它完成后再处理。
#include "eventuals/eventual.h"
#include "eventuals/terminal.h"
#include "eventuals/just.h"
#include <iostream>
// 使用 eventuals 命名空间,方便直接写 Just / Terminal 等
using namespace eventuals;
int main() {
// 构建一个最简单的 Eventual pipeline。
// Just("hello world") 表示将字符串常量 "hello world"
// 注入到 pipeline 中,作为后续处理的输入。
auto e = Just("hello world")
// 使用 Terminal() 终结 pipeline。
// Terminal() 必须指定至少一个回调,用来处理执行结果。
>> Terminal()
// start():当 pipeline 成功执行时调用。
// 这里 result 就是 Just 注入的字符串。
.start([](auto&& result) {
std::cout << "Success: " << result << std::endl;
})
// fail():当 pipeline 执行失败时调用。
// 这里 error 一般是异常或错误信息。
.fail([](auto&& error) {
std::cerr << "Error: " << error << std::endl;
})
// stop():当 pipeline 被主动停止时调用。
.stop([]() {
std::cerr << "Stopped!" << std::endl;
});
// Build() 将 pipeline 构建为一个 Continuation(延续对象)。
// 注意:只有 Build() 之后的 Continuation 才能调用 Start() 启动。
auto k = Build(std::move(e));
// 启动 pipeline。
// Start() 会触发执行流程,从 Just 注入值,
// 再传递到 Terminal,最终调用对应的回调。
k.Start();
return 0;
}
输出
Success: hello world
与 std::future
集成
- 可以使用
Terminate()
将终结的 Eventual 转为std::promise
+std::future
:
auto e = AsynchronousFunction();
auto [future, k] = Terminate(std::move(e));
k.Start();
auto result = future.get(); // 等待 Eventual 完成
- 在测试中,你也可以用
*
来阻塞式获取结果:
auto result = *AsynchronousFunction(); // 阻塞,仅测试使用
#include <iostream>
#include <string>
#include <chrono>
#include <thread>
#include <future>
#include "eventuals/eventual.h"
#include "eventuals/event-loop.h"
#include "eventuals/then.h"
#include "eventuals/promisify.h"
using namespace eventuals;
// 异步函数示例:模拟一个耗时 1 秒的异步操作,并返回字符串
auto AsynchronousFunction() {
return Just(0) >> Then([](int) {
std::cout << "异步任务执行..." << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(1));
return std::string("Hello from async!");
});
}
int main() {
// 构造默认事件循环,以便 Eventuals 管理异步任务
EventLoop::ConstructDefault();
// --- 示例1: 使用 Promisify 将 Eventual 与 std::future 集成 ---
std::cout << "--- 示例1: 使用 std::future 获取异步结果 ---" << std::endl;
// 获取 Eventual 对象
auto e = AsynchronousFunction();
// Promisify 会返回 std::future + continuation(可启动任务)
// future: 用于阻塞获取结果
// k: continuation,用于启动 Eventual pipeline
auto [future, k] = Promisify("chained-tasks", std::move(e));
// 启动异步任务
k.Start();
// 阻塞等待异步任务完成并获取结果
auto result1 = future.get();
std::cout << "使用 std::future 获取结果: " << result1 << std::endl;
std::cout << std::endl;
// --- 示例2: 测试场景下的阻塞式获取结果 ---
// 注意:'*' 阻塞式获取仅用于测试或简单场景,生产中应尽量避免阻塞主线程
std::cout << "--- 示例2: 使用 '*' 阻塞式获取结果 ---" << std::endl;
// 直接对 Eventual 使用解引用操作符 '*' 获取结果
// 内部会阻塞当前线程直到异步任务完成
auto result2 = *AsynchronousFunction();
std::cout << "使用 '*' 阻塞式获取结果: " << result2 << std::endl;
// 清理事件循环
EventLoop::DestructDefault();
return 0;
}
2. 常用组合器
Just
- 最基本的 Eventual,将一个值注入到 pipeline 中:
Just("hello world");
Then
- 最常用的组合器,用于在 pipeline 中继续处理异步计算的结果:
http::Get("https://3rdparty.dev")
>> Then([](http::Response&& response) {
return SomeAsynchronousFunction(response); // 返回一个 Eventual
});
- 也可以返回同步值,不必返回 Eventual:
http::Get("https://3rdparty.dev")
>> Then([](auto&& response) {
return response.code == 200; // 同步值
});
If
- 用于 条件分支,根据不同条件执行不同 Eventual:
- 异步“if”语句:
http::Get("https://3rdparty.dev")
>> Then([](auto&& response) {
return If(response.code != 200)
.then(http::Get("https://www.3rdparty.dev"))
.otherwise(Just(response)); // "otherwise" = else
});
- 可以结合
Then()
做额外处理:
http::Get("https://3rdparty.dev")
>> Then([](auto&& response) {
return If(response.code != 200)
.then(http::Get("https://www.3rdparty.dev"))
.otherwise(Then([body = response.body]() {
return "Received HTTP Status OK with body: " + body;
}));
});
总结理解
- 管道(Pipeline)概念
- 使用
>>
串联 Eventuals - 每个管道可以终结(
Terminal()
)再启动(Build()
+Start()
)
- 使用
- 懒执行与终结
- 必须显式启动,启动前先终结
- 可以和
std::future
集成
- 常用组合器
Just()
:注入值Then()
:处理结果,可返回 Eventual 或同步值If()
:异步条件分支,.then()
/.otherwise()
- 测试中简化写法
- 可使用
*
阻塞获取结果,但仅用于测试
- 可使用
#include <iostream> // 标准输入输出库,用于 std::cout
#include <string> // 使用 std::string 处理字符串
#include <chrono> // 使用 std::chrono 模拟延时
#include <thread> // 使用 std::this_thread::sleep_for
#include <future> // 使用 std::future 处理异步结果
#include "eventuals/eventual.h" // 引入 Eventual 类,定义异步操作
#include "eventuals/event-loop.h" // 引入 EventLoop,管理异步任务
#include "eventuals/then.h" // 引入 Then 组合器,处理异步结果
#include "eventuals/terminal.h" // 引入 Terminal 组合器,手动处理回调
#include "eventuals/promisify.h" // 引入 Promisify,将 Eventual 转换为 future
#include "eventuals/just.h" // 引入 Just,将值注入异步管道
#include "eventuals/http.h" // HTTP 请求模拟(假设库提供)
#include "eventuals/if.h" // If 组合器,实现异步条件分支
using namespace eventuals; // 使用 Eventuals 命名空间
// 模拟 HTTP 响应结构
struct HttpResponse {
int code; // HTTP 状态码
std::string body; // 响应内容
};
// 模拟异步 HTTP GET 请求
auto HttpGet(const std::string& url) {
return Just(0) >> Then([url](int) -> HttpResponse {
std::cout << "正在请求: " << url << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(500)); // 模拟网络延迟
// 根据 URL 返回不同的模拟响应
if (url.find("3rdparty.dev") != std::string::npos &&
url.find("www.") == std::string::npos) {
return HttpResponse{404, "Not Found"}; // 主服务器404
} else if (url.find("www.3rdparty.dev") != std::string::npos) {
return HttpResponse{200, "Hello from backup server!"}; // 备用服务器200
} else {
return HttpResponse{200, "Hello World!"}; // 默认成功
}
});
}
// 模拟异步处理函数,处理 HttpResponse 并返回字符串
auto SomeAsynchronousFunction(const HttpResponse& response) {
return Just(response) >> Then([](const HttpResponse& resp) {
std::cout << "异步处理响应,状态码: " << resp.code << std::endl;
std::this_thread::sleep_for(std::chrono::milliseconds(300)); // 模拟处理延迟
return "异步处理完成: " + resp.body; // 返回处理后的字符串
});
}
int main() {
// 初始化默认事件循环,必须在异步操作前调用
EventLoop::ConstructDefault();
// --- 示例1: 基本 Just 用法 ---
std::cout << "=== 示例1: Just - 将值注入到 pipeline ===\n" << std::endl;
// 将一个字符串注入 Eventual pipeline,并使用 Then 处理
auto [future1, k1] = Promisify(
"just-example",
Just(std::string("hello world")) >> Then([](const std::string& value) {
std::cout << "从 Just 获得值: " << value << std::endl;
return value + " - 处理完成"; // 返回新的字符串
})
);
k1.Start(); // 启动 Eventual pipeline
auto result1 = future1.get(); // 阻塞等待结果
std::cout << "最终结果: " << result1 << "\n" << std::endl;
// --- 示例2: Then 处理异步结果 ---
std::cout << "=== 示例2: Then - 处理异步计算结果 ===\n" << std::endl;
auto [future2, k2] =
Promisify(
"then-async-example",
HttpGet("https://example.com") >> Then([](const HttpResponse& response) {
// 返回一个异步 Eventual
return SomeAsynchronousFunction(response);
})
);
k2.Start(); // 启动异步任务
auto result2 = future2.get(); // 等待结果
std::cout << "异步处理结果: " << result2 << "\n" << std::endl;
// --- 示例3: Then 返回同步值 ---
std::cout << "=== 示例3: Then - 返回同步值 ===\n" << std::endl;
auto [future3, k3] =
Promisify(
"then-sync-example",
HttpGet("https://example.com") >> Then([](const HttpResponse& response) {
// 返回同步值,Eventuals 自动封装为 Eventual
bool success = response.code == 200;
std::cout << "请求是否成功: " << (success ? "是" : "否") << std::endl;
return success;
})
);
k3.Start();
auto result3 = future3.get();
std::cout << "同步处理结果: " << result3 << "\n" << std::endl;
// --- 示例4: 使用 If 实现条件分支 ---
std::cout << "=== 示例4: 使用 If 实现条件分支 ===\n" << std::endl;
auto [future4, k4] =
Promisify(
"conditional-example",
HttpGet("https://3rdparty.dev") >> Then([](const HttpResponse& response) -> auto {
std::cout << "第一次请求状态码: " << response.code << std::endl;
// 条件分支,如果第一次请求失败,则访问备用服务器
return If(response.code != 200)
.yes([]() {
return HttpGet("https://www.3rdparty.dev") >>
Then([](const HttpResponse& backup_response) {
return backup_response.body; // 返回备用服务器内容
});
})
.no([body = response.body]() {
return "Received HTTP Status OK with body: " + body;
});
})
);
k4.Start();
auto result4 = future4.get();
std::cout << "条件分支结果: " << result4 << "\n" << std::endl;
// --- 示例5: 使用 Terminal() 手动处理回调 ---
std::cout << "=== 示例5: 使用 Terminal() 手动处理回调 ===\n" << std::endl;
auto e6 = HttpGet("https://example.com") >>
Terminal()
.start([](const HttpResponse& response) {
std::cout << "Pipeline 成功! 状态码: " << response.code
<< ", 内容: " << response.body << std::endl;
})
.fail([](auto&& error) {
std::cout << "Pipeline 失败!" << std::endl;
})
.stop([](auto&&) {
std::cout << "Pipeline 停止!" << std::endl;
});
auto k6 = Build(std::move(e6)); // 构建 continuation
k6.Start(); // 启动 pipeline
std::cout << "示例5 完成\n" << std::endl;
// 清理事件循环
EventLoop::DestructDefault();
return 0;
}
注释说明:
- Just
- 将一个值注入 Eventual 管道。
- 可以直接返回同步值或与 Then 组合处理。
- Then
- 处理上一步的结果,可以返回新的 Eventual 或同步值。
- 支持链式异步调用。
- If
- 异步条件分支。
.yes()
和.no()
返回不同 Eventual,实现类似异步 if/else。
- Terminal
- 终结 Eventual pipeline。
- 可以手动处理
.start()/.fail()/.stop()
回调。
- Promisify
- 将 Eventual 转换为
std::promise
+std::future
。 future.get()
阻塞等待结果。k.Start()
启动异步任务。
- 将 Eventual 转换为
- EventLoop
- Eventuals 所有异步任务的调度核心。
- 必须先
ConstructDefault()
初始化,结束后DestructDefault()
清理。
理解:Errors and Error Handling(错误与错误处理)
1. 同步错误(Synchronous Errors)
- 在 同步代码 中,推荐使用
Expected::Of<T>
类型来返回值或错误:
Expected::Of<std::string> GetFullName(const Person& person) {
if (person.has_first_name() && person.has_last_name()) {
return Expected(person.first_name() + " " + person.last_name()); // 正常返回值
} else {
return Unexpected(InvalidPerson("name incomplete")); // 返回错误
}
}
Unexpected()
用于返回错误Expected()
用于显式返回正常值(虽然不是必须,但可以让代码更清晰)
2. Lambda 中返回同步错误
- 当 lambda 中既可能返回正常值,也可能返回错误时,需要在返回类型中显式指定
Expected::Of<T>
:
auto get_full_name = [](const Person& person) -> Expected::Of<std::string> {
if (person.has_first_name() && person.has_last_name()) {
return Expected(person.first_name() + " " + person.last_name());
} else if (!person.has_first_name()) {
return Unexpected(InvalidPerson("missing first name"));
} else {
CHECK(!person.has_last_name());
return Unexpected(InvalidPerson("missing last name"));
}
};
- 原因:
Unexpected()
本身不知道类型T
- 通过在返回类型中指定
Expected::Of<T>
,编译器就能知道Unexpected()
应该转换成哪种Expected
类型。
#include "eventuals/expected.h" // 引入 expected 类型支持,用于表示可能失败的值
#include <string> // 使用 std::string
#include "eventuals/promisify.h" // 将 Eventual 转换为 std::future
#include "eventuals/then.h" // Then 组合器,用于处理异步结果
using namespace eventuals; // 使用 Eventuals 命名空间
int main() {
// 定义一个简单的函数 f,返回 expected<int>,值为 40
auto f = []() { return expected<int>(40); };
// 定义一个 Eventual pipeline 的生成函数 e
auto e = [&]() {
return f() >>
// 第一个 Then:将 expected<int> 中的值加 1,并返回 tl::expected<int, std::string>
Then([](int i) -> expected<int> {
return tl::expected<int, std::string>(i + 1);
}) >>
// 第二个 Then:将整数再次封装为 expected<int>
Then([](int i) {
return Just(expected<int>(i));
}) >>
// 第三个 Then:检查 expected 是否有值,并再次加 1
Then([](expected<int> e) {
CHECK(e.has_value()); // 确认 e 中有值
e = tl::expected<int, std::string>(e.value() + 1); // 值加 1
return e; // 返回新的 expected
});
};
// 使用 * 阻塞式获取 Eventual 的最终结果,并打印
std::cout << *e() << std::endl;
}
注释总结:
expected<int>
- 表示一个可能失败或有值的结果。
tl::expected<int, std::string>
还可以携带错误信息类型。
Then()
- 用于处理上一步 Eventual 的结果。
- 可以返回新的值或新的 Eventual。
Just()
- 将值注入 Eventual pipeline,使其可以继续链式操作。
*e()
- 阻塞式获取 Eventual pipeline 的最终结果(仅在测试或简单场景使用)。
CHECK(e.has_value())
- 确认 expected 中确实有值,防止访问空值导致异常。
3. Expected::Of 可以像 Eventual 一样组合(Composition)
Expected::Of<T>
可以和其他 Eventuals 使用>>
链式组合:
ReadPersonFromFile(file)
>> Then([](Person&& person) {
return GetFullName(person); // 返回 Expected::Of<T>
})
>> Then([](std::string&& full_name) {
...
});
- 这种方式和普通 Eventual 管道一样,错误会自动传播。
4. 高级组合:在 Eventual 中处理同步错误
- 如果想让错误在 Eventual 中继续传播,可以用
Let()
包裹:
ReadPersonFromFile(file)
>> Then(Let([](auto& person) {
return GetFullName(person) // 返回 Expected::Of<T>
>> Then([&](auto&& full_name) {
if (person.has_suffix) {
return full_name + " " + person.suffix();
} else {
return full_name;
}
});
}));
Let()
会把 Eventual 和Expected::Of<T>
结合,使同步错误也能像异步错误一样自动传播到上游。
总结理解
- Expected::Of
- 用于同步函数返回值或错误
- 正常值用
Expected()
,错误用Unexpected()
- Lambda 返回类型显式指定
- 当 lambda 可能返回
Expected()
和Unexpected()
时 - 返回类型写
-> Expected::Of<T>
- 当 lambda 可能返回
- 组合管道
Expected::Of<T>
可以直接和 Eventual 用>>
链式组合- 错误会自动传播
- 高级用法
- 用
Let()
把同步错误嵌入 Eventual 管道 - 可以继续异步组合,保持错误传播逻辑一致
- 用
Eventuals 的异步错误与同步化 部分理解整理一下,并解释关键概念。
#include "eventuals/expected.h" // 引入 expected 类型支持,用于表示可能成功或失败的值
#include <string> // 使用 std::string
#include "eventuals/promisify.h" // 用于将 Eventual 转换为 std::future(本例未使用 future)
#include "eventuals/then.h" // 引入 Then 组合器,用于在 Eventual pipeline 中处理结果
using namespace eventuals; // 使用 Eventuals 命名空间
int main() {
// 定义一个简单函数 f,返回 expected<int>,初始值为 40
auto f = []() { return expected<int>(40); };
// 定义一个 Eventual pipeline 的生成函数 e
auto e = [&]() {
return f() >>
// 第一步 Then:将 f() 返回的整数加 1,并用 tl::expected<int, std::string> 封装
Then([](int i) -> expected<int> {
return tl::expected<int, std::string>(i + 1);
}) >>
// 第二步 Then:将整数再次封装为 expected<int>,并注入 Eventual 管道
Then([](int i) {
return Just(expected<int>(i));
}) >>
// 第三步 Then:对 expected<int> 做检查和处理
Then([](expected<int> e) {
CHECK(e.has_value()); // 确保 expected 中有值
// 对值进行加 1 操作,并生成新的 expected
e = tl::expected<int, std::string>(e.value() + 1);
return e; // 返回新的 expected
});
};
// 使用 '*' 阻塞式获取 Eventual pipeline 的最终结果,并打印
// 注意:这种写法仅适用于测试或简单场景
std::cout << *e() << std::endl;
}
注释重点总结:
expected<T>
/tl::expected<T, E>
- 表示一个可能成功或失败的值。
tl::expected<int, std::string>
可以携带错误信息类型std::string
。
Then()
- 用于处理上一步 Eventual 的结果。
- 可以返回同步值或新的 Eventual。
Just()
- 将值注入 Eventual 管道,使其继续链式操作。
*e()
- 阻塞式获取管道最终结果,只在测试或简单场景中使用。
CHECK(e.has_value())
- 确保 expected 中确实有值,防止访问空值导致异常。
理解:Asynchronous Errors(异步错误)
1. 异步错误处理
- 在 异步代码 中,错误处理比同步复杂,因为不同 eventual 返回值可能类型不同:
- 例如:
Just(T())
与Eventual<T>
类型不同 - 编译器需要明确类型,不能像同步代码只返回
T()
或Expected(T())
那样简单。
- 例如:
- 解决方法:
- 用
If()
条件返回不同类型的 continuation(延续) - 用
Raise()
抛出错误
- 用
示例:HTTP GET 异步请求
auto GetBody(const std::string& uri) {
return http::Get(uri)
>> Then([](auto&& response) {
return If(response.code == 200) // 如果返回码200
.then(Just(response.body)) // 返回body
.otherwise(Raise("HTTP GET failed w/ code " + std::to_string(response.code))); // 否则抛出错误
});
}
If()
不仅用于错误,也可用于 任何条件延续- 例如 HTTP 重定向:
auto GetOrRedirect(const std::string& uri, const std::string& redirect_uri) {
return http::Get(uri)
>> Then([redirect_uri](auto&& response) {
return If(response.code == 503) // 服务不可用
.then(http::Get(redirect_uri)) // 重定向请求
.otherwise(Just(response)); // 否则返回原始响应
});
}
#include <iostream>
#include "eventuals/eventual.h"
#include "eventuals/just.h"
#include "eventuals/then.h" // 需要引入 Then 组合器,用于处理异步结果
#include "eventuals/http.h" // HTTP GET 请求支持
#include "eventuals/if.h" // 异步条件分支 If
#include "eventuals/event-loop.h" // 事件循环管理异步任务
#include "eventuals/terminal.h" // Terminal 组合器,用于处理最终结果回调
#include "eventuals/promisify.h" // 将 Eventual 转换为 std::future
#include <chrono>
#include <future> // std::future
using namespace eventuals;
// ---------------------------
// HTTP GET 封装函数
// ---------------------------
auto GetBody(const std::string& uri) {
return http::Get("https://example.com/") >> Then([](http::Response&& response) -> auto {
auto code = response.code(); // 获取 HTTP 状态码
auto body = std::move(response.body()); // 获取响应内容并移动
// 异步条件分支:根据状态码判断
return If(code == 200)
.yes([body = std::move(body)]() { // 状态码 200
std::cout << "yes" << std::endl;
return Just(std::move(body)); // 返回 body
})
.no([code]() { // 非 200 状态码
std::cout << "no" << std::endl;
// 返回统一的 Eventual 类型,而不是直接返回字符串
return Just(std::string{"HTTP GET failed w/ code "} +
std::to_string(code));
});
});
}
// ---------------------------
// 帮助函数:轮询直到条件满足
// ---------------------------
void RunUntil(const std::function<bool()>& condition) {
while (!condition()) {
EventLoop::Default().RunUntilIdle(); // 处理事件循环中所有任务
}
}
// 模板版本:轮询直到 future 完成
template <typename T>
void RunUntil(const std::future<T>& future) {
return RunUntil([&future]() {
auto status = future.wait_for(std::chrono::nanoseconds::zero());
return status == std::future_status::ready; // future 已就绪
});
}
// ---------------------------
// 主函数
// ---------------------------
int main() {
bool done = false; // 标志异步任务是否完成
// 构造默认事件循环
EventLoop::ConstructDefault();
std::cout << "开始HTTP请求..." << std::endl;
// 创建 Eventual 管道并附加 Terminal 回调
auto e = GetBody("https://www.3rdparty.dev") >>
Terminal()
.start([&done](auto&& result) { // 成功回调
std::cout << " Get SUCCESS:\n" << result << std::endl;
done = true;
})
.fail([&done](auto&&... errors) { // 失败回调
std::cout << " Get FAILED" << std::endl;
done = true;
})
.stop([&done]() { // 停止回调
std::cout << " Get STOPPED" << std::endl;
done = true;
});
// 构建可启动的异步任务
auto k = Build(std::move(e));
k.Start(); // 启动异步管道
// 获取默认事件循环
auto& loop = EventLoop::Default();
// 设置轮询等待超时时间
int timeout = 5000; // 最大等待 5000 ms
int elapsed = 0;
// 手动轮询事件循环直到任务完成或超时
while (!done && elapsed < timeout) {
loop.RunUntilIdle(); // 处理当前事件循环中的所有任务
std::this_thread::sleep_for(std::chrono::milliseconds(10));
elapsed += 10;
}
// 销毁默认事件循环
EventLoop::DestructDefault();
return 0;
}
注释说明:
GetBody()
:使用Then
+If
来处理 HTTP 响应状态码,保证返回的类型统一。Terminal()
:注册成功、失败、停止的回调,类似异步回调。- 事件循环轮询:
RunUntilIdle()
可以让事件循环处理已经注册的异步任务。通过轮询done
标志或future
就绪状态,实现同步等待。 - done 标志:保证主线程可以等待异步任务完成后再退出。
2. 异步代码中的同步化(Synchronization)
- 异步代码也需要同步,但 不能使用阻塞的 std::mutex
- 应该使用 异步感知的锁(Lock)
示例:异步获取锁
Lock lock;
AsynchronousFunction()
>> Acquire(&lock) // 异步获取锁
>> Then([](auto&& result) {
// 在锁保护下操作...
})
>> Release(&lock); // 异步释放锁
#include "eventuals/lock.h" // 引入锁机制,用于异步临界区管理
#include "eventuals/then.h" // Then 组合器,用于异步链式处理
#include "eventuals/promisify.h" // 将 Eventual 转换为 std::future,方便阻塞等待结果
using namespace eventuals;
int main() {
// 定义一个 Lock(相当于互斥锁),用于控制 e1/e2 的执行顺序
Lock lock;
// 定义第一个异步事件 e1:
auto e1 = [&]() {
return Eventual<std::string>() // 声明产生 std::string 的异步事件
.start([](auto& k) { // start() 定义如何启动事件
// 在独立线程中调用 k.Start("t1"),异步产生值 "t1"
std::thread thread([&k]() mutable {
k.Start("t1");
});
thread.detach(); // 分离线程,避免阻塞主线程
})
>> Acquire(&lock) // 获取锁,保证临界区互斥
>> Then([](std::string&& value) { // Then 用于处理获取到的值
return std::move(value); // 直接返回 "t1"
});
};
// 定义第二个异步事件 e2(与 e1 类似,但产生 "t2"):
auto e2 = [&]() {
return Eventual<std::string>()
.start([](auto& k) {
std::thread thread([&k]() mutable {
k.Start("t2");
});
thread.detach();
})
>> Acquire(&lock)
>> Then([](std::string&& value) {
return std::move(value);
});
};
// 定义第三个事件 e3:用于释放锁
auto e3 = [&]() {
return Release(&lock) // 释放锁
>> Then([]() { // 然后返回 "t3"
return "t3";
});
};
// 将 e1/e2/e3 转换为 (future, continuation) 对,便于阻塞等待结果
auto [future1, t1] = Promisify("e1", e1());
auto [future2, t2] = Promisify("e2", e2());
auto [future3, t3] = Promisify("e3", e3());
// 启动 e1
t1.Start();
std::cout << future1.get() << '\n'; // 阻塞等待 e1 的结果,输出 "t1"
// 启动 e2 和 e3(e2 需要锁,e3 会释放锁)
t2.Start();
t3.Start();
// future3.get() 会输出 "t3"
std::cout << future3.get() << '\n';
// future2.get() 等待 e2 拿到锁后执行,输出 "t2"
std::cout << future2.get() << '\n';
}
执行流程说明
t1.Start()
启动e1
→ 产生"t1"
→ 获取锁成功 → 输出"t1"
。t2.Start()
启动e2
→ 产生"t2"
→ 但此时锁还没释放 → 阻塞等待。t3.Start()
启动e3
→ 释放锁 → 返回"t3"
→ 输出"t3"
。future2.get()
终于拿到"t2"
→ 输出"t2"
。
最终输出顺序:
t1
t3
t2
- 这种模式常用于在异步操作中捕获
this
指针,以保证对象状态访问安全
3. 简化模式:Synchronizable 类
- 可以在类中继承
Synchronizable
- 使用
Synchronized()
包裹方法,实现自动锁定/释放
class MyClass : public Synchronizable {
public:
auto MyMethod() {
return Synchronized(
Then([](auto&& result) {
// 在 Synchronizable::lock() 保护下执行
}));
}
};
- 这样就可以在异步方法中安全访问类的成员变量,而不需要手动管理 Lock/Acquire/Release
总结理解
- 异步错误处理
If()
根据条件选择不同 eventualRaise()
抛出错误- 可以像同步错误一样组合,支持自动传播
- 异步同步化
- 异步代码不能阻塞,使用
Lock
/Acquire()
/Release()
- 可以用
Synchronizable
+Synchronized()
简化锁管理
- 异步代码不能阻塞,使用
- 结合使用
- 异步管道 + 条件延续 + 异步锁
- 可以写出安全、可组合、性能高的异步逻辑
#include "eventuals/just.h" // 引入 Just 组合器,用于将值注入异步管道
#include "eventuals/lock.h" // 引入 Lock 和 Synchronizable,用于同步操作
#include "eventuals/promisify.h" // 引入 Promisify,将 Eventual 转换为 std::future
#include <iostream> // 标准输入输出库,用于 std::cout
using namespace eventuals; // 使用 Eventuals 命名空间,简化代码
// 定义 Foo 类,继承自 Synchronizable 以支持同步操作
struct Foo : public Synchronizable {
// 默认构造函数
Foo() {}
// 移动构造函数,确保 Synchronizable 的正确初始化
Foo(Foo&& that) : Synchronizable() {}
// 定义异步操作 Operation,返回 Eventual<std::string>
auto Operation() {
// Synchronized 确保操作在锁保护下执行,防止并发访问
// Just("operation") 注入字符串 "operation" 到异步管道
// Wait 等待某个条件,接受一个通知回调
return Synchronized(Just("operation") >> Wait([](auto notify) {
// Wait 的回调返回一个条件函数,决定是否继续
// 这里始终返回 false,表示不继续等待,直接返回 Just 的值
return [](auto&&...) { return false; };
}));
}
};
int main() {
// 创建 Foo 对象
Foo foo;
// 使用移动构造函数将 foo 移动到 foo2
Foo foo2 = std::move(foo);
// 调用 foo2 的 Operation 方法,获取异步操作的 Eventual
auto value = foo2.Operation();
// 解引用 Eventual,阻塞直到获取结果,并打印
std::cout << *value << '\n';
return 0; // 程序正常退出
}
#include "eventuals/just.h" // 引入 Just 组合器,用于将值注入异步管道
#include "eventuals/lock.h" // 引入 Lock 和 Synchronizable,用于同步操作
#include "eventuals/promisify.h" // 引入 Promisify,将 Eventual 转换为 std::future
#include <iostream> // 标准输入输出库,用于 std::cout
using namespace eventuals; // 使用 Eventuals 命名空间,简化代码
int main() {
// 定义一个 Foo 类型,继承自 Synchronizable(表示这个对象支持同步原语)
struct Foo : public Synchronizable {
auto Operation() {
// Synchronized() 会把内部的操作变成“同步操作”,避免多线程竞争
return Synchronized(
// Then 表示在前一步完成后执行下一步逻辑
Then([]() {
// 返回 Just(42),表示立即产生一个值 42
return Just(42);
}))
>>
// 再加一个 Then,把上一步的值 i 原封不动返回
Then([](auto i) { return i; });
}
};
Foo foo;
// foo.Operation() 构建了一个管道,返回类型是一个 "Deferred pipeline"
// 这里 `*foo.Operation()` 会触发隐式的同步执行并解包结果
// 所以得到的就是 int 值 42
std::cout << *foo.Operation() << '\n';
}
执行过程解析
foo.Operation()
返回一个 Eventual 管道,逻辑是:- 先运行
Then([](){ return Just(42); })
,产生42
。 - 然后运行
Then([](auto i){ return i; })
,把42
原样返回。
- 先运行
- 外面用
Synchronized(...)
包装,保证如果多个线程访问foo.Operation()
,执行是互斥的。 - 最后在
main()
里写*foo.Operation()
,会同步取出管道的结果,得到 42。 std::cout
输出42
。
所以最终打印结果是:
42
事件同步 (Wait/Notify) 示例代码 补全注释,逐行解释:
#include "eventuals/just.h" // 引入 Just 组合器,用于将值注入异步管道
#include "eventuals/lock.h" // 引入 Lock 和 Synchronizable,用于同步操作
#include "eventuals/promisify.h" // 引入 Promisify,将 Eventual 转换为 std::future
#include "eventuals/if.h" // 引入 If 组合器,用于条件逻辑
#include <iostream> // 标准输入输出库,用于 std::cout
using namespace eventuals; // 使用 Eventuals 命名空间,简化代码
int main() {
// 定义一个支持多条件变量的同步对象 Foo
struct Foo : public Synchronizable {
// 等待某个 id 对应的条件变量被唤醒
auto WaitFor(int id) {
return Synchronized(Then([this, id]() {
// 在 map 中查找或插入对应 id 的条件变量
auto [iterator, inserted] = condition_variables_.emplace(id, &lock());
ConditionVariable& condition_variable = iterator->second;
// 返回等待操作(会阻塞直到被唤醒)
return condition_variable.Wait();
}));
}
// 唤醒某个 id 对应的一个等待者
auto NotifyFor(int id) {
return Synchronized(Then([this, id]() {
auto iterator = condition_variables_.find(id);
return If(iterator == condition_variables_.end())
// 如果没找到对应条件变量,返回 false
.yes([]() { return false; })
// 找到了就唤醒一个等待的协程/线程
.no([iterator]() {
ConditionVariable& condition_variable = iterator->second;
condition_variable.Notify();
return true;
});
}));
}
// 唤醒某个 id 对应的所有等待者
auto NotifyAllFor(int id) {
return Synchronized(Then([this, id]() {
auto iterator = condition_variables_.find(id);
return If(iterator == condition_variables_.end())
// 没找到条件变量返回 false
.yes([]() { return false; })
// 找到了就唤醒所有等待者
.no([iterator]() {
ConditionVariable& condition_variable = iterator->second;
condition_variable.NotifyAll();
return true;
});
}));
}
// 保存不同 id 对应的条件变量
std::map<int, ConditionVariable> condition_variables_;
};
Foo foo;
// 创建 3 个等待 id=42 的 future
auto [future1, k1] = Promisify("k1", foo.WaitFor(42));
auto [future2, k2] = Promisify("k2", foo.WaitFor(42));
auto [future3, k3] = Promisify("k3", foo.WaitFor(42));
// 启动这三个等待操作
k1.Start();
k2.Start();
k3.Start();
// 确认三个 future 都还没完成(立即等待会超时)
assert(std::future_status::timeout == future1.wait_for(std::chrono::seconds(0)));
assert(std::future_status::timeout == future2.wait_for(std::chrono::seconds(0)));
assert(std::future_status::timeout == future3.wait_for(std::chrono::seconds(0)));
// 尝试唤醒 id=41(不存在),返回 false
*foo.NotifyFor(41);
// 唤醒 id=42 的一个等待者(成功,返回 true)
*foo.NotifyFor(42);
// future1 会被唤醒并完成
future1.get();
// 另外两个 future 依旧处于等待状态
assert(std::future_status::timeout == future2.wait_for(std::chrono::seconds(0)));
assert(std::future_status::timeout == future3.wait_for(std::chrono::seconds(0)));
// 唤醒 id=42 的所有等待者(成功,返回 true)
std::cout << *foo.NotifyAllFor(42);
// future2、future3 被唤醒并完成
future2.get();
future3.get();
}
运行逻辑总结
- 三个任务都在等待
id=42
的信号。 - 调用
NotifyFor(41)
无效,因为没有人在等id=41
。 - 调用
NotifyFor(42)
唤醒了其中一个等待者(比如future1
)。 - 之后
future2
和future3
仍在等待。 - 调用
NotifyAllFor(42)
唤醒剩余的两个等待者。 - 程序最终输出:
1
(因为 *foo.NotifyAllFor(42)
返回 true
,打印出来就是 1
)
#include "eventuals/lock.h" // 引入 Lock 和 Synchronizable,用于同步操作
#include "eventuals/promisify.h" // 引入 Promisify,将 Eventual 转换为 std::future
#include <iostream> // 标准输入输出库,用于 std::cout
using namespace eventuals; // 使用 Eventuals 命名空间,简化代码
// 这里的例子演示了 ConditionVariable 的一个潜在 bug:
// 如果 Wait 在条件已经满足的情况下依然把 eventual 入队,
// 那么后续 Notify 时可能访问到已经被释放的对象,从而导致 UAF(use-after-free)。
int main() {
struct Foo : public Synchronizable {
Foo() : condition_variable_(&lock()) {
std::cout << "Foo 构造完成,ConditionVariable 绑定 Lock\n";
}
// NotifyAll 用于唤醒所有等待的任务
auto NotifyAll() {
return Synchronized(Then([this]() mutable {
std::cout << "[NotifyAll] 执行 NotifyAll()\n";
condition_variable_.NotifyAll();
}));
}
// Wait 用于等待条件满足
auto Wait() {
return Synchronized(condition_variable_.Wait([]() {
// 这里的谓词永远返回 false,表示条件已经满足,无需等待
std::cout << "[Wait] 条件已满足,直接返回\n";
return false;
}));
}
ConditionVariable condition_variable_; // 条件变量,用于同步
};
// 创建一个 Foo 实例,包含锁和条件变量
Foo foo;
// 调用 Wait,因谓词返回 false,所以不会真的阻塞
std::cout << "调用 foo.Wait() ...\n";
*foo.Wait();
std::cout << "foo.Wait() 已完成\n";
// 调用 NotifyAll,应该没有任何等待的任务被唤醒
std::cout << "调用 foo.NotifyAll() ...\n";
*foo.NotifyAll();
std::cout << "foo.NotifyAll() 已完成\n";
std::cout << "程序结束\n";
}
ConditionVariable
有时你需要在持有锁的情况下等待某个条件满足,这时就需要类似 std::condition_variable
的机制。在 eventuals 里,可以用 ConditionVariable
来实现。
示例:
class SomeAggregateSystem : public Synchronizable {
public:
SomeAggregateSystem()
: initialization_(&lock()) {} // 绑定到 Synchronizable 提供的 lock()
auto MyMethod() {
return Synchronized(
// 等待初始化完成
initalization_.Wait([]() {
return cooling_subsystem_initialized_
&& safety_subsystem_initialized_;
})
>> Then([](auto&& result) {
// 初始化完成后才能执行的逻辑
}));
}
auto InitializeCoolingSubsystem() {
return CoolingSubsystemInitialization()
>> Synchronized(
Then([this]() {
cooling_subsystem_initialized_ = true;
initialization_.Notify(); // 通知条件变量
}));
}
auto InitializeSafetySubsystem() { ... }
private:
ConditionVariable initialization_;
};
这里的逻辑就是:
MyMethod()
在执行时会等待两个子系统都完成初始化;InitializeCoolingSubsystem()
完成后会Notify()
,提示条件变量去重新检查条件;- 当
cooling_subsystem_initialized_ && safety_subsystem_initialized_
成立时,Wait()
才会继续执行。
如果只想简单地等待一次Notify()
,那么可以直接调用Wait()
不传参数。
Task
Task
用来对 continuation/pipeline 进行类型擦除,让你可以不用关心最终拼接出来的复杂类型。
- 目前
Task
内部是用 动态堆分配 实现的; - 未来可能会提供
SizedTask
,支持预分配,避免堆分配开销。 - 注意
Task
需要一个 callable(可调用对象),因为要等到真正执行的时候再进行分配(这样调度器可以做一些优化,比如分配在当前线程的 NUMA 节点内存)。
示例:
Task::Of<int> task = []() { return Asynchronous(); };
这里的 Task::Of<int>
表示这个 Task
会最终产出一个 int
。
它可以像普通 eventual 一样进行组合:
auto e = Task::Of<int>([]() { return Asynchronous(); })
>> Then([](int i) {
return stringify(i);
});
注意:
Task::Of
也需要被 terminate(终止),否则不会执行;- 在测试里可以直接用
*task
运行(但是会阻塞当前线程)。
抽象类与虚函数
你可以定义一个基类,里面的方法返回 Task::Of<T>
,这样派生类可以选择同步实现或者异步实现。
例子:
同步派生类
class DerivedSynchronous : public Base {
public:
Task::Of<std::string> Method() override {
if (SomeCondition()) {
return Task::Success("success");
} else {
return Task::Failure("failure");
}
}
};
这里的 Task::Success
/ Task::Failure
类似于 Expected()
/ Unexpected()
,但名字更直观。
异步派生类
class DerivedAsynchronous : public Base {
public:
Task::Of<std::string> Method() override {
return []() {
return AsynchronousFunction()
>> Then([](bool condition) -> Expected::Of<std::string> {
if (condition) {
return Expected("success");
} else {
return Unexpected("failure");
}
});
};
}
};
这里返回的是一个异步的 pipeline,它在执行过程中会根据结果选择 Expected
或 Unexpected
。
总结:
- ConditionVariable = 异步世界里的条件变量,可以和
Synchronizable
搭配使用。 - Task = 类型擦除的 eventual,能统一同步和异步实现的接口。
- 抽象类 + Task = 可以让派生类灵活选择同步或异步实现,而不用改变函数签名。
这一部分 Eventual
:
Eventual
Eventual
是一种更底层、更灵活的异步构建方式。它的核心思想是:
你可以自己控制什么时候、以什么方式把值传递给后续的 continuation(下一个 eventual)。
基本用法
Eventual<std::string>([](auto& k) {
k.Start("hello world");
});
这里定义了一个 Eventual<std::string>
,它最终会调用 k.Start("hello world")
,把 "hello world"
传给下游的 eventual。
另一种更“显式”的写法是 builder 模式:
Eventual()
.start([](auto& k) {
k.Start("hello world");
});
处理错误和停止
你不仅可以定义正常的 start
,还可以定义 错误处理(fail
)和 停止处理(stop
):
Eventual<std::string>()
.start([](auto& k) {
k.Start("hello world");
})
.fail([](auto& k, auto&&... errors) {
// 出错时执行,可以恢复:比如也能调用 k.Start(...)。
})
.stop([](auto& k) {
// 上游停止时执行。
});
也就是说,fail
和 stop
回调可以选择直接终止,也可以决定“恢复”并继续向下游传值。
使用上下文数据 .context()
有时你需要在 start
、fail
、stop
这几个回调中共享某些状态或数据,可以用 .context()
:
Eventual<std::string>()
.context("hello world")
.start([](auto& data, auto& k) {
k.Start(data);
})
.fail([](auto& data, auto& k, auto&&... errors) {
// 出错时能访问同样的 data。
})
.stop([](auto& data, auto& k) {
// 停止时也能访问 data。
});
这样比单纯在 lambda 捕获里写更灵活,因为 context 可以在多个回调之间共享。
可中断的 Eventual
有时 eventual 启动后,你希望能取消或停止它,这就是 中断(Interrupt) 机制。
默认情况下 eventual 是不可中断的,你需要显式声明:
Eventual<std::string>()
.interruptible()
.start([](auto& k, Interrupt::Handler& handler) {
handler.Install([&k]() {
// 收到中断时,传播 Stop()
k.Stop();
});
k.Start("hello world");
});
这里 handler.Install(...)
就是注册中断处理逻辑。
注册和触发中断
auto [future, k] = Terminate(
Eventual<std::string>()
.interruptible()
.start([](auto& k, Interrupt::Handler& handler) {
handler.Install([&k]() {
k.Stop();
});
// 模拟一个“永远不开始”的异步任务
}));
Interrupt interrupt;
k.Register(interrupt); // 把中断注册到这个 eventual 上
k.Start();
interrupt.Trigger(); // 触发中断
future.get(); // 会抛出异常,表示这个 eventual 被中断了
这里体现了“中断”而不是“取消”的设计哲学。中断的原因可能不只是“取消任务”,还可能是外部系统要求提前退出、错误恢复等。一般来说,在抽象设计中,可以默认把“中断”当成“取消”的一种情况。
总结:
Eventual
= 自己控制异步流程的最底层构建块。.start()
、.fail()
、.stop()
用来定义不同情况下的处理逻辑。.context()
提供共享状态。.interruptible()
让任务可以被外部中断(相当于取消/停止)。
关于 Stream
、Repeat
、Loop
、Map
、Reduce
的说明成理解,并逐一解释:
Stream
(流)
- 用途:允许一个异步计算返回多个值,而不仅仅是一个。
- 不同于
succeed()
/fail()
/stop()
,流使用:emit()
→ 发射一个值ended()
→ 表示流结束,没有更多的值了
- 要把流“转换”回
Eventual
,需要用Loop
:next()
→ 请求下一个值done()
→ 表示不再需要更多值
默认情况下,流是无缓冲的,这样可以实现显式的流控(flow control)和反压(back pressure)。
示例:
Stream<int>()
.context(5) // 初始上下文:count = 5
.next([](auto& count, auto& k) {
if (count-- > 0) {
emit(k, count); // 发射一个值
} else {
ended(k); // 没有更多值了
}
})
>> Loop<int>()
.context(0) // 初始 sum = 0
.body([](auto& sum, auto& stream, auto&& value) {
sum += value;
next(stream); // 请求下一个值
})
.ended([](auto& sum, auto& k) {
succeed(k, sum); // 最终返回总和
});
这个例子中:
Stream<int>()
依次发射 5、4、3、2、1、0(倒数)。Loop<int>()
收集所有值并求和。- 最后返回
sum
。
Repeat
- 可以用来创建一个流,它会重复执行某个异步计算。
- 本质上它和
Stream()
类似,但方便写重复的异步操作。
示例:
Repeat([]() { return Asynchronous(); });
这会不断地重复 Asynchronous()
计算。
Map
和 Reduce
Map
:对流中的每个元素做变换。Reduce
:对流中的元素做聚合,类似函数式编程中的 map-reduce。
示例:
Iterate({1, 2, 3, 4, 5})
>> Map([](int i) {
return i + 1; // 每个元素加 1
})
>> Reduce(
/* 初始 sum = */ 0,
[](auto& sum) {
return Then([&](auto&& value) {
sum += value; // 累加
return true; // 继续
});
});
整个过程:
Iterate({1,2,3,4,5})
→ 相当于一个流,依次发射 1,2,3,4,5Map
→ 每个值变成i+1
→ 结果是 2,3,4,5,6Reduce
→ 从初始sum=0
开始累加 → 最后结果是20
总结一下:
Stream
→ 返回多个值(emit/ended)。Loop
→ 消费流(next/done)。Repeat
→ 重复异步操作,相当于无限流。Map
→ 逐元素转换。Reduce
→ 聚合流成单个结果。
无限 Loop
(Infinite Loop)
有时候你会遇到无限流(比如一个数据源源不断地产生数据)。
这时可以用 Loop()
来无限循环消费这个流。
示例:
SomeInfiniteStream()
>> Map([](auto&& i) { return Foo(i); })
>> Loop(); // 无限循环
含义:
SomeInfiniteStream()
→ 产生一个无限的数据流。Map()
→ 对流里的每个元素应用Foo(i)
转换。Loop()
→ 一直消费下去,没有结束条件。
http
http
命名空间提供了 HTTP 客户端(以及正在开发中的服务端)实现。
HTTP GET 请求
示例:
http::Get("http://example.com") // 使用 'https://' 表示 TLS/SSL
>> Then([](http::Response&& response) {
// 在这里处理响应
});
含义:
执行一个 HTTP GET 请求,返回 http::Response
,然后用 Then()
处理结果。
HTTP POST 请求
示例:
http::Post(
"https://jsonplaceholder.typicode.com/posts",
{{"first", "emily"}, {"last", "schneider"}})
>> Then([](auto&& response) {
// 在这里处理响应
});
含义:
向服务器发送一个 POST 请求,带有表单数据 first=emily, last=schneider
,然后处理响应。
更细粒度控制:http::Client
如果你需要对 HTTP 请求做更细致的控制,可以先创建一个 http::Client
。
例如,不想在 HTTPS 下验证对端证书(verify_peer(false)
):
示例:
http::Client client = http::Client::Builder()
.verify_peer(false)
.Build();
client.Post(
"https://jsonplaceholder.typicode.com/posts",
{{"first", "emily"}, {"last", "schneider"}})
>> Then([](auto&& response) {
// 在这里处理响应
});
含义:
这里 client
配置了“忽略证书验证”,然后用它发起 POST 请求。
更灵活控制:http::Request
如果只想在单个请求上配置一些特殊参数,可以用 http::Request
。
示例:添加请求头
client.Do(
http::Request::Builder()
.uri("https://3rdparty.dev")
.method(http::GET)
.header("key", "value")
.header("another", "example")
.Build())
>> Then([](auto&& response) {
// 在这里处理响应
});
含义:
这里创建了一个 HTTP GET 请求,并在请求中加了两个自定义 header。
请求级别覆盖(Request overrides Client)
如果 http::Request
和 http::Client
配置冲突,以 Request 为准。
示例:
client.Do(
http::Request::Builder()
.uri("https://3rdparty.dev")
.method(http::GET)
.verify_peer(true) // 覆盖 client 的 verify_peer(false)
.Build())
>> Then([](auto&& response) {
// 在这里处理响应
});
含义:
虽然 client
设置了 verify_peer(false)
,但 Request
中强制 verify_peer(true)
,最终以 Request
配置为准。
总结:
Loop()
可以消费无限流。http::Get
/http::Post
用于基本的 HTTP 请求。http::Client
用于全局配置(例如关闭证书验证)。http::Request
用于单次请求的精细化控制,并且能覆盖Client
的默认设置。
TLS/SSL 证书验证 (Certificate Verification)
在前面的例子里你已经看到,可以通过在 http::Client
或 http::Request
构造时调用 verify_peer(false)
来跳过证书验证。
除此之外,你还可以提供一个 CA 根证书 来验证对端。
示例(在客户端中设置证书):
// 从文件中读取 PEM 格式的证书。
std::filesystem::path path = "/path/to/certificate";
Expected::Of<x509::Certificate> certificate = pem::ReadCertificate(path);
CHECK(certificate); // 检查是否成功读取证书。
http::Client client = http::Client::Builder()
.certificate(*certificate) // 配置证书
.Build();
client.Get("https://3rdparty.dev")
>> Then([](auto&& response) {
// 在这里处理响应
});
含义:
pem::ReadCertificate(path)
从 PEM 文件中读取证书;client
使用该证书来验证 HTTPS 对端。
示例(在单个请求中设置证书):
// 从文件中读取 PEM 格式的证书。
std::filesystem::path path = "/path/to/certificate";
Expected::Of<x509::Certificate> certificate = pem::ReadCertificate(path);
http::Client client = http::Client::Builder().Build();
client.Do(
http::Request::Builder()
.uri("https://3rdparty.dev")
.method(http::GET)
.certificate(*certificate) // 仅对这个请求生效
.Build())
>> Then([](auto&& response) {
// 在这里处理响应
});
含义:
这里证书不是绑定到整个 http::Client
,而是只对单个 Request
生效。
RSA, X.509, 和 PEM
这些是常见的加密标准:
- RSA:一种公钥加密算法,用于生成密钥对(公钥、私钥)。
- X.509:证书标准,描述如何将公钥、签名、身份信息(如 IP/域名)绑定在一起。
- PEM:一种编码格式,常见于
.pem
文件,用于存储密钥或证书。
生成 RSA 密钥对
示例:
Expected::Of<rsa::Key> key = rsa::Key::Builder().Build();
生成一个新的 RSA 密钥对(公钥+私钥)。
用 RSA 密钥生成 X.509 证书
示例:
Expected::Of<x509::Certificate> certificate =
x509::Certificate::Builder()
.subject_key(rsa::Key(*key)) // 证书的主体使用这个 key
.sign_key(rsa::Key(*key)) // 用同一个 key 对证书签名
.ip(address) // 证书绑定的 IP 地址
.Build();
这里生成了一个自签名证书(自己签发,常用于测试或内部使用)。
将 RSA Key 或 X.509 Certificate 编码成 PEM 格式
示例:
Expected::Of<std::string> pem_key = pem::Encode(*key);
Expected::Of<std::string> pem_certificate = pem::Encode(*certificate);
这样就能得到一个 PEM 格式的字符串,可以写入 .pem
文件保存。
从 PEM 文件读取证书
示例:
// 从文件中读取 PEM 编码的证书
std::filesystem::path path = "/path/to/certificate";
Expected::Of<x509::Certificate> certificate = pem::ReadCertificate(path);
用于加载外部已有的 PEM 格式证书。
总结:
- TLS 证书验证:
- 可以选择跳过验证(
verify_peer(false)
)。 - 可以通过加载 PEM 文件的 CA 证书来启用验证。
- 证书可以绑定到
http::Client
(全局生效)或http::Request
(单次生效)。
- 可以选择跳过验证(
- RSA, X.509, PEM:
- RSA 生成密钥对;
- X.509 用来生成证书(可以绑定 IP/域名);
- PEM 是密钥和证书的存储/交换格式。
builder::Field
/ builder::FieldWithDefault
以及模板参数控制的「是否已设置」逻辑。
背景
eventuals
库里很多地方用到了 Builder 模式(构建器模式),但是它和常见的 builder 有点不一样:
- 它在 编译期 就能检查哪些字段是必须设置的。
- 这是通过一些模板技巧和
builder::Field
来实现的。 - 最终结果是:如果你忘了设置必须的字段,代码直接 编译不过,而不是等到运行时报错。
例子:http::Request
一开始只有一个空的 Request
类:
namespace http {
class Request {};
} // namespace http
我们希望用户可以通过类似下面的方式构造它:
auto req = http::Request::Builder()
.method(Method::POST)
.uri("https://...")
.timeout(2s)
.Build();
并且如果用户忘记 .method()
或 .uri()
,代码直接编译错误。
Builder 模板
先声明 Builder
接口和 _Builder
模板:
class Request {
public:
static auto Builder();
private:
template <bool, bool>
struct _Builder;
};
这里的两个 bool
模板参数分别表示:
has_method_
:method 是否已经设置过has_uri_
:uri 是否已经设置过
所以_Builder<false, false>
表示两个字段都没设置过;_Builder<true, false>
表示 method 已经设置过,uri 没设置;
最终要求必须到_Builder<true, true>
才能Build()
。
builder::Field
builder::Field<Type, has>
相当于编译期版本的 std::optional<Type>
:
- 如果
has == false
,它是「空」的; - 如果
has == true
,它一定有值; - 这样在调用
.value()
时,编译器就能知道值一定存在(不会有空的情况)。
例如:
template <bool has_method_, bool has_uri_>
class Request::_Builder : public builder::Builder {
private:
builder::Field<Method, has_method_> method_;
builder::Field<std::string, has_uri_> uri_;
};
构造和 Setter
Builder 需要能不断「生成一个新的 Builder」。
所以它的 setter 写法是:
auto method(Method method) && {
static_assert(!has_method_, "Duplicate 'method'");
return Construct<_Builder>(
method_.Set(method),
std::move(uri_));
}
解释:
static_assert(!has_method_)
保证 method 只能设置一次。Construct<_Builder>(...)
会调用_Builder
的私有构造函数,生成一个新的 builder,里面 method 已经有值了。&&
(rvalue ref 限定)是为了强制用户只能在链式调用时用,不会误以为 builder 是可变的。
Build()
最终需要 Build()
来真正生成 Request
:
Request Build() && {
static_assert(has_method_, "Missing 'method'");
static_assert(has_uri_, "Missing 'uri'");
return Request{ ... };
}
这里的 static_assert
会在编译时检查必填字段是否都设置了。
默认值字段
如果有字段可以有默认值,就用 builder::FieldWithDefault<Type, has>
:
builder::FieldWithDefault<std::chrono::nanoseconds, has_timeout_>
timeout_ = std::chrono::seconds(10);
它的区别在于:
- 不管
has_timeout_
是true
还是false
,value()
总是可用的。 - 如果用户没调用
.timeout(...)
,就用默认值(这里是 10 秒)。
总结
bool
模板参数用来跟踪字段是否已设置。builder::Field
表示「必须设置」的字段。builder::FieldWithDefault
表示「可选,有默认值」的字段。- setter 方法通过
Construct()
创建新的 Builder,保证每次调用都会「生成新的状态」。 Build()
用static_assert
保证所有必填字段都已设置。
更多推荐
所有评论(0)