目标读者:熟悉 Python 基础,刚接触 async/await,想通过实际例子理解异步网络请求。
你能学到

  • 异步编程的核心组件如何协同工作
  • 如何用两种方式并发获取多个城市天气
  • 为什么 aiohttp 比 requests + 线程池 更高效
  • 可直接运行的完整示例代码
  • 明确 loop.run_in_executor“异步 ↔ 同步”的桥梁

1. 背景

1.1. 为什么需要异步?

当我们需要同时获取多个城市的天气数据(比如 10 个城市),如果用传统的同步方式(如 requests.get() 一个接一个调用),每个请求都要等上一次完成才能开始下一次。假设每个请求耗时 500ms,10 个就是 5 秒。

异步并发可以在等待某个请求响应的同时,发起其他请求,从而显著提升效率(理想情况下接近单次请求的时间,如 500ms)。

1.2. “桥接”是什么意思?

在异步程序中,我们希望不阻塞事件循环地执行 I/O 操作(如网络请求)。但很多经典库(如 requeststime.sleep、数据库驱动)是同步阻塞的——它们一旦调用,就会“卡住”当前线程,直到完成。

这就产生了一个问题:

如何在异步环境中安全地使用同步代码?

解决方案是:异步-同步桥接(Async-Sync Bridging)

而 Python 标准库提供的官方桥梁就是:

loop.run_in_executor()

  • 它把一个同步函数提交到**线程池(或进程池)**中执行;
  • 返回一个可 await 的协程对象;
  • 主事件循环继续运行,不会被阻塞;
  • 当线程中的任务完成,结果自动返回给异步上下文。

🌉 因此,run_in_executor 是连接“异步世界”和“同步世界”的关键桥梁。

1.3. 准备:使用真实可用的天气 API

我们选用 Open-Meteo —— 免费、无需 Key、支持全球坐标。

内置城市坐标映射:

CITY_COORDS = {
    "Beijing": (39.9042, 116.4074),
    "Harbin": (45.8038, 126.5340),
    "Shanghai": (31.2304, 121.4737),
    "Urumqi": (43.8256, 87.6168),
    "Sanya": (18.2528, 109.5119),
    "Qingdao": (36.0671, 120.3826),
    "Dalian": (38.9140, 121.6147),
    "Mohe": (52.9771, 122.5336),
    "Xiamen": (24.4798, 118.0894),
    "Chengdu": (30.5728, 104.0668),
    "Dali": (25.6808, 100.2348),
}

API 示例:

GET https://api.open-meteo.com/v1/forecast?latitude=39.9042&longitude=116.4074&current_weather=true

返回:

{ "current_weather": { "temperature": 5.2 } }

2. 异步核心组件协作流程图

使用原生异步库
使用同步库
asyncio.run(main())
Event Loop 启动
发起网络请求?
aiohttp.ClientSession.get()
loop.run_in_executor()
ThreadPoolExecutor
requests.get() (在线程中运行)
结果返回给 Event Loop
非阻塞 I/O,由 Event Loop 监听 socket
响应到达,触发回调
结果返回给 await 表达式
结果返回给 await 表达式
继续执行后续异步逻辑

💡 图解说明:

  • 左侧路径(aiohttp):全程在事件循环内,无额外线程,高效。
  • 右侧路径(requests):通过 run_in_executor 桥接到线程池,避免阻塞主循环。

3. 方案一:通过 run_in_executor 桥接同步 requests

🌉 桥接原理回顾

  • requests.get() 是同步阻塞函数,不能直接在 async def 中调用。
  • loop.run_in_executor(None, requests.get, url) 将其交给线程池执行。
  • 主事件循环继续调度其他协程,实现“伪并发”。

🔧 完整代码

# weather_with_bridge.py
import asyncio
import requests

CITY_COORDS = { ... }  # 同上

def fetch_weather_sync(city):
    """同步函数:只能在线程中安全运行"""
    lat, lon = CITY_COORDS[city]
    url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&current_weather=true"
    try:
        resp = requests.get(url, timeout=10)
        temp = resp.json()["current_weather"]["temperature"]
        return city, f"{temp}°C"
    except Exception as e:
        return city, f"Error: {e}"

async def main():
    cities = list(CITY_COORDS.keys())
    loop = asyncio.get_running_loop()

    # 🌉 关键:通过 run_in_executor 桥接同步函数
    tasks = [
        loop.run_in_executor(None, fetch_weather_sync, city)
        for city in cities
    ]

    results = await asyncio.gather(*tasks)
    for city, weather in results:
        print(f"{city}: {weather}")

if __name__ == "__main__":
    import time
    start = time.time()
    asyncio.run(main())
    print(f"\n⏱️  Total time: {time.time() - start:.2f}s")

这就是“异步调用同步代码”的标准做法


4、方案二:原生异步 aiohttp(无需桥接)

4.1. 🚫 为什么不需要桥接?

因为 aiohttp 从底层就基于 asyncio 设计:

  • 使用非阻塞 socket;
  • 通过事件循环监听 I/O 事件;
  • 所有操作天然支持 await

