Python Concurrent Programming (3): Comparing multiprocessing, threading, concurrent.futures, and asyncio
1. Comparison
Python 3.2 introduced concurrent.futures.
Python 3.4 added asyncio to the standard library, and the async/await syntax has been available since Python 3.5.
Library | Class/Method | Concurrency type | Suitable tasks |
---|---|---|---|
multiprocessing | Pool | Parallelism | CPU-bound |
concurrent.futures | ProcessPoolExecutor | Parallelism | CPU-bound |
threading | Thread | Concurrency | I/O-bound |
concurrent.futures | ThreadPoolExecutor | Concurrency | I/O-bound |
asyncio | gather | Concurrency (coroutines) | I/O-bound |
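concurrent.futures provides a simpler interface for both multithreaded and multiprocess programming, which lowers the complexity of using them.
This article uses a few code samples to show how each library is used.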
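To make the "simpler interface" point concrete, here is a minimal sketch showing that the two executor classes in concurrent.futures expose the same map() API, so switching between threads and processes is a one-word change. The helper names square and squares_with are made up for illustration and are not used elsewhere in this article.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor


def square(n):
    # Toy task (hypothetical); defined at module level so worker processes can pickle it.
    return n * n


def squares_with(executor_cls, numbers):
    # Both executor classes accept max_workers and provide map().
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, numbers))


if __name__ == "__main__":
    print(squares_with(ThreadPoolExecutor, range(5)))   # [0, 1, 4, 9, 16]
    print(squares_with(ProcessPoolExecutor, range(5)))  # [0, 1, 4, 9, 16]
```

The only difference between the two calls is the class passed in; the task function and the calling code stay the same.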
2. I/O-bound tasks (threading, ThreadPoolExecutor, asyncio)
2.1 Synchronous approach
# io_bound_sync.py
import time

import requests

def fetch(n):
    # One blocking HTTP GET; the caller waits for the full response.
    return requests.get('http://httpbin.org/get', params={'number': n})

def main():
    # Issue 100 requests one after another.
    for num in range(100):
        fetch(num)

start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Running python io_bound_sync.py takes about 68 seconds:
Elapsed run time: 68.3351878 seconds.
2.2 threading example
# io_bound_threading.py
import time
from threading import Thread

import requests

def fetch(n):
    return requests.get('http://httpbin.org/get', params={'number': n})

def main():
    # One thread per request: the same 100 requests as the synchronous baseline.
    tasks = [Thread(target=fetch, args=(num,)) for num in range(100)]
    for task in tasks:
        task.start()
    # Wait for every thread to finish before stopping the timer.
    for task in tasks:
        task.join()

start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Running python io_bound_threading.py shows a speedup of nearly 40x:
Elapsed run time: 1.8064501000000002 seconds.
2.3 concurrent.futures example
# io_bound_threadpool.py
import time
from concurrent.futures import ThreadPoolExecutor, wait

import requests

def fetch(n):
    return requests.get('http://httpbin.org/get', params={'number': n})

def main():
    futures = []
    # Default pool size; submit() returns a Future for each request.
    with ThreadPoolExecutor() as executor:
        for num in range(100):
            futures.append(executor.submit(fetch, num))
        # Block until every Future has completed.
        wait(futures)

start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Running io_bound_threadpool.py returns:
Elapsed run time: 6.0246413 seconds.
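concurrent.futures.ThreadPoolExecutor is an abstraction over the threading library that makes it easier to use. In the previous example we assigned each request to its own thread, 100 threads in total. ThreadPoolExecutor, however, defaults to min(32, os.cpu_count() + 4) worker threads, so requests queue for a free worker and the run takes longer than the threading version.
You can set the number of threads explicitly with ThreadPoolExecutor(max_workers=100). Running it again gives: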
Elapsed run time: 1.038872 seconds.
ThreadPoolExecutor exists to simplify multithreading. If you need finer control over the threads, you still have to use the threading library.
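The effect of the pool size can be reproduced without touching the network. The sketch below is a stand-in under stated assumptions: fake_request uses time.sleep to imitate a slow request, and the helper names, delay, and task counts are invented for illustration. It also evaluates the default-size formula quoted above on the current machine.

```python
import os
import time
from concurrent.futures import ThreadPoolExecutor


def fake_request(n, delay=0.05):
    # Stand-in for an HTTP call (assumption): sleeps, then returns its input.
    time.sleep(delay)
    return n


def timed_run(max_workers, n_tasks=16):
    # Run n_tasks fake requests on a pool of the given size and time the batch.
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(fake_request, range(n_tasks)))
    return results, time.perf_counter() - start


# Default size formula quoted in the text, evaluated on this machine.
default_workers = min(32, (os.cpu_count() or 1) + 4)
print(f"default max_workers: {default_workers}")

_, few = timed_run(max_workers=2)    # tasks queue up behind 2 workers
_, many = timed_run(max_workers=16)  # every task gets its own worker
print(f"2 workers: {few:.2f}s, 16 workers: {many:.2f}s")
```

With fewer workers than tasks, the tasks run in batches, which is the same pattern as the slower default-pool run above.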
2.4 asyncio example
# io_bound_asyncio.py
import time
import asyncio

import httpx

async def fetch(client, n):
    await client.get("https://httpbin.org/get", params={'number': n})

async def main():
    # Share one AsyncClient across all requests.
    async with httpx.AsyncClient() as client:
        # gather() runs all the coroutines concurrently on the event loop.
        await asyncio.gather(
            *[fetch(client, num) for num in range(100)]
        )

start_time = time.perf_counter()
# asyncio.run() (Python 3.7+) creates the event loop, runs main(), and closes the loop.
asyncio.run(main())
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Running io_bound_asyncio.py returns:
Elapsed run time: 1.6809711999999999 seconds.
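In this run asyncio is slightly faster than the threading version. threading uses operating system (OS) threads, so the OS schedules them and preempts them to switch between threads. asyncio uses coroutines, which are defined by the Python interpreter, so the program itself decides when to switch tasks, namely at await points. The asyncio event loop handles this switching.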
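The cooperative switching described above can be observed directly. In this minimal sketch, the worker coroutine and the shared log list are illustrative names, and asyncio.sleep(0) stands in for any awaited I/O; each coroutine runs uninterrupted until it reaches an await, at which point the event loop hands control to the next ready task.

```python
import asyncio


async def worker(name, log):
    log.append(f"{name} start")
    # Awaiting yields control back to the event loop (placeholder for real I/O).
    await asyncio.sleep(0)
    log.append(f"{name} end")


async def main():
    log = []
    # gather() wraps both coroutines in tasks and runs them on the same loop.
    await asyncio.gather(worker("a", log), worker("b", log))
    return log


log = asyncio.run(main())
print(log)  # ['a start', 'b start', 'a end', 'b end']
```

No task is interrupted between its log lines and its await; the interleaving happens only where the code explicitly gives up control.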
3. CPU-bound tasks (multiprocessing, ProcessPoolExecutor)
3.1 Synchronous task
import time

def sum(count):  # note: this name shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret

def main():
    for num in range(1000, 16000):
        sum(num)

start_time = time.perf_counter()
main()
end_time = time.perf_counter()
print(f"Elapsed run time: {end_time - start_time} seconds.")
Result:
Elapsed run time: 5.0112078 seconds.
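3.2 multiprocessing example
import time
from multiprocessing import Pool, cpu_count

def sum(count):  # note: this name shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret

def main():
    # Leave one core free, but always keep at least one worker.
    with Pool(max(1, cpu_count() - 1)) as p:
        # map() splits the inputs into chunks and distributes them to the workers.
        p.map(sum, range(1000, 16000))
        p.close()
        p.join()

# The guard is required on platforms that spawn worker processes (Windows, macOS).
if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
Result: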
Elapsed run time: 2.0392013 seconds.
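3.3 concurrent.futures example
import time
from concurrent.futures import ProcessPoolExecutor, wait
from multiprocessing import cpu_count

def sum(count):  # note: this name shadows the built-in sum()
    ret = 0
    for n in range(count):
        ret += n
    return ret

def main():
    futures = []
    # Leave one core free, but always keep at least one worker.
    with ProcessPoolExecutor(max(1, cpu_count() - 1)) as executor:
        # Each submit() sends a single task to the pool and returns a Future.
        for num in range(1000, 16000):
            futures.append(executor.submit(sum, num))
        wait(futures)

# The guard is required on platforms that spawn worker processes (Windows, macOS).
if __name__ == "__main__":
    start_time = time.perf_counter()
    main()
    end_time = time.perf_counter()
    print(f"Elapsed run time: {end_time - start_time} seconds.")
Result: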
Elapsed run time: 8.558755399999999 seconds.
Slower than the single-process version? I have not worked out why.
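One plausible factor, not confirmed by the measurements in this article, is per-task overhead: submit() sends each of the 15,000 small tasks to a worker as its own pickled message and creates a Future for it, whereas Pool.map batches its inputs into chunks. The sketch below is an experiment to try rather than a verified fix. It uses executor.map with an explicit chunksize so that work is shipped in batches; the names sum_below and run_chunked and the chunk size of 500 are illustrative choices.

```python
import os
import time
from concurrent.futures import ProcessPoolExecutor


def sum_below(count):
    # Same CPU-bound loop as in the examples above, under a non-shadowing name.
    ret = 0
    for n in range(count):
        ret += n
    return ret


def run_chunked(executor, counts, chunksize=500):
    # With ProcessPoolExecutor, map() sends `chunksize` inputs per task
    # instead of one message per input.
    return list(executor.map(sum_below, counts, chunksize=chunksize))


if __name__ == "__main__":
    workers = max(1, (os.cpu_count() or 2) - 1)
    start = time.perf_counter()
    with ProcessPoolExecutor(workers) as executor:
        results = run_chunked(executor, range(1000, 16000))
    print(f"{len(results)} results in {time.perf_counter() - start:.2f} seconds.")
```

Comparing this timing against the submit() version on the same machine would show whether per-task overhead accounts for the slowdown.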
4. Conclusion
concurrent.futures is easier to use.
For everyday coding:
concurrent.futures > [threading, multiprocessing]
asyncio > threading