Python异步编程
Python异步编程摘要(148字) Python异步编程主要通过协程实现轻量级并发。协程是用户态线程,通过事件循环调度实现单线程并发,比进程/线程更高效。核心要点:1)使用async/await语法定义协程;2)通过asyncio.run()启动事件循环;3)用create_task将协程转为任务实现并发;4)TaskGroup可批量管理并发任务。关键优势在于:1)资源消耗低;2)适合I/O密集
Python异步编程
1. 异步开发的几种实现方式
1. 多进程
- 进程是操作系统进行资源分配和调度的一个独立单位,每个进程都有自己独立的内存空间和系统资源。通过创建多个进程,各个进程可以同时执行不同的任务,实现并发编程。进程之间的运行互不影响,如果一个进程崩溃,不会影响其他进程的正常运行。但是,进程之间的切换成本和通信成本相对较高,每个进程需要占用独立的系统资源(如内存),因此对资源的消耗相对较大。在Python中,多进程适合处理CPU密集型任务,比如视频转码,科学计算等。
2. 多线程
- 线程是进程内的一个执行单元,是操作系统进行任务调度的基本单位。一个进程内通常包含多个线程,这些线程共享进程的资源,包括内存空间、文件句柄等。通过创建多个线程,一个进程内的多个任务可以同时执行,实现并发编程。与进程相比,线程之间的切换成本更低,且线程之间的通信更方便,因为他们共享同一份内存空间。但是,线程之间的运行相互影响,如果一个线程崩溃,可能会影响同一进程内的其他线程。另外,线程的并发执行需要操作系统的支持,不同的操作系统对线程的支持程度不同。Python中的线程有GUL锁,即在同一个进程中,同一时刻只能有一个线程上CPU运行,所以Python多线程不适合处理CPU密集型任务。多线程适合处理I/O密集型任务,比如网络请求,磁盘读写等。
3. 协程
- 协程(Coroutine)是一种程序组件,通过它可以实现多任务的并发执行。与传统进程和线程相比,协程提供了更轻量级的并发编程解决方案。接下来会详细介绍协程。
2. 协程
1. 协程的概念
- 协程是一种用户态的轻量级线程,它允许程序在单个线程内实现多个任务的并发执行。写成通过协作式多任务来实现,这意味着协程会主动交出控制权,让其他协程运行。与进程和线程不同,协程的切换不需要操作系统内核的介入,从而降低了开销。
2. 协程实现并发编程
- 协程实现并发编程的核心思想是利用函数的暂停和恢复。在协程中,函数可以在某个点暂停执行,并在适当的时候恢复执行,而不会影响其他协程的运行。这种机制使得多个协程可以在单个线程内交替执行,从而实现并发。
- 协程的实现通常依赖于以下两个关键概念:
- 生成器(Generator):生成器是一种特殊的函数,可以在执行过程中多次暂停和恢复。通过生成器,我们可以实现简单的协程功能。例如Python中使用
yield关键字可以创建生成器。 - 异步编程(Asynchronous Programming):异步编程是一种编程范式,允许程序在等待I/O操作完成时执行其他任务。在协程中,可以利用异步编程实现并发。
- 生成器(Generator):生成器是一种特殊的函数,可以在执行过程中多次暂停和恢复。通过生成器,我们可以实现简单的协程功能。例如Python中使用
3. 协程与进程、线程的优势
- 轻量级:协程的创建和切换开销远低于线程和进程。由于协程在用户态执行,因此不需要内核态的上下文切换,从而降低了开销。
- 高并发性能:由于协程的轻量级特性,单个线程可以创建大量的协程,实现高并发处理。相比之下,线程和进程的数量受到系统资源的限制。
- 资源共享:协程在单个线程内运行,可以轻松地共享资源,无序考虑线程和进程间的同步和通信问题。
- 简化编程模型:协程的协作式多任务特性使得并发编程更加直观和简单。开发者可以专注于业务逻辑,而不是线程或进程的同步和竞争条件。
4. 主流协议
- WSGI:同步。通过多进程+多线程的方式来实现并发。
- ASGI:异步。通过多进程+主线程(不存在多线程)+协程来实现并发。
3. 第一个协程代码
在Python3.4中添加了asyncio库,这让我们利用Python编写协程代码变得更加简单(不要再用yield了,早过时了)。
import asyncio
async def main():
print("hello")
# 协程必须等待,也就是必须在前面加上await关键字
await asyncio.sleep(1) # 这里不要使用同步的I/O,如time.sleep(),否则就是同步的,失去了并发性
print('world')
if __name__ == '__main__':
# 创建一个协程对象
# main(): 这样并不是直接执行main函数,而是创建一个协程
cor = main()
# 要把协程丢到事件循环中,才能运行协程
asyncio.run(cor)
- 以上代码有几点需要说明:
- a. 协程不会自己运行,需要加入到事件循环中,让事件调度运行。我们可以通过
asyncio.run()函数来将一个协程放到事件循环中; - b. 通过在函数前面加上
async关键字,将一个普通的函数变为一个协程; - c. 在协程中,使用
await关键字等待一个协程执行完成(必须加await)。关键字await,也必须放到async定义的函数中,否则会报错; - d. 以上
asyncio.sleep函数不能用time.sleep来替换,后者是同步的,如果放到异步函数中,将无法发挥异步编程的优势。
- a. 协程不会自己运行,需要加入到事件循环中,让事件调度运行。我们可以通过
- 伪代码理解协程调度:
asyncio.run(main())就相当于把main()协程加入事件循环,即协程队列中。随后不断从队列中pop出协程。如果执行某个协程时,出现了资源等待,那么系统不会傻傻的等待这个协程资源就绪,而是直接进入下一次循环,pop出新的协程,继续执行。
# 有一个协程队列,存储所有需要执行的协程
queue = [cor1, cor2, ...]
while True:
cor = queue.pop()
result = await cor
4. 并发多个协程
先定义一个显示协程运行时间的装饰器:
# utils.py
import time
from functools import wraps
def async_timed(func):
@wraps(func)
async def wrapper(*args, **kwargs):
print(f'开始执行{func},参数为:{args}, {kwargs}')
start = time.time()
try:
return await func(*args, **kwargs)
finally:
end = time.time()
total = end - start
print(f'结束执行{func},耗时:{total:.4f}秒')
return wrapper
1. 用创建任务的方式并发运行
- 先展示一个错误的同步运行写法:
- 最终
main协程的运行时间为3s,没有实现并发。
- 最终
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
# 同步运行
@async_timed
async def main():
# 执行两个协程
result1 = await greet(name='xxx', delay=1)
print(f'result1: {result1}')
result2 = await greet(name='yyy', delay=2)
print(f'result2: {result2}')
- 想实现并发,必须将协程包装成
Task任务对象(下面详细解释一下代码的执行顺序):- 首先,(1)执行,
asyncio.create_task将创建一个事件循环,并将协程greet(name='xxx', delay=1)放入队列中,开始执行; - 紧接着,(2)执行,将协程
greet(name='yyy', delay=2)也加入队列,开始执行; - (3)执行,
main()主程序等待task1执行完毕,时间为1s; - (4)执行,打印
result1; - (5)执行,由于两个协程几乎同时进入事件循环,所以此时
task2也已经执行1s了,main()主程序继续等待task2执行完毕,剩余时间为1s; - 最后(6)执行,打印
result。
- 首先,(1)执行,
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
# 用创建任务的方式并发运行
@async_timed
async def main():
# 必须要将协程包装成Task对象,才能够并发执行
task1 = asyncio.create_task(greet(name='xxx', delay=1)) # (1)
task2 = asyncio.create_task(greet(name='yyy', delay=2)) # (2)
result1 = await task1 # (3)
print(f'result1: {result1}') # (4)
result2 = await task2 # (5)
print(f'result2: {result2}') # (6)
- 由此衍生出一个错误的写法:
- 如果在第一个协程进入事件循环后,就直接
await该协程,就会导致main()主程序在此等待task1完全执行完毕,1s后才会执行下一句代码,总时间还是3s; - 由此可见,想要实现并发,必须先让全部的协程都先进入事件循环,之后才能
await。
- 如果在第一个协程进入事件循环后,就直接
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
@async_timed
async def main():
result1 = await asyncio.create_task(greet(name='xxx', delay=1))
result2 = await asyncio.create_task(greet(name='yyy', delay=2))
2. 使用任务组
- 除了创建完任务后,一个个
await,还可以使用TaskGroup创建一个任务组,然后再任务组中创建多个任务,最终统一await这个任务组(这个操作不用我们自己做,是自动的),示例:
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
# 2. 用TaskGroup方式并发运行协程
@async_timed
async def main():
async with asyncio.TaskGroup() as group:
# 在这个里面创建任务
task1 = group.create_task(greet(name='xxx', delay=1))
task2 = group.create_task(greet(name='yyy', delay=2))
print(task1.result())
print(task2.result())
- 但是如果其中有任务出现异常了,就会导致后面的任务被取消,从而提前退出协程的并发运行。
async def greet_group(name, delay):
await asyncio.sleep(delay)
if name == "xxx":
raise ValueError("执行错误!")
return f'hello {name}'
# 2. 用TaskGroup方式并发运行协程
@async_timed
async def main():
try:
async with asyncio.TaskGroup() as group:
# 在这个里面创建任务
task1 = group.create_task(greet_group(name='xxx', delay=1))
task2 = group.create_task(greet_group(name='yyy', delay=2))
except Exception as e:
print(e)
# 其中有任务出现异常了,导致后面的任务被取消了,从而提前退出协程的并发运行
# * done:代表该协程是否完成(被取消也算完成)
# * cancelled:返回该协程是否被取消
print(task1.done()) # true
print(task2.cancelled()) # true
3. 使用gather
- 以上代码是手动创建任务后运行,另外还可以通过一个更高级的API来实现并发运行,即
asyncio.gather,这个函数的底层实际上也是将协程封装成Future对象,然后再并发运行。
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
async def greet_group(name, delay):
await asyncio.sleep(delay)
if name == "xxx":
raise ValueError("执行错误!")
return f'hello {name}'
@async_timed
async def main1():
# gather在将所有协程全部执行完之后,会按照协程入队的顺序(注意不是协程执行完的顺序),将协程的返回值存放在results中
results = await asyncio.gather(
greet('张三', 1),
greet('李四', 3),
greet('王五', 2)
)
print(results)
@async_timed
async def main2():
results = await asyncio.gather(
greet_group('xxx', 1),
greet('李四', 3),
greet('王五', 2),
return_exceptions=True
)
print(results)
# results:[ValueError('执行错误!'), 'hello 李四', 'hello 王五']
- 补充:
- 如果gather中的协程出现异常,那么会抛出异常。如果不想抛出异常,那么可以设置
return_exceptions=True,就会把异常作为返回值,而不会抛出异常; - gather在将所有协程全部执行完后,会按照协程入队的顺序,将协程的返回值存放在
results中; - 与
TaskGroup相比:asyncio.gather函数即使其中有任务抛出异常,也不会取消后面的任务;而TaskGroup则是只要有一个任务抛出异常,后续的任务都会被取消。
- 如果gather中的协程出现异常,那么会抛出异常。如果不想抛出异常,那么可以设置
4. 使用as_completed
as_completed在每运行完一个协程后就返回,使用方法如下:
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
@async_timed
async def main():
aws = [
greet('张三', 1),
greet('李四', 3)
]
for coro in asyncio.as_completed(aws):
result = await coro
print(result)
- 上述代码中,会并发执行
aws中的协程。as_completed函数会返回一个迭代器,最先执行完的任务会最先被遍历到。并且as_completed可以设置超时时间:
@async_timed
async def main():
aws = [
greet('张三', 1),
greet('李四', 3)
]
# 可以指定超时时间
# 如果超过指定超时时间,还有任务没有完成,那么会抛出TimeoutError异常
# 剩余的任务不会被取消
try:
for coro in asyncio.as_completed(aws, timeout=2):
result = await coro
print(result)
except asyncio.TimeoutError:
print('超时了!')
tasks = asyncio.all_tasks()
for task in tasks:
if task.get_name() == 'Task-1': # main协程不等待
continue
else:
# 如果没有继续等待task执行,那么这个task就不会执行了,而是处于pending状态
result = await task
print(result)
as_completed方法在其中某个任务抛出异常后,剩余的任务也不会被取消掉。可以通过await再次激活,这一点同gather。
5. 等待
有时候,我们期望某个协程或者任务最多运行多长时间,就可以使用
wait_for或wait函数。
1. wait_for(aw, timeout)
wait_for函数只能用于等待一个协程或者任务,可以指定超时时间。
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
# wait_for
@async_timed
async def main():
try:
result = await asyncio.wait_for(greet('张三', 2), timeout=1)
print(result)
except asyncio.TimeoutError:
print('超时了!')
tasks = asyncio.all_tasks()
print(tasks) # 这里只会打印一个Task1,也就是main协程,超时的协程被取消了
- 超时后的任务,没法继续让其执行了。
2. wait(aws, timeout=None, return_when=ALL_COMPLETED)
- 这个函数可用于等待多个
Task或者Future(Task对象的基类),并且可以指定在什么情况下才会返回,默认是ALL_COMPLETED(全部执行完后返回),并且注意,这个函数不会触发TimeoutError,而是将执行完的,以及超时的任务,通过元组的形式返回:
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
# wait
@async_timed
async def main():
aws = [
asyncio.create_task(greet('张三', 1)),
asyncio.create_task(greet('李四', 3))
]
# wait函数返回的结果是一个元组(执行完成的任务,执行超时的任务)
# 如果没有指定timeout,那么永远不会超时
done_tasks, pending_tasks = await asyncio.wait(aws, timeout=2)
print(done_tasks)
print(pending_tasks)
for task in pending_tasks:
result = await task # 没有执行完的协程可以继续执行
print(result)
- 其中,
return_when除了默认的ALL_COMPLETED外,还有以下可选值(当然超时就会直接返回,相当于这个参数就不起任何作用):ALL_COMPLETED:等所有任务都执行完后,再返回;FIRST_EXCEPTION:有任何任务发生异常后就立即返回,即使没有超时也会返回;FIRST_COMPLETED:第一个任务执行完后就立即返回。
6. 超时
asyncio提供了专门的超时API,用于限制某些任务的最大执行时间。超时API有两个,分别是:asyncio.timeout和asyncio.timeout_at。
1. asyncio.timeout(delay)
- 该函数返回一个异步上下文管理器,也就意味着我们可以使用
async with进行使用; - 其中
delay可以为具体的秒数,也可以为None,如果为None,那么代表哦永远不会超时; - 如果超时
delay的时间,那么下面所有的任务都将会被取消,并抛出TimeoutError异常。
# 传入delay设置延迟
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
@async_timed
async def main():
try:
async with asyncio.timeout(1):
task1 = asyncio.create_task(greet('张三', 1), name='zhangsan')
task2 = asyncio.create_task(greet('李四', 2), name='lisi')
result1 = await task1
print(result1)
result2 = await task2
print(result2)
except asyncio.TimeoutError:
print('超时了!')
tasks = asyncio.all_tasks()
print(tasks) # lisi协程被打印,处于pending状态
2. asyncio.timeout_at(when)
- 与
asyncio.timeout不同的是,asyncio.timeout_at中的when参数是一个绝对一时间,或者为None。
7. 使用多线程执行同步任务
1. 有协程了,为什么还要用线程?
- 在Python中,虽然协程比线程效率更高,但是很多库比如
requests,并没有提供异步协程的版本。一旦在协程中使用了requests库中的同步方法,比如requests.get(),事件循环就会在这里阻塞,等待资源就绪,从而失去异步特性。 - 但是,如果我们新开一个线程,把同步的代码放在其中执行,就不会影响主线程中的事件循环了。
2. 代码示例
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
def get_url(url):
print(f'开始获取{url}')
# 同步阻塞2s
time.sleep(2)
print(f'结束获取{url}')
return 'success'
@async_timed
async def main():
result = await asyncio.gather(
asyncio.to_thread(get_url, url="https://www.baidu.com"),
greet('张三', 2)
)
print(result)
if __name__ == "__main__":
asyncio.run(main())
8. Task对象
Task对象是用于封装和管理协程的运行的,可以将协程并发执行。Task对象有以下方法:
done:用于获取该Task对象是否执行完成(正常完成,异常,被取消都算done);result:用于获取该Task执行完后的返回值;exception:如果Task对象执行过程中发生异常,则该方法会返回异常信息。如果任务没有发生异常,那么调用exception()方法将抛出asyncio.exceptions.InvalidStateError: Exception is ont set.异常
async def task_will_fail():
await asyncio.sleep(1)
raise ValueError("发生异常!")
async def main1():
# create_task创建完任务后,这个任务会立马加入到事件循环中进行调度
task = asyncio.create_task(task_will_fail())
# 如果任务中没有出现异常,那么调用exception()方法就会出现异常
print(task.exception()) # 报错
async def main2():
task = asyncio.create_task(task_will_fail())
await asyncio.sleep(2)
print(task.exception()) # 打印异常,不报错
add_done_callback:添加任务执行完成后的回调。
async def greet(name, delay):
await asyncio.sleep(delay)
return f"hello {name}"
def my_callback1(task):
print('='*20)
print(type(task)) # <class '_asyncio.Task'>
print(task.result())
print('='*20)
def my_callback2(task, tag):
print('='*20)
print(type(task)) # <class '_asyncio.Task'>
print(task.result())
print('tag: ', tag)
print('='*20)
async def main1():
task = asyncio.create_task(greet('张三', 2))
task.add_done_callback(my_callback1)
await task
async def main2():
task = asyncio.create_task(greet('张三', 2))
# partial: 偏函数,可以提前准备好一些参数
task.add_done_callback(partial(my_callback2, tag='zhangsan'))
await task
cancel:取消任务的执行;
async def something():
print('something start')
await asyncio.sleep(20)
async def main():
task = asyncio.create_task(something())
task.cancel()
# 等待一个已经被取消的任务,会抛出CanceledError异常
try:
await task
except asyncio.CancelledError:
print('是否被取消:', task.cancelled())
cancelled:判断任务是否被取消;get_name:获取任务的名称;set_name:设置任务的名称。
更多推荐

所有评论(0)