4.2. aiohttp介绍

aiohttp 是一个基于 Python 的 异步 HTTP 客户端/服务器框架,专为高并发 I/O 密集型场景设计,尤其适合与 asyncio 事件循环配合使用。

原生异步支持

  • 基于 async/await 语法,单线程即可处理数千并发连接(通过非阻塞 I/O 和事件循环)。
  • 避免传统多线程/多进程模型的资源消耗和上下文切换开销。

HTTP 客户端与服务器二合一

  • 客户端:支持发送 HTTP 请求(GET/POST/PUT/DELETE 等),支持 WebSocket、SSL/TLS、连接池等。
  • 服务器:可快速构建异步 HTTP 服务(如 REST API),性能优于传统框架(如 Flask + Gevent)。

高性能连接管理

  • 内置连接池(TCPConnector),复用 TCP 连接减少握手开销。
  • 支持 HTTP/1.1 和 HTTP/2 协议。

丰富的功能扩展

  • 中间件(Middleware):拦截请求/响应,实现日志、认证、限流等。
  • 信号(Signals):通过事件触发自定义逻辑(如请求完成时通知)。
  • 流式上传/下载:处理大文件或实时数据流。

典型使用场景

  • 高并发网络爬虫
    • 单线程爬取数千网页,避免线程爆炸。
  • 微服务间调用
    • 异步调用其他服务的 REST API,减少等待时间。
  • 实时数据推送
    • 通过 WebSocket 实现实时消息推送(如聊天应用)。
  • API 代理层
    • 构建异步 API 网关,转发请求到多个后端服务。

4.3. 🔧 完整代码

安装aiohttp

pip install aiohttp
Installing collected packages: propcache, multidict, frozenlist, async-timeout, aiohappyeyeballs, yarl, aiosignal, aiohttp

发送 GET 请求案例代码

# weather_native_async.py
import asyncio
import aiohttp

CITY_COORDS = { ... }

async def fetch_weather_async(session, city):
    lat, lon = CITY_COORDS[city]
    url = f"https://api.open-meteo.com/v1/forecast?latitude={lat}&longitude={lon}&current_weather=true"
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
            data = await resp.json()
            temp = data["current_weather"]["temperature"]
            return city, f"{temp}°C"
    except Exception as e:
        return city, f"Error: {e}"

async def main():
    cities = list(CITY_COORDS.keys())
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_weather_async(session, city) for city in cities]
        results = await asyncio.gather(*tasks)
        for city, weather in results:
            print(f"{city}: {weather}")

if __name__ == "__main__":
    import time
    start = time.time()
    asyncio.run(main())
    print(f"\n⏱️  Total time: {time.time() - start:.2f}s")

运行结果

Beijing: 1.5°C
Harbin: -16.4°C      
Shanghai: 13.9°C     
Urumqi: -7.6°C       
Sanya: 26.7°C        
Qingdao: 2.2°C       
Dalian: -5.8°C       
Mohe: -23.4°C        
Xiamen: 22.6°C       
Chengdu: 14.9°C      
Dali: 13.2°C

⏱️  Total time: 1.81s

**发送 POST 请求(带 JSON 数据)

async def post_data():
    url = "https://api.example.com/submit"
    payload = {"key": "value"}
    async with aiohttp.ClientSession() as session:
        async with session.post(url, json=payload) as response:
            print("响应状态:", response.status)
            print("响应文本:", await response.text())

asyncio.run(post_data())

设置超时与重试

from aiohttp import ClientTimeout, ClientError
import backoff

@backoff.on_exception(backoff.expo, ClientError, max_tries=3)
async def fetch_with_retry():
    timeout = ClientTimeout(total=5)  # 总超时 5 秒
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get("https://api.example.com/data") as response:
                return await response.json()
    except asyncio.TimeoutError:
        print("请求超时,重试中...")
        raise

asyncio.run(fetch_with_retry())

5. 对比总结:桥接 vs 原生异步

维度 run_in_executor + requests aiohttp
是否需要桥接 ✅ 是(同步 → 异步) ❌ 否(原生异步)
线程使用 多线程(线程池) 单线程
资源开销 较高(线程上下文切换) 极低
代码侵入性 低(复用旧代码) 中(需学新 API)
并发上限 受限于线程池大小 可轻松支持数千并发
适用阶段 迁移期、临时方案 新项目、生产环境
用时 2.82s 1.81s

🎯 记住

  • 桥接是权宜之计,用于兼容旧代码;
  • 原生异步是未来方向,更高效、更 Pythonic。

6. 给初学者的行动建议

  1. 理解“桥接”概念run_in_executor 是异步世界调用同步世界的“安全通道”。
  2. 不要滥用桥接:能用 aiohttpaiomysqlhttpx(支持 async)就别用 requests
  3. 永远不要在 async 函数里直接调用阻塞函数(如 time.sleep, requests.get)。
  4. 先理解 async/await 基础:协程不是多线程,而是协作式调度。
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