异步

参考视频;
https://www.bilibili.com/video/BV157mFYEEkH
https://www.bilibili.com/video/BV1oa411b7c9

异步编程主要关注的是并发(Concurrency),而不是并行。它通过单线程中的事件循环来管理多个任务,这些任务在等待 I/O 操作时可以暂停执行,让其他任务继续运行。虽然异步编程可以提高程序的响应能力和资源利用率,但它本身并不实现真正的并行。

  1. 并发而非并行:
    • 异步编程通过事件循环在单线程中管理多个任务,这些任务在等待 I/O 操作时可以暂停执行,让其他任务继续运行。
  2. 适用于 I/O 密集型任务:
    • 对于 I/O 密集型任务,如网络请求、文件读写等,异步编程可以显著提高程序的性能,因为这些任务通常涉及大量的等待时间。
  3. 单线程:
    • 异步编程通常在单线程中运行,避免了多线程编程中的复杂同步问题。

1. 协程 (Coroutine)

协程是 async/await 的核心概念,是一种可以暂停和恢复执行的函数。
协程是异步编程的基本单元,它是一个特殊的函数,可以通过 async 关键字定义。协程可以暂停和恢复执行,这使得它可以在等待 I/O 操作时让出控制权,让其他任务运行。
概念
● 使用 async def 定义的函数就是协程函数
● 调用协程函数不会立即执行,而是返回一个协程对象
● 协程需要通过事件循环或 await 来驱动执行

# iscoroutinefunction 检查一个方法是否是协程函数
import inspect

async def async_function():
    return "Hello, World!"

def normal_function():
    return "Hello, World!"

if __name__ == "__main__":
    # 检查一个方法是否是协程函数
    print(inspect.iscoroutinefunction(async_function))  # 输出: True
    print(inspect.iscoroutinefunction(normal_function))  # 输出: False

1.1 协程函数和协程对象

  1. 一个协程方法, 只写方法名称是一个协程函数
  2. 一个协程方法, 写方法名称和括号是一个协程对象,可以通过await 包装为一个任务进行调用
import asyncio

async def my_coroutine():
    print("协程函数开始 started")
    await asyncio.sleep(1)  # 模拟异步 I/O 操作
    print("协程函数结束 finished")

# 协程函数  my_coroutine
# 协程对象  my_coroutine_obj = my_coroutine()  my_coroutine_obj是协程对象

# 调用协程函数返回协程对象,不会执行
if __name__ == '__main__':
    my_coroutine_obj = my_coroutine()       
    print(my_coroutine_obj)                    # 协程对象
    asyncio.run(my_coroutine_obj)                # 执行协程对象
    # asyncio.run(my_coroutine())                # 执行协程对象

1.2 任务 (Task)

任务是协程的进一步封装,用于调度协程的执行。
作用
● 任务是对协程的包装,将协程加入事件循环的调度
● 任务是 Future 的子类
● 创建任务后会自动调度执行
在这里插入图片描述
协程(coroutine)转化为任务(task)才能让事件循环调用

1.3 事件循环 (Event Loop)

在这里插入图片描述

事件循环是异步编程的核心,负责调度和执行协程/任务。
事件循环同一事件只能执行一个任务
作用
● 管理任务的执行
● 处理IO操作和系统事件
● 提供定时器功能
在这里插入图片描述

import asyncio

loop = asyncio.get_event_loop()

1.4 三者的关系

  1. 协程:基本的可暂停/恢复的函数
  2. 任务:被事件循环调度的协程
  3. 事件循环:管理和执行所有任务的引擎
    事件循环
    ├── 任务1 (包装协程A)
    ├── 任务2 (包装协程B)
    └── 任务3 (包装协程C)

2. await的作用

await 会让出当前的控制权,暂停当前线程,
完成await之后的任务才能继续向下执行,

1.定义协程函数
2.包装协程为任务
3.建立事件循环
await 协程对象 --> 包装成任务

