Python学习笔记:协程与任务初探
asyncio 是一个基于 async / await 语法编写的并发代码库,在Python 3.4 引入,直接内置了对异步IO的支持,并在后来的版本中不断完善。早期基于生成器来实现协程,但是在 Python 3.8 开始不推荐使用该方式,并将在 3.10 移除。新的标准使用 async / await 语法来实现协程。
(本文主要参照Python在线文档进行学习)
0.前言
进程是系统资源分配的最小单位,进程之间堆栈、虚拟地址空间等独立;线程是系统调度的最小单位,同进程内的线程之间共享堆、地址空间、全局变量,但栈、寄存器、程序计数器等独立;协程通常运行在用户态线程之上,拥有独立的用户态栈或等价状态,由程序或运行时进行协作式调度。协程避免了内核级调度和上下文切换,具有更低的切换成本,但代码设计和实现的复杂度更高。
1.初探 asyncio
asyncio 是一个基于 async / await 语法编写的并发代码库,在Python 3.4 引入,直接内置了对异步IO的支持,并在后来的版本中不断完善。早期基于生成器来实现协程,但是在 Python 3.8 开始不推荐使用该方式,并将在 3.10 移除。新的标准使用 async / await 语法来实现协程。
要实际运行协程,asyncio 提供了以下机制:
a.使用 asyncio.run() 函数来运行顶层入口点
(运行代码需要 Python3.7 ;要注意的是,直接调用 func() 并不能执行)
import asyncio
import time
async def func():
print('func in'+time.strftime('%X'))
await asyncio.sleep(3)
print('func out'+time.strftime('%X'))
print('begin'+time.strftime('%X'))
asyncio.run(func())
print('end'+time.strftime('%X'))

b.顺序执行
import asyncio
import time
async def say_after(delay, what):
await asyncio.sleep(delay)
print(what)
async def main():
print(f"started at {time.strftime('%X')}")
await say_after(1, 'hello')
await say_after(2, 'world')
print(f"finished at {time.strftime('%X')}")
asyncio.run(main())
任务顺序执行,耗时1+2

c.并行执行
修改上个示例的 main 函数,用asyncio.create_task() 函数把协程作为异步任务来同时执行
async def main():
task1 = asyncio.create_task(
say_after(1, 'hello'))
task2 = asyncio.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 等待直到两个任务都完成
# (会花费约 2 秒钟。)
await task1
await task2
print(f"finished at {time.strftime('%X')}")
两个任务并行执行,耗时 max(1, 2)

d.更现代化的替代方案
asyncio.TaskGroup 类提供了 create_task() 的更现代化的替代,使用此 API,之前的例子将变为:
async def main():
async with asyncio.TaskGroup() as tg:
task1 = tg.create_task(
say_after(1, 'hello'))
task2 = tg.create_task(
say_after(2, 'world'))
print(f"started at {time.strftime('%X')}")
# 当存在上下文管理器时 await 是隐式执行的。
print(f"finished at {time.strftime('%X')}")
执行时间也是 max(1, 2)
2.可等待对象 awaitable
如果一个对象可以在 await 表达式中使用,那么它就是 awaitable 的。可等待的对象主要有三种:协程 coroutines,任务 Tasks,期望 Futures。
a.协程 coroutines
Python 协程属于awaitable 对象,因此可以在其他协程中被 await:
import asyncio
async def nested():
return 42
async def main():
# 一个协程对象创建了但是没有被 await,不会执行
print(nested())
# 输出 "42".
print(await nested())
asyncio.run(main())

