(本文主要参照Python在线文档进行学习)

0.前言

进程是系统资源分配的最小单位,进程之间堆栈、虚拟地址空间等独立;线程是系统调度的最小单位,同进程内的线程之间共享堆、地址空间、全局变量,但栈、寄存器、程序计数器等独立;协程通常运行在用户态线程之上,拥有独立的用户态栈或等价状态,由程序或运行时进行协作式调度。协程避免了内核级调度和上下文切换,具有更低的切换成本,但代码设计和实现的复杂度更高。

1.初探 asyncio

asyncio 是一个基于 async / await 语法编写的并发代码库,在Python 3.4 引入,直接内置了对异步IO的支持,并在后来的版本中不断完善。早期基于生成器来实现协程,但是在 Python 3.8 开始不推荐使用该方式,并将在 3.10 移除。新的标准使用 async / await 语法来实现协程。

要实际运行协程,asyncio 提供了以下机制:

a.使用 asyncio.run() 函数来运行顶层入口点

(运行代码需要 Python3.7 ;要注意的是,直接调用 func() 并不能执行)

import asyncio
import time

async def func():
    print('func in'+time.strftime('%X'))
    await asyncio.sleep(3)
    print('func out'+time.strftime('%X'))

print('begin'+time.strftime('%X'))
asyncio.run(func())
print('end'+time.strftime('%X'))

b.顺序执行

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")
    await say_after(1, 'hello')
    await say_after(2, 'world')
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

任务顺序执行,耗时1+2

c.并行执行

修改上个示例的 main 函数,用asyncio.create_task() 函数把协程作为异步任务来同时执行

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待直到两个任务都完成
    # (会花费约 2 秒钟。)
    await task1
    await task2

    print(f"finished at {time.strftime('%X')}")

两个任务并行执行,耗时 max(1, 2)

d.更现代化的替代方案

asyncio.TaskGroup 类提供了 create_task() 的更现代化的替代,使用此 API,之前的例子将变为:

async def main():
    async with asyncio.TaskGroup() as tg:
        task1 = tg.create_task(
            say_after(1, 'hello'))

        task2 = tg.create_task(
            say_after(2, 'world'))

        print(f"started at {time.strftime('%X')}")

    # 当存在上下文管理器时 await 是隐式执行的。
    print(f"finished at {time.strftime('%X')}")

执行时间也是 max(1, 2)

2.可等待对象 awaitable

如果一个对象可以在 await 表达式中使用,那么它就是 awaitable 的。可等待的对象主要有三种:协程 coroutines,任务 Tasks,期望 Futures。

a.协程 coroutines

Python 协程属于awaitable 对象,因此可以在其他协程中被 await:

import asyncio

async def nested():
    return 42

async def main():
    # 一个协程对象创建了但是没有被 await,不会执行
    print(nested())
    # 输出 "42".
    print(await nested())  

asyncio.run(main())

两个重要概念:

协程函数:通过 async def 定义的函数;

协程对象:调用协程函数返回的对象。

b.任务 Tasks

任务可以用来并行调度协程。当使用 asyncio.create_task() 等函数包装一个协程为任务时,该协程很快会被自动调度执行。

import asyncio

async def nested():
    return 42

async def main():
    # 将 nested() 加入计划任务
    # 立即与 "main()" 并发运行。
    task = asyncio.create_task(nested())

    # "task" 现在可用于取消 "nested()", 或是等待其完成
    await task

asyncio.run(main())

c.期望 Futures

Future 是一种特殊的底层级可等待对象,代表异步操作的最终结果。当 await 一个 Future 时,协程将等待直到 Future 在别的地方完成。在 asyncio 中需要 Future 对象以便允许通过 async/await 使用基于回调的代码。一般不需要在应用层级代码中创建 Futures 对象。

Future 对象有时会由库和某些 asyncio API 暴露给用户,用作可等待对象:

async def main():
    await function_that_returns_a_future_object()

    # 这样也可以:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

3.部分 API 参考

a.运行一个异步程序

asyncio.run(coro, *, debug = False )

执行协程 coro 并返回结果。它应该用作 asyncio 程序的主要入口点,理想情况下应该只调用一次。

b.创建任务

asyncio.create_task(coro, *, name = None )

将协程 coro 包装为一个 Task 并调度执行,返回一个 Task 对象。

c.休眠

asyncio.sleep(delay, result=None, *, loop=None)

阻塞等待 delay 秒,sleep 挂起当前任务,允许其他任务运行。

d.同时运行

asyncio.gather(*aws, loop=None, return_exceptions=False)

并发执行 aws 列表中的可等待对象。如果 aws 中有任何可等待的协程,将自动作为一个 Task 调度。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

e.超时等待

asyncio.wait_for(aw, timeout, *, loop=None)

aw 是一个协程,如果 timeout 秒数为 None,将会阻塞直到完成。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

协程相关的接口还是挺多的,这里只列举了部分,详情可以参照书籍或文档 

4.练习

借助 AI 可以生成一些简单的用例:

a.多个任务并发执行

import asyncio
import time

async def worker(name: str, delay: float):
    print(f"{time.strftime('%X')} | {name} start")
    await asyncio.sleep(delay)
    print(f"{time.strftime('%X')} | {name} end")
    return name

async def main():
    print(f"{time.strftime('%X')} | main start")

    tasks = [
        asyncio.create_task(worker("A", 2)),
        asyncio.create_task(worker("B", 1)),
        asyncio.create_task(worker("C", 3)),
    ]

    results = await asyncio.gather(*tasks)

    print(f"{time.strftime('%X')} | results = {results}")
    print(f"{time.strftime('%X')} | main end")

if __name__ == "__main__":
    asyncio.run(main())

b.冒泡排序

import asyncio

async def bubble_step(arr, n):
    """对 arr 执行一次冒泡迭代"""
    print(f"Start bubble step for n={n}")
    try:
        swapped = False
        for i in range(n - 1):
            if arr[i] > arr[i + 1]:
                arr[i], arr[i + 1] = arr[i + 1], arr[i]
                swapped = True
            # 模拟异步等待,让出控制权
            await asyncio.sleep(0)  
        return swapped
    except asyncio.CancelledError:
        print("Bubble step cancelled")
        raise

async def bubble_sort(arr):
    """按冒泡排序逻辑,每次循环一步"""
    n = len(arr)
    try:
        for i in range(n, 1, -1):
            swapped = await bubble_step(arr, i)
            print(f"After step: {arr}")
            if not swapped:
                break
    except asyncio.CancelledError:
        print("Bubble sort cancelled")
        raise

async def main():
    arr = [10, 9, 8, 7, 6, 5, 4, 3, 2, 1]
    print(f"Original array: {arr}")

    sort_task = asyncio.create_task(bubble_sort(arr))

    # 模拟在某个时间点取消排序
    await asyncio.sleep(0.1)  # 可以调整取消时机
    sort_task.cancel()

    try:
        await sort_task
    except asyncio.CancelledError:
        print("Main: bubble_sort task has been cancelled")

    print(f"Final array state: {arr}")

asyncio.run(main())

5.参考:

Python在线文档:https://docs.python.org/3/library/asyncio-task.html

百度:https://wenku.baidu.com/view/a52ac722a22d7375a417866fb84ae45c3b35c24e.html

博客:https://www.jianshu.com/p/e38ee24dd004

博客:https://www.liaoxuefeng.com/wiki/1016959663602400/1017968846697824

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