import asyncio
async def my_coroutine():
    print("Hello")
    await asyncio.sleep(5)
    print("World")

async def main():
    task1 = asyncio.create_task(my_coroutine())
    task2 = asyncio.create_task(my_coroutine())
    await task1				# task1, task2几乎同时执行, 因为此时让出控制权,事件循环发现了两个任务, task1, task2
    await task2			
   
if __name__ == "__main__":
    asyncio.run(main())		

输出

Hello
Hello
World
World

3. 将协程包装为任务的不同方法

3.1 手动

(asyncio.create_task(协程对象))

task1 = asyncio.create_task(my_coroutine())
import asyncio
async def my_coroutine():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    task1 = asyncio.create_task(my_coroutine())
    await task1

if __name__ == "__main__":
    asyncio.run(main())

3.2 自动

await 协程对象 --> 包装成任务

import asyncio
async def my_coroutine():
    print("Hello")
    await asyncio.sleep(1)
    print("World")

async def main():
    task1 = my_coroutine()
    await task1 		# 会自动先封装为任务

if __name__ == "__main__":
    asyncio.run(main())

3.3. asyncio.gather()

是一个用于并发执行多个异步任务的函数,它会等待所有任务完成,并返回一个包含所有任务结果的列表。

await asyncio.gather(task1, task2, task3)		# 关键代码
await asyncio.gather(*[task1, task2, task3])	# 关键代码

具体示例

import inspect
import asyncio
import time
from loguru import logger

async def fake_async_function():
    time.sleep(1)   # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
    logger.info(("Hello, World!"))  
    return "Hello, World!"  

async def parallel_async_test():
    # 由于fake_async_function 里面是同步耗时, 所以总耗时耗时3秒钟
    serial_start_time = time.time()
    task1 = asyncio.create_task(fake_async_function())
    task2 = asyncio.create_task(fake_async_function())
    task3 = asyncio.create_task(fake_async_function())
    await asyncio.gather(task1, task2, task3)
    cost_time = time.time() - serial_start_time
    logger.info(f"并行异步耗时:{cost_time}") 

async def main():
    
    start_time = time.time()
    await parallel_async_test()
    end_time = time.time() - start_time
    logger.info(f"耗时:{end_time}")    

# 
if __name__ == '__main__':
    asyncio.run(main())  # True

耗时3秒

3.4 as_compelted([])

是一个用于并发执行多个异步任务的函数,它会返回一个可迭代对象,每次迭代返回一个已经完成的任务。
关键代码 接受一个列表

results = asyncio.as_completed([task1, task2, task3])  
import inspect
import asyncio
import time
from loguru import logger

async def fake_async_function():
    await asyncio.sleep(1)   # 异步
    # time.sleep(1)   # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
    logger.info(("Hello, World!"))  
    return "Hello, World!"  

async def parallel_async_test():
    # 由于fake_async_function 里面是同步耗时, 所以总耗时耗时3秒钟
    
    serial_start_time = time.time()
    
    task1 = fake_async_function() 
    task2 = fake_async_function() 
    task3 = fake_async_function()
    # results = await asyncio.gather(task1, task2, task3)
    results = asyncio.as_completed([task1, task2, task3])
    for result in results:
        print(await result)
    cost_time = time.time() - serial_start_time
    logger.info(f"并行异步耗时:{cost_time}") 

async def main():
    
    start_time = time.time()
    await parallel_async_test()
    end_time = time.time() - start_time
    logger.info(f"耗时:{end_time}")    

# 
if __name__ == '__main__':
    asyncio.run(main())  # True

4. 具体样例

4.1 并行异步阻塞 asyncio.sleep(2)

如果是先把他转化为task,然后

  • serial_async_test:
    第一次调用 await async_function() 时,当前协程会暂停,等待 async_function 完成。
    由于没有其他任务在运行,每次调用都是独立的,因此总耗时为 2+2+2=6 秒。
  • parallel_async_test:
    使用 asyncio.create_task 创建任务时,第一个调用await使用 task1, task2, task3已经被调度到事件循环中,开始并行执行。
    await task1、await task2 和 await task3 会依次等待这些任务完成,但由于它们已经并行运行,总耗时为最长任务的时间,即 2 秒。
