Python 异步并发获取天气数据:从requests到aiohttp实践
目标读者:熟悉 Python 基础,刚接触
async/await,想通过实际例子理解异步网络请求。
你能学到:
- 异步编程的核心组件如何协同工作
- 如何用两种方式并发获取多个城市天气
- 为什么 aiohttp 比 requests + 线程池 更高效
- 可直接运行的完整示例代码
- 明确
loop.run_in_executor是 “异步 ↔ 同步”的桥梁
1. 背景
1.1. 为什么需要异步?
当我们需要同时获取多个城市的天气数据(比如 10 个城市),如果用传统的同步方式(如 requests.get() 一个接一个调用),每个请求都要等上一次完成才能开始下一次。假设每个请求耗时 500ms,10 个就是 5 秒。
而异步并发可以在等待某个请求响应的同时,发起其他请求,从而显著提升效率(理想情况下接近单次请求的时间,如 500ms)。
1.2. “桥接”是什么意思?
在异步程序中,我们希望不阻塞事件循环地执行 I/O 操作(如网络请求)。但很多经典库(如 requests、time.sleep、数据库驱动)是同步阻塞的——它们一旦调用,就会“卡住”当前线程,直到完成。
这就产生了一个问题:
❓ 如何在异步环境中安全地使用同步代码?
解决方案是:异步-同步桥接(Async-Sync Bridging)。
而 Python 标准库提供的官方桥梁就是:
✅ loop.run_in_executor()
- 它把一个同步函数提交到**线程池(或进程池)**中执行;
- 返回一个可
await的协程对象; - 主事件循环继续运行,不会被阻塞;
- 当线程中的任务完成,结果自动返回给异步上下文。
🌉 因此,
run_in_executor是连接“异步世界”和“同步世界”的关键桥梁。
1.3. 准备:使用真实可用的天气 API
我们选用 Open-Meteo —— 免费、无需 Key、支持全球坐标。
内置城市坐标映射:
CITY_COORDS = {
"Beijing": (39.9042, 116.4074),
"Harbin": (45.8038, 126.5340),
"Shanghai": (31.2304, 121.4737),
"Urumqi": (43.8256, 87.6168),
"Sanya": (18.2528, 109.5119),
"Qingdao": (36.0671, 120.3826),
"Dalian": (38.9140, 121.6147),
"Mohe": (52.9771, 122.5336),
"Xiamen": (24.4798, 118.0894),
"Chengdu": (30.5728, 104.0668),
"Dali": (25.6808, 100.2348),
}
API 示例:
GET https://api.open-meteo.com/v1/forecast?latitude=39.9042&longitude=116.4074¤t_weather=true
返回:
{ "current_weather": { "temperature": 5.2 } }
2. 异步核心组件协作流程图
💡 图解说明:
- 左侧路径(
aiohttp):全程在事件循环内,无额外线程,高效。- 右侧路径(
requests):通过run_in_executor桥接到线程池,避免阻塞主循环。
3. 方案一:通过 run_in_executor 桥接同步 requests
🌉 桥接原理回顾
requests.get()是同步阻塞函数,不能直接在async def中调用。loop.run_in_executor(None, requests.get, url)将其交给线程池执行。- 主事件循环继续调度其他协程,实现“伪并发”。
🔧 完整代码
# weather_with_bridge.py
import asyncio
import requests
CITY_COORDS = { ... } # 同上
def fetch_weather_sync(city):
"""同步函数:只能在线程中安全运行"""
lat, lon = CITY_COORDS[city]
url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}¤t_weather=true"
try:
resp = requests.get(url, timeout=10)
temp = resp.json()["current_weather"]["temperature"]
return city, f"{temp}°C"
except Exception as e:
return city, f"Error: {e}"
async def main():
cities = list(CITY_COORDS.keys())
loop = asyncio.get_running_loop()
# 🌉 关键:通过 run_in_executor 桥接同步函数
tasks = [
loop.run_in_executor(None, fetch_weather_sync, city)
for city in cities
]
results = await asyncio.gather(*tasks)
for city, weather in results:
print(f"{city}: {weather}")
if __name__ == "__main__":
import time
start = time.time()
asyncio.run(main())
print(f"\n⏱️ Total time: {time.time() - start:.2f}s")
✅ 这就是“异步调用同步代码”的标准做法。
4、方案二:原生异步 aiohttp(无需桥接)
4.1. 🚫 为什么不需要桥接?
因为 aiohttp 从底层就基于 asyncio 设计:
- 使用非阻塞 socket;
- 通过事件循环监听 I/O 事件;
- 所有操作天然支持
await。
4.2. aiohttp介绍
aiohttp 是一个基于 Python 的 异步 HTTP 客户端/服务器框架,专为高并发 I/O 密集型场景设计,尤其适合与 asyncio 事件循环配合使用。
原生异步支持
- 基于
async/await语法,单线程即可处理数千并发连接(通过非阻塞 I/O 和事件循环)。 - 避免传统多线程/多进程模型的资源消耗和上下文切换开销。
HTTP 客户端与服务器二合一
- 客户端:支持发送 HTTP 请求(GET/POST/PUT/DELETE 等),支持 WebSocket、SSL/TLS、连接池等。
- 服务器:可快速构建异步 HTTP 服务(如 REST API),性能优于传统框架(如 Flask + Gevent)。
高性能连接管理
- 内置连接池(
TCPConnector),复用 TCP 连接减少握手开销。 - 支持 HTTP/1.1 和 HTTP/2 协议。
丰富的功能扩展
- 中间件(Middleware):拦截请求/响应,实现日志、认证、限流等。
- 信号(Signals):通过事件触发自定义逻辑(如请求完成时通知)。
- 流式上传/下载:处理大文件或实时数据流。
典型使用场景
- 高并发网络爬虫
- 单线程爬取数千网页,避免线程爆炸。
- 微服务间调用
- 异步调用其他服务的 REST API,减少等待时间。
- 实时数据推送
- 通过 WebSocket 实现实时消息推送(如聊天应用)。
- API 代理层
- 构建异步 API 网关,转发请求到多个后端服务。
4.3. 🔧 完整代码
安装aiohttp
pip install aiohttpInstalling collected packages: propcache, multidict, frozenlist, async-timeout, aiohappyeyeballs, yarl, aiosignal, aiohttp
发送 GET 请求案例代码
# weather_native_async.py
import asyncio
import aiohttp
CITY_COORDS = { ... }
async def fetch_weather_async(session, city):
lat, lon = CITY_COORDS[city]
url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}¤t_weather=true"
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
data = await resp.json()
temp = data["current_weather"]["temperature"]
return city, f"{temp}°C"
except Exception as e:
return city, f"Error: {e}"
async def main():
cities = list(CITY_COORDS.keys())
async with aiohttp.ClientSession() as session:
tasks = [fetch_weather_async(session, city) for city in cities]
results = await asyncio.gather(*tasks)
for city, weather in results:
print(f"{city}: {weather}")
if __name__ == "__main__":
import time
start = time.time()
asyncio.run(main())
print(f"\n⏱️ Total time: {time.time() - start:.2f}s")
运行结果
Beijing: 1.5°C
Harbin: -16.4°C
Shanghai: 13.9°C
Urumqi: -7.6°C
Sanya: 26.7°C
Qingdao: 2.2°C
Dalian: -5.8°C
Mohe: -23.4°C
Xiamen: 22.6°C
Chengdu: 14.9°C
Dali: 13.2°C
⏱️ Total time: 1.81s
**发送 POST 请求(带 JSON 数据)
async def post_data():
url = "https://api.example.com/submit"
payload = {"key": "value"}
async with aiohttp.ClientSession() as session:
async with session.post(url, json=payload) as response:
print("响应状态:", response.status)
print("响应文本:", await response.text())
asyncio.run(post_data())
设置超时与重试
from aiohttp import ClientTimeout, ClientError
import backoff
@backoff.on_exception(backoff.expo, ClientError, max_tries=3)
async def fetch_with_retry():
timeout = ClientTimeout(total=5) # 总超时 5 秒
try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get("https://api.example.com/data") as response:
return await response.json()
except asyncio.TimeoutError:
print("请求超时,重试中...")
raise
asyncio.run(fetch_with_retry())
5. 对比总结:桥接 vs 原生异步
| 维度 | run_in_executor + requests |
aiohttp |
|---|---|---|
| 是否需要桥接 | ✅ 是(同步 → 异步) | ❌ 否(原生异步) |
| 线程使用 | 多线程(线程池) | 单线程 |
| 资源开销 | 较高(线程上下文切换) | 极低 |
| 代码侵入性 | 低(复用旧代码) | 中(需学新 API) |
| 并发上限 | 受限于线程池大小 | 可轻松支持数千并发 |
| 适用阶段 | 迁移期、临时方案 | 新项目、生产环境 |
| 用时 | 2.82s | 1.81s |
🎯 记住:
- 桥接是权宜之计,用于兼容旧代码;
- 原生异步是未来方向,更高效、更 Pythonic。
6. 给初学者的行动建议
- 理解“桥接”概念:
run_in_executor是异步世界调用同步世界的“安全通道”。 - 不要滥用桥接:能用
aiohttp、aiomysql、httpx(支持 async)就别用requests。 - 永远不要在 async 函数里直接调用阻塞函数(如
time.sleep,requests.get)。 - 先理解
async/await基础:协程不是多线程,而是协作式调度。
更多推荐



所有评论(0)