两个重要概念:
协程函数:通过 async def 定义的函数;
协程对象:调用协程函数返回的对象。
b.任务 Tasks
任务可以用来并行调度协程。当使用 asyncio.create_task() 等函数包装一个协程为任务时,该协程很快会被自动调度执行。
import asyncio
async def nested():
return 42
async def main():
# 将 nested() 加入计划任务
# 立即与 "main()" 并发运行。
task = asyncio.create_task(nested())
# "task" 现在可用于取消 "nested()", 或是等待其完成
await task
asyncio.run(main())
c.期望 Futures
Future 是一种特殊的底层级可等待对象,代表异步操作的最终结果。当 await 一个 Future 时,协程将等待直到 Future 在别的地方完成。在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。一般不需要在应用层级代码中创建 Futures 对象。
Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:
async def main():
await function_that_returns_a_future_object()
# 这样也可以:
await asyncio.gather(
function_that_returns_a_future_object(),
some_python_coroutine()
)
3.部分 API 参考
a.运行一个异步程序
asyncio.run(coro, *, debug = False )
执行协程 coro 并返回结果。它应该用作 asyncio 程序的主要入口点,理想情况下应该只调用一次。
b.创建任务
asyncio.create_task(coro, *, name = None )
将协程 coro 包装为一个 Task 并调度执行,返回一个 Task 对象。
c.休眠
asyncio.sleep(delay, result=None, *, loop=None)
阻塞等待 delay 秒,sleep 挂起当前任务,允许其他任务运行。
d.同时运行
asyncio.gather(*aws, loop=None, return_exceptions=False)
并发执行 aws 列表中的可等待对象。如果 aws 中有任何可等待的协程,将自动作为一个 Task 调度。
import asyncio
async def factorial(name, number):
f = 1
for i in range(2, number + 1):
print(f"Task {name}: Compute factorial({i})...")
await asyncio.sleep(1)
f *= i
print(f"Task {name}: factorial({number}) = {f}")
async def main():
# Schedule three calls *concurrently*:
await asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
)
asyncio.run(main())
# Expected output:
#
# Task A: Compute factorial(2)...
# Task B: Compute factorial(2)...
# Task C: Compute factorial(2)...
# Task A: factorial(2) = 2
# Task B: Compute factorial(3)...
# Task C: Compute factorial(3)...
# Task B: factorial(3) = 6
# Task C: Compute factorial(4)...
# Task C: factorial(4) = 24
e.超时等待
asyncio.wait_for(aw, timeout, *, loop=None)
aw 是一个协程,如果 timeout 秒数为 None,将会阻塞直到完成。
async def eternity():
# Sleep for one hour
await asyncio.sleep(3600)
print('yay!')
async def main():
# Wait for at most 1 second
try:
await asyncio.wait_for(eternity(), timeout=1.0)
except asyncio.TimeoutError:
print('timeout!')
asyncio.run(main())
# Expected output:
#
# timeout!
协程相关的接口还是挺多的,这里只列举了部分,详情可以参照书籍或文档
4.练习
借助 AI 可以生成一些简单的用例:
a.多个任务并发执行
import asyncio
import time
async def worker(name: str, delay: float):
print(f"{time.strftime('%X')} | {name} start")
await asyncio.sleep(delay)
print(f"{time.strftime('%X')} | {name} end")
return name
async def main():
print(f"{time.strftime('%X')} | main start")
tasks = [
asyncio.create_task(worker("A", 2)),
asyncio.create_task(worker("B", 1)),
asyncio.create_task(worker("C", 3)),
]
results = await asyncio.gather(*tasks)
print(f"{time.strftime('%X')} | results = {results}")
print(f"{time.strftime('%X')} | main end")
if __name__ == "__main__":
asyncio.run(main())
b.冒泡排序
import asyncio
async def bubble_step(arr, n):
"""对 arr 执行一次冒泡迭代"""
print(f"Start bubble step for n={n}")
try:
swapped = False
for i in range(n - 1):
if arr[i] > arr[i + 1]:
arr[i], arr[i + 1] = arr[i + 1], arr[i]
swapped = True
# 模拟异步等待,让出控制权
await asyncio.sleep(0)
return swapped
except asyncio.CancelledError:
print("Bubble step cancelled")
raise
async def bubble_sort(arr):
"""按冒泡排序逻辑,每次循环一步"""
n = len(arr)
try:
for i in range(n, 1, -1):
swapped = await bubble_step(arr, i)
print(f"After step: {arr}")
if not swapped:
break
except asyncio.CancelledError:
print("Bubble sort cancelled")
raise
async def main():
arr = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
print(f"Original array: {arr}")
sort_task = asyncio.create_task(bubble_sort(arr))
# 模拟在某个时间点取消排序
await asyncio.sleep(0.1) # 可以调整取消时机
sort_task.cancel()
try:
await sort_task
except asyncio.CancelledError:
print("Main: bubble_sort task has been cancelled")
print(f"Final array state: {arr}")
asyncio.run(main())
5.参考:
Python在线文档:https://docs.python.org/3/library/asyncio-task.html
百度:https://wenku.baidu.com/view/a52ac722a22d7375a417866fb84ae45c3b35c24e.html
博客:https://www.jianshu.com/p/e38ee24dd004
博客:https://www.liaoxuefeng.com/wiki/1016959663602400/1017968846697824
更多推荐
所有评论(0)