import inspect
import asyncio
import time
from loguru import logger

async def async_function():
    logger.info("异步开始 ...")
    await asyncio.sleep(2)              # 异步非阻塞操作:允许当前线程或进程在等待时释放资源,让其他任务可以继续执行,提高资源利用率和程序响应速度。
    logger.info(("异步结束 ..."))  

async def serial_async_test():
    # 耗时三秒钟
    serial_start_time = time.time()
    await async_function()
    await async_function()
    await async_function()
    cost_time = time.time() - serial_start_time
    logger.info(f"串行异步耗时:{cost_time}")   

async def parallel_async_test():
    # 耗时一秒钟
    parallel_start_time = time.time()
    task1 = asyncio.create_task(async_function())
    task2 = asyncio.create_task(async_function())
    task3 = asyncio.create_task(async_function())
    await task1
    await task2
    await task3
    cost_time = time.time() - parallel_start_time
    logger.info(f"并行异步耗时:{cost_time}") 

async def main():
    
    start_time = time.time()
    await serial_async_test()             # 耗时6秒
    await parallel_async_test()           # 耗时2秒
    end_time = time.time() - start_time
    logger.info(f"耗时:{end_time}")    

# 测试异步方法串行和异步方法并行
if __name__ == '__main__':
    asyncio.run(main())  # True

4.2 异步并行同步阻塞 time.sleep(2)

  • serial_async_test:
    • 每次调用 await async_function() 时,time.sleep(2) 会阻塞整个事件循环 2 秒。
    • 由于是串行调用,每次调用都会等待前一个调用完成,因此总耗时为 2+2+2=6 秒。
  • parallel_async_test:
    • 使用 asyncio.create_task 创建任务时,任务会立即被调度到事件循环中,开始执行。
    • 但是,由于 async_function 中使用了 time.sleep(2),每个任务都会阻塞整个事件循环 2 秒。
    • 尽管任务是并行启动的,但由于 time.sleep(2) 是同步阻塞操作,任务 1 会阻塞事件循环 2 秒,任务 2 和任务 3 无法在这期间运行。
    • 因此,总耗时仍然是 2+2+2=6 秒。
import inspect
import asyncio
import time
from loguru import logger

async def async_function():
    logger.info("异步开始 ...")
    time.sleep(2)              # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
    logger.info(("异步结束 ..."))  

async def serial_async_test():
    # 耗时6秒钟
    serial_start_time = time.time()
    await async_function()
    await async_function()
    await async_function()
    cost_time = time.time() - serial_start_time
    logger.info(f"串行异步耗时:{cost_time}")   

async def parallel_async_test():
    # 耗时6秒钟
    parallel_start_time = time.time()
    task1 = asyncio.create_task(async_function())
    task2 = asyncio.create_task(async_function())
    task3 = asyncio.create_task(async_function())
    await task1
    await task2
    await task3
    cost_time = time.time() - parallel_start_time
    logger.info(f"并行异步耗时:{cost_time}") 

async def main():
    
    start_time = time.time()
    await serial_async_test()             # 耗时6秒
    await parallel_async_test()           # 耗时2秒
    end_time = time.time() - start_time
    logger.info(f"耗时:{end_time}")    

# 测试异步方法串行和异步方法并行
if __name__ == '__main__':
    asyncio.run(main())  # True

4.3 gather 和 complete

  • asyncio.gather 是一个用于并发执行多个异步任务的函数,它会等待所有任务完成,并返回一个包含所有任务结果的列表。
  • asyncio.as_completed 是一个用于并发执行多个异步任务的函数,它会返回一个可迭代对象,每次迭代返回一个已经完成的任务。
import inspect
import asyncio
import time
from loguru import logger

