python 异步
本文介绍了Python异步编程的核心概念:协程、任务和事件循环。协程是可暂停/恢复的函数,通过async/await定义和调用;任务是对协程的封装,由事件循环调度执行;事件循环是异步编程的引擎。文章详细讲解了协程函数与对象的区别、await的作用、将协程包装为任务的多种方法(手动create_task、自动await、gather()和as_completed()),并提供了相关代码示例。这些概念
异步
参考视频;
https://www.bilibili.com/video/BV157mFYEEkH
https://www.bilibili.com/video/BV1oa411b7c9
异步编程主要关注的是并发(Concurrency),而不是并行。它通过单线程中的事件循环来管理多个任务,这些任务在等待 I/O 操作时可以暂停执行,让其他任务继续运行。虽然异步编程可以提高程序的响应能力和资源利用率,但它本身并不实现真正的并行。
- 并发而非并行:
- 异步编程通过事件循环在单线程中管理多个任务,这些任务在等待 I/O 操作时可以暂停执行,让其他任务继续运行。
- 适用于 I/O 密集型任务:
- 对于 I/O 密集型任务,如网络请求、文件读写等,异步编程可以显著提高程序的性能,因为这些任务通常涉及大量的等待时间。
- 单线程:
- 异步编程通常在单线程中运行,避免了多线程编程中的复杂同步问题。
1. 协程 (Coroutine)
协程是 async/await 的核心概念,是一种可以暂停和恢复执行的函数。
协程是异步编程的基本单元,它是一个特殊的函数,可以通过 async 关键字定义。协程可以暂停和恢复执行,这使得它可以在等待 I/O 操作时让出控制权,让其他任务运行。
概念
● 使用 async def 定义的函数就是协程函数
● 调用协程函数不会立即执行,而是返回一个协程对象
● 协程需要通过事件循环或 await 来驱动执行
# iscoroutinefunction 检查一个方法是否是协程函数
import inspect
async def async_function():
return "Hello, World!"
def normal_function():
return "Hello, World!"
if __name__ == "__main__":
# 检查一个方法是否是协程函数
print(inspect.iscoroutinefunction(async_function)) # 输出: True
print(inspect.iscoroutinefunction(normal_function)) # 输出: False
1.1 协程函数和协程对象
- 一个协程方法, 只写方法名称是一个协程函数
- 一个协程方法, 写方法名称和括号是一个协程对象,可以通过await 包装为一个任务进行调用
import asyncio
async def my_coroutine():
print("协程函数开始 started")
await asyncio.sleep(1) # 模拟异步 I/O 操作
print("协程函数结束 finished")
# 协程函数 my_coroutine
# 协程对象 my_coroutine_obj = my_coroutine() my_coroutine_obj是协程对象
# 调用协程函数返回协程对象,不会执行
if __name__ == '__main__':
my_coroutine_obj = my_coroutine()
print(my_coroutine_obj) # 协程对象
asyncio.run(my_coroutine_obj) # 执行协程对象
# asyncio.run(my_coroutine()) # 执行协程对象
1.2 任务 (Task)
任务是协程的进一步封装,用于调度协程的执行。
作用
● 任务是对协程的包装,将协程加入事件循环的调度
● 任务是 Future 的子类
● 创建任务后会自动调度执行
协程(coroutine)转化为任务(task)才能让事件循环调用
1.3 事件循环 (Event Loop)
事件循环是异步编程的核心,负责调度和执行协程/任务。
事件循环同一事件只能执行一个任务
作用
● 管理任务的执行
● 处理IO操作和系统事件
● 提供定时器功能
import asyncio
loop = asyncio.get_event_loop()
1.4 三者的关系
- 协程:基本的可暂停/恢复的函数
- 任务:被事件循环调度的协程
- 事件循环:管理和执行所有任务的引擎
事件循环
├── 任务1 (包装协程A)
├── 任务2 (包装协程B)
└── 任务3 (包装协程C)
2. await的作用
await 会让出当前的控制权,暂停当前线程,
完成await之后的任务才能继续向下执行,
1.定义协程函数
2.包装协程为任务
3.建立事件循环
await 协程对象 --> 包装成任务
import asyncio
async def my_coroutine():
print("Hello")
await asyncio.sleep(5)
print("World")
async def main():
task1 = asyncio.create_task(my_coroutine())
task2 = asyncio.create_task(my_coroutine())
await task1 # task1, task2几乎同时执行, 因为此时让出控制权,事件循环发现了两个任务, task1, task2
await task2
if __name__ == "__main__":
asyncio.run(main())
输出
Hello
Hello
World
World
3. 将协程包装为任务的不同方法
3.1 手动
(asyncio.create_task(协程对象))
task1 = asyncio.create_task(my_coroutine())
import asyncio
async def my_coroutine():
print("Hello")
await asyncio.sleep(1)
print("World")
async def main():
task1 = asyncio.create_task(my_coroutine())
await task1
if __name__ == "__main__":
asyncio.run(main())
3.2 自动
await 协程对象 --> 包装成任务
import asyncio
async def my_coroutine():
print("Hello")
await asyncio.sleep(1)
print("World")
async def main():
task1 = my_coroutine()
await task1 # 会自动先封装为任务
if __name__ == "__main__":
asyncio.run(main())
3.3. asyncio.gather()
是一个用于并发执行多个异步任务的函数,它会等待所有任务完成,并返回一个包含所有任务结果的列表。
await asyncio.gather(task1, task2, task3) # 关键代码
await asyncio.gather(*[task1, task2, task3]) # 关键代码
具体示例
import inspect
import asyncio
import time
from loguru import logger
async def fake_async_function():
time.sleep(1) # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
logger.info(("Hello, World!"))
return "Hello, World!"
async def parallel_async_test():
# 由于fake_async_function 里面是同步耗时, 所以总耗时耗时3秒钟
serial_start_time = time.time()
task1 = asyncio.create_task(fake_async_function())
task2 = asyncio.create_task(fake_async_function())
task3 = asyncio.create_task(fake_async_function())
await asyncio.gather(task1, task2, task3)
cost_time = time.time() - serial_start_time
logger.info(f"并行异步耗时:{cost_time}")
async def main():
start_time = time.time()
await parallel_async_test()
end_time = time.time() - start_time
logger.info(f"耗时:{end_time}")
#
if __name__ == '__main__':
asyncio.run(main()) # True
耗时3秒
3.4 as_compelted([])
是一个用于并发执行多个异步任务的函数,它会返回一个可迭代对象,每次迭代返回一个已经完成的任务。
关键代码 接受一个列表
results = asyncio.as_completed([task1, task2, task3])
import inspect
import asyncio
import time
from loguru import logger
async def fake_async_function():
await asyncio.sleep(1) # 异步
# time.sleep(1) # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
logger.info(("Hello, World!"))
return "Hello, World!"
async def parallel_async_test():
# 由于fake_async_function 里面是同步耗时, 所以总耗时耗时3秒钟
serial_start_time = time.time()
task1 = fake_async_function()
task2 = fake_async_function()
task3 = fake_async_function()
# results = await asyncio.gather(task1, task2, task3)
results = asyncio.as_completed([task1, task2, task3])
for result in results:
print(await result)
cost_time = time.time() - serial_start_time
logger.info(f"并行异步耗时:{cost_time}")
async def main():
start_time = time.time()
await parallel_async_test()
end_time = time.time() - start_time
logger.info(f"耗时:{end_time}")
#
if __name__ == '__main__':
asyncio.run(main()) # True
4. 具体样例
4.1 并行异步阻塞 asyncio.sleep(2)
如果是先把他转化为task,然后
- serial_async_test:
第一次调用 await async_function() 时,当前协程会暂停,等待 async_function 完成。
由于没有其他任务在运行,每次调用都是独立的,因此总耗时为 2+2+2=6 秒。 - parallel_async_test:
使用 asyncio.create_task 创建任务时,第一个调用await使用 task1, task2, task3已经被调度到事件循环中,开始并行执行。
await task1、await task2 和 await task3 会依次等待这些任务完成,但由于它们已经并行运行,总耗时为最长任务的时间,即 2 秒。
import inspect
import asyncio
import time
from loguru import logger
async def async_function():
logger.info("异步开始 ...")
await asyncio.sleep(2) # 异步非阻塞操作:允许当前线程或进程在等待时释放资源,让其他任务可以继续执行,提高资源利用率和程序响应速度。
logger.info(("异步结束 ..."))
async def serial_async_test():
# 耗时三秒钟
serial_start_time = time.time()
await async_function()
await async_function()
await async_function()
cost_time = time.time() - serial_start_time
logger.info(f"串行异步耗时:{cost_time}")
async def parallel_async_test():
# 耗时一秒钟
parallel_start_time = time.time()
task1 = asyncio.create_task(async_function())
task2 = asyncio.create_task(async_function())
task3 = asyncio.create_task(async_function())
await task1
await task2
await task3
cost_time = time.time() - parallel_start_time
logger.info(f"并行异步耗时:{cost_time}")
async def main():
start_time = time.time()
await serial_async_test() # 耗时6秒
await parallel_async_test() # 耗时2秒
end_time = time.time() - start_time
logger.info(f"耗时:{end_time}")
# 测试异步方法串行和异步方法并行
if __name__ == '__main__':
asyncio.run(main()) # True
4.2 异步并行同步阻塞 time.sleep(2)
- serial_async_test:
- 每次调用 await async_function() 时,time.sleep(2) 会阻塞整个事件循环 2 秒。
- 由于是串行调用,每次调用都会等待前一个调用完成,因此总耗时为 2+2+2=6 秒。
- parallel_async_test:
- 使用 asyncio.create_task 创建任务时,任务会立即被调度到事件循环中,开始执行。
- 但是,由于 async_function 中使用了 time.sleep(2),每个任务都会阻塞整个事件循环 2 秒。
- 尽管任务是并行启动的,但由于 time.sleep(2) 是同步阻塞操作,任务 1 会阻塞事件循环 2 秒,任务 2 和任务 3 无法在这期间运行。
- 因此,总耗时仍然是 2+2+2=6 秒。
import inspect
import asyncio
import time
from loguru import logger
async def async_function():
logger.info("异步开始 ...")
time.sleep(2) # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
logger.info(("异步结束 ..."))
async def serial_async_test():
# 耗时6秒钟
serial_start_time = time.time()
await async_function()
await async_function()
await async_function()
cost_time = time.time() - serial_start_time
logger.info(f"串行异步耗时:{cost_time}")
async def parallel_async_test():
# 耗时6秒钟
parallel_start_time = time.time()
task1 = asyncio.create_task(async_function())
task2 = asyncio.create_task(async_function())
task3 = asyncio.create_task(async_function())
await task1
await task2
await task3
cost_time = time.time() - parallel_start_time
logger.info(f"并行异步耗时:{cost_time}")
async def main():
start_time = time.time()
await serial_async_test() # 耗时6秒
await parallel_async_test() # 耗时2秒
end_time = time.time() - start_time
logger.info(f"耗时:{end_time}")
# 测试异步方法串行和异步方法并行
if __name__ == '__main__':
asyncio.run(main()) # True
4.3 gather 和 complete
- asyncio.gather 是一个用于并发执行多个异步任务的函数,它会等待所有任务完成,并返回一个包含所有任务结果的列表。
- asyncio.as_completed 是一个用于并发执行多个异步任务的函数,它会返回一个可迭代对象,每次迭代返回一个已经完成的任务。
import inspect
import asyncio
import time
from loguru import logger
async def async_function(number):
logger.info("异步函数 开始")
await asyncio.sleep(1) # 异步
# time.sleep(1) # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
logger.info(("异步函数 结束"))
return number
async def test_gather():
task1 = async_function(1)
task2 = async_function(2)
gather_start_time = time.time()
gather_result = await asyncio.gather(task1, task2)
gather_cost_time = time.time() - gather_start_time
logger.info(f"gather_reuslt {gather_result}")
logger.info(f"gather并行异步耗时:{gather_cost_time}")
async def test_complete():
task3 = async_function(3)
task4 = async_function(4)
complete_start_time = time.time()
complete_result = asyncio.as_completed([task3, task4])
results = []
for result in complete_result:
results.append(await result)
logger.info(results)
# await asyncio.sleep(2)
complete_cost_time = time.time() - complete_start_time
logger.info(f"compelte并行异步耗时:{complete_cost_time}")
async def main():
start_time = time.time()
await test_gather() # 1 秒 左右
await test_complete() # 1 秒 左右
end_time = time.time() - start_time
logger.info(f"耗时:{end_time}")
# 将协程转化为任务进行 调用
if __name__ == '__main__':
asyncio.run(main()) # True
4.4 asyncio.to_thread (让同步函数并行)
asyncio.to_thread 的作用:将同步函数的调用委托给线程池,避免阻塞事件循环。
将阻塞型同步函数的调用委托给线程池,从而实现并行执行,而不会阻塞事件循环。‘
import inspect
import asyncio
import time
from loguru import logger
def sync_function(number):
logger.info("同步方法 开始")
time.sleep(1)
logger.info(("同步方法 结束"))
return number
async def serial_async_test():
# 串行异步耗时
serial_start_time = time.time()
await asyncio.to_thread(sync_function, 1)
await asyncio.to_thread(sync_function, 2)
await asyncio.to_thread(sync_function, 3)
cost_time = time.time() - serial_start_time
logger.info(f"并行异步耗时:{cost_time}")
async def parallel_async_test():
# 并行异步耗时
serial_start_time = time.time()
task1 = asyncio.to_thread(sync_function, 1)
task2 = asyncio.to_thread(sync_function, 2)
task3 = asyncio.to_thread(sync_function, 3)
await asyncio.gather(task1, task2, task3)
cost_time = time.time() - serial_start_time
logger.info(f"并行异步耗时:{cost_time}")
async def main():
start_time = time.time()
await serial_async_test()
await parallel_async_test()
end_time = time.time() - start_time
logger.info(f"耗时:{end_time}")
# 将协程转化为任务进行 调用
if __name__ == '__main__':
asyncio.run(main()) # True
4.5 异步Semaphore信号量
import asyncio
import time
import inspect
from loguru import logger
async def test_async_func(name):
print(f"Worker {name} 开始")
await asyncio.sleep(1) # 模拟耗时操作
print(f"Worker {name} 结束")
async def worker(name):
# 创建一个信号量,限制同时运行的协程数量
semaphore = asyncio.Semaphore(20) # 最多允许 3 个协程同时运行
async def task_wrapper(query):
async with semaphore: # 使用 async with 来获取信号量
return await test_async_func(query)
start_time = time.time()
tasks = [task_wrapper(i) for i in range(100)] # 创建 10 个协程
await asyncio.gather(*tasks) # 等待所有协程完成
cost_time = time.time() - start_time
logger.info(f"worker方法耗时 cost time: {cost_time:.2f}S")
async def main():
await worker(1)
# Semaphore 是用来限制并发数量, 如果不适用这个参数,那么默认是并发数 gather中的人数数量(没有限制并发数)
if __name__ == "__main__":
# 运行主函数
asyncio.run(main())
5. 并发
5.1 多线程
import threading
import time
from loguru import logger
def worker_function(number, results, lock):
logger.info(f"Worker {number} 开始")
time.sleep(2) # 模拟耗时操作
logger.info(f"Worker {number} 结束")
with lock: # 确保线程安全
results.append(number)
def main():
start_time = time.time()
threads = []
results = []
lock = threading.Lock() # 创建一个锁
for i in range(5):
# target:指定线程启动时要执行的函数。
# args:传递给 target 函数的参数,必须是元组。
thread = threading.Thread(target=worker_function, args=(i, results, lock))
threads.append(thread)
thread.start() # 启动线程,使其开始执行目标函数。
for thread in threads:
thread.join() # 等待线程完成,确保主线程在所有子线程完成后再继续执行。
cost_time = time.time() - start_time
logger.info(f"方法耗时:{cost_time:.2f}s")
logger.info(f"返回值 :{results}")
if __name__ == "__main__":
main()
5.2 多进程
import threading
import time
from loguru import logger
def worker_function(number, results, lock):
logger.info(f"Worker {number} 开始")
time.sleep(2) # 模拟耗时操作
logger.info(f"Worker {number} 结束")
with lock: # 确保线程安全
results.append(number)
def main():
start_time = time.time()
threads = []
results = []
lock = threading.Lock() # 创建一个锁
for i in range(5):
# target:指定线程启动时要执行的函数。
# args:传递给 target 函数的参数,必须是元组。
thread = threading.Thread(target=worker_function, args=(i, results, lock))
threads.append(thread)
thread.start() # 启动线程,使其开始执行目标函数。
for thread in threads:
thread.join() # 等待线程完成,确保主线程在所有子线程完成后再继续执行。
cost_time = time.time() - start_time
logger.info(f"方法耗时:{cost_time:.2f}s")
logger.info(f"返回值 :{results}")
if __name__ == "__main__":
main()
5.3 异步编程
import time
import asyncio
from loguru import logger
def worker(number):
logger.info(f"Worker {number} 开始")
time.sleep(2) # 模拟耗时操作
logger.info(f"Worker {number} 结束")
return number
async def main():
start_time = time.time()
tasks = [worker(i) for i in range(5)]
await asyncio.gather(*tasks)
cost_time = time.time() - start_time
logger.info(f"方法耗时 {cost_time:.2f}s")
if __name__ == "__main__":
asyncio.run(main())
5.4 线程池
import concurrent.futures
import time
from loguru import logger
def worker(number):
logger.info(f"Worker {number} 开始")
time.sleep(2) # 模拟耗时操作
logger.info(f"Worker {number} 结束")
return number
def main():
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
futures = [executor.submit(worker, i) for i in range(5)]
multi_result = []
for future in concurrent.futures.as_completed(futures):
multi_result.append(future.result())
cost_time = time.time() - start_time
logger.info(f"结果: {multi_result}")
logger.info(f"方法耗时 {cost_time:.2f}s")
if __name__ == "__main__":
main()
5.5 进程池
import concurrent.futures
import time
from loguru import logger
def worker(number):
logger.info(f"Worker {number} 开始")
time.sleep(2) # 模拟耗时操作
logger.info(f"Worker {number} 结束")
return number
def main():
start_time = time.time()
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
multi_result = []
futures = [executor.submit(worker, i) for i in range(4)]
for future in concurrent.futures.as_completed(futures):
multi_result.append(future.result())
cost_time = time.time() - start_time
logger.info(f"结果: {multi_result}")
logger.info(f"方法耗时 {cost_time:.2f}s")
if __name__ == "__main__":
main()
更多推荐
所有评论(0)