async def async_function(number):
    logger.info("异步函数 开始")
    await asyncio.sleep(1)   # 异步
    # time.sleep(1)   # 同步阻塞操作:会挂起当前线程或进程,不会释放资源,导致资源利用率低,程序响应速度变慢。
    logger.info(("异步函数 结束"))  
    return number



async def test_gather():

    task1 = async_function(1)
    task2 = async_function(2)

    gather_start_time = time.time()
    gather_result = await asyncio.gather(task1, task2)
    gather_cost_time = time.time() - gather_start_time
    logger.info(f"gather_reuslt {gather_result}")
    logger.info(f"gather并行异步耗时:{gather_cost_time}") 

async def test_complete():

    task3 = async_function(3)
    task4 = async_function(4)

    complete_start_time = time.time()
    complete_result = asyncio.as_completed([task3, task4])
    results = []
    for result in complete_result:
        results.append(await result)
    logger.info(results)
    # await asyncio.sleep(2)
    complete_cost_time = time.time() - complete_start_time
    logger.info(f"compelte并行异步耗时:{complete_cost_time}") 

async def main():
    
    start_time = time.time()
    await test_gather()         # 1 秒 左右
    await test_complete()       # 1 秒 左右
    end_time = time.time() - start_time
    logger.info(f"耗时:{end_time}")    

# 将协程转化为任务进行 调用
if __name__ == '__main__':
    asyncio.run(main())  # True

4.4 asyncio.to_thread (让同步函数并行)

asyncio.to_thread 的作用:将同步函数的调用委托给线程池,避免阻塞事件循环。
将阻塞型同步函数的调用委托给线程池,从而实现并行执行,而不会阻塞事件循环。‘

import inspect
import asyncio
import time
from loguru import logger

def sync_function(number):
    logger.info("同步方法 开始")
    time.sleep(1)   
    logger.info(("同步方法 结束"))  
    return number 

async def serial_async_test():
    #  串行异步耗时
    
    serial_start_time = time.time()
    await asyncio.to_thread(sync_function, 1) 
    await asyncio.to_thread(sync_function, 2)  
    await asyncio.to_thread(sync_function, 3) 
    cost_time = time.time() - serial_start_time
    logger.info(f"并行异步耗时:{cost_time}") 

async def parallel_async_test():
    # 并行异步耗时
    serial_start_time = time.time()
    
    task1 = asyncio.to_thread(sync_function, 1) 
    task2 = asyncio.to_thread(sync_function, 2)  
    task3 = asyncio.to_thread(sync_function, 3) 
    await asyncio.gather(task1, task2, task3)
    cost_time = time.time() - serial_start_time
    logger.info(f"并行异步耗时:{cost_time}") 

async def main():
    
    start_time = time.time()
    await serial_async_test()
    await parallel_async_test()
    end_time = time.time() - start_time
    logger.info(f"耗时:{end_time}")    

# 将协程转化为任务进行 调用
if __name__ == '__main__':
    asyncio.run(main())  # True

4.5 异步Semaphore信号量

import asyncio
import time
import inspect
from loguru import logger

async def test_async_func(name):
    print(f"Worker {name} 开始")
    await asyncio.sleep(1)  # 模拟耗时操作
    print(f"Worker {name} 结束")

async def worker(name):
    # 创建一个信号量,限制同时运行的协程数量
    semaphore = asyncio.Semaphore(20)  # 最多允许 3 个协程同时运行
    async def task_wrapper(query):  
        async with semaphore:  # 使用 async with 来获取信号量
            return await test_async_func(query)
    start_time = time.time()
    tasks = [task_wrapper(i) for i in range(100)]  # 创建 10 个协程
    await asyncio.gather(*tasks)  # 等待所有协程完成
    cost_time = time.time() - start_time
    logger.info(f"worker方法耗时 cost time: {cost_time:.2f}S")

async def main():
    await worker(1)

# Semaphore  是用来限制并发数量, 如果不适用这个参数,那么默认是并发数 gather中的人数数量(没有限制并发数)
if __name__ == "__main__":
    # 运行主函数
    asyncio.run(main())

5. 并发

5.1 多线程

import threading
import time
from loguru import logger

def worker_function(number, results, lock):
    logger.info(f"Worker {number} 开始")
    time.sleep(2)  # 模拟耗时操作
    logger.info(f"Worker {number} 结束")
    with lock:  # 确保线程安全
        results.append(number)

def main():
    start_time = time.time()
    threads = []
    results = []
    lock = threading.Lock()  # 创建一个锁
    for i in range(5):
        # target:指定线程启动时要执行的函数。
        # args:传递给 target 函数的参数,必须是元组。
        thread = threading.Thread(target=worker_function, args=(i, results, lock))
        threads.append(thread)
        thread.start()          # 启动线程,使其开始执行目标函数。
    
    for thread in threads:
        thread.join()           # 等待线程完成,确保主线程在所有子线程完成后再继续执行。

    cost_time = time.time() - start_time
    logger.info(f"方法耗时:{cost_time:.2f}s")
    logger.info(f"返回值  :{results}")

if __name__ == "__main__":
    main()

5.2 多进程

import threading
import time
from loguru import logger

def worker_function(number, results, lock):
    logger.info(f"Worker {number} 开始")
    time.sleep(2)  # 模拟耗时操作
    logger.info(f"Worker {number} 结束")
    with lock:  # 确保线程安全
        results.append(number)

def main():
    start_time = time.time()
    threads = []
    results = []
    lock = threading.Lock()  # 创建一个锁
    for i in range(5):
        # target:指定线程启动时要执行的函数。
        # args:传递给 target 函数的参数,必须是元组。
        thread = threading.Thread(target=worker_function, args=(i, results, lock))
        threads.append(thread)
        thread.start()          # 启动线程,使其开始执行目标函数。
    
    for thread in threads:
        thread.join()           # 等待线程完成,确保主线程在所有子线程完成后再继续执行。

    cost_time = time.time() - start_time
    logger.info(f"方法耗时:{cost_time:.2f}s")
    logger.info(f"返回值  :{results}")

if __name__ == "__main__":
    main()

5.3 异步编程

import time
import asyncio
from loguru import logger

def worker(number):
    logger.info(f"Worker {number} 开始")
    time.sleep(2)  # 模拟耗时操作
    logger.info(f"Worker {number} 结束")
    return number

async def main():
    start_time = time.time()
    tasks = [worker(i) for i in range(5)]
    await asyncio.gather(*tasks)
    cost_time = time.time() - start_time
    logger.info(f"方法耗时 {cost_time:.2f}s")
    
if __name__ == "__main__":
    asyncio.run(main())

5.4 线程池

import concurrent.futures
import time
from loguru import logger

def worker(number):
    logger.info(f"Worker {number} 开始")
    time.sleep(2)  # 模拟耗时操作
    logger.info(f"Worker {number} 结束")
    return number

def main():
    start_time = time.time()
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(worker, i) for i in range(5)]
        multi_result = []
        for future in concurrent.futures.as_completed(futures):
            multi_result.append(future.result())
    cost_time = time.time() - start_time
    logger.info(f"结果: {multi_result}")
    logger.info(f"方法耗时 {cost_time:.2f}s")

if __name__ == "__main__":
    main()

5.5 进程池

import concurrent.futures
import time
from loguru import logger

def worker(number):
    logger.info(f"Worker {number} 开始")
    time.sleep(2)  # 模拟耗时操作
    logger.info(f"Worker {number} 结束")
    return number

def main():
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        multi_result = []
        futures = [executor.submit(worker, i) for i in range(4)]
        for future in concurrent.futures.as_completed(futures):
            multi_result.append(future.result())
    cost_time = time.time() - start_time
    logger.info(f"结果: {multi_result}")
    logger.info(f"方法耗时 {cost_time:.2f}s")

if __name__ == "__main__":
    main()
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