一、aiohttp 模块

aiohttp 是Python中 基于asyncio实现异步HTTP框架,主要有两大核心能力:

异步 HTTP 客户端: 替代同步的requests用于异步发送HTTP请求(GET/POST)适合高并发的网络请求场景,如爬虫、API调用

异步 HTTP 服务器: 快速搭建异步Web服务,替代部分同步的Flask/Django的场景,适合IO密集型的Web应用

核心优势:
异步非阻塞,能在单线程中处理大量并发请求,大幅提升 IO 密集型任务的效率。比如同时请求 100 个接口,无需等待前一个完成。适合高并发网络请求 / 轻量级 Web 服务

二、安装aiohttp

# 核心库
pip install aiohttp
# 安装加速依赖(可选,提升性能)
pip install aiohttp[speedups] 

三、异步 HTTP 客户端

作用是 发起异步 HTTP/HTTPS 请求(GET/POST/PUT 等),管理连接池、Cookie、超时等

* aiohttp.ClientSession(): 必须用 async with 管理,自动关闭连接,避免资源泄露 
* aiohttp.ClientTimeout(total=10): 设置超时,避免请求无限等待,total=10 表示总超时 10# 注意事项:
1. 避免在异步函数中调用同步阻塞函数,如 time.sleep()、requests.get(),会阻塞整个事件循环,改用 asyncio.sleep()
2. 复用 ClientSession:
ClientSession 是"可复用"的,不要为每个请求创建新的 ClientSession,一个业务流程中只创建一个ClientSession,利用连接池复用 TCP 连接,避免频繁创建 / 关闭导致的性能损耗
3. 异常处理:
即使开启raise_for_status,仍需捕获ClientError通用异常、网络异常、TimeoutError超时异常、JSON解析等
4. 必配 timeout: 
无论场景如何,必须配置超时,防止请求挂起阻塞事件循环

3.1、ClientSession 会话管理

ClientSession 是 aiohttp的核心 ,所有请求都通过它发起,且必须配合**async/await**使用。
ClientSession 是客户端的核心入口,用于管理 HTTP 会话的生命周期
(1)内置连接池,复用 TCP 连接,减少握手开销
(2)自动维护 Cookie、请求头headers
(3)统一管理超时、代理、SSL 等配置
(4)必须通过 async with 异步上下文管理器使用,自动释放资源

3.1.1、初始化方法:ClientSession.__init__() 的核心参数

# 核心参数:
* base_url: "设置全局基础 URL"
  - 作用: 所有请求的路径会自动拼接在该地址后,避免重复书写相同的域名/接口前缀,简化代码
  - 默认值: None

* headers: "设置全局请求头,所有请求默认携带"
  - 若单次请求单独配置headers,会覆盖全局同名请求头(非同名则合并)
  - 典型场景: 统一设置 User-Agent、Accept、Content-Type、Token 等通用头
  - 默认值: None

*  cookies: "设置全局 Cookie,所有请求默认携带"
  - ClientSession会"自动维护"响应返回的 Cookie,无需手动更新
  - 典型场景: 请求需要登录态的接口,提前传入登录后的 Cookie
  - 默认值: None

* timeout: "设置全局超时规则,控制请求的生命周期"
  - 支持"简化配置"(直接传数字,总超时)和"精细化配置"(ClientTimeout,分阶段超时),单次请求可单独配置超时覆盖全局
  - "ClientTimeout""精细化配置参数":
    - connect=5: 建立TCP连接的超时时间
    - sock_read=10: 从套接字读取响应数据的超时时间,None=使用total值
    - sock_connect: 套接字连接超时,None=使用connect值
    - total=30: 整个请求的生命周期超时,None=无限制
  - "实际开发必须配置",避免因服务端无响应导致事件循环阻塞
  - 默认值: sentinel 默认不超时,易导致请求无限挂起

* connector: "配置底层连接池,控制最大并发连接数、每个域名最大连接数、DNS 缓存、SSL 验证等"
  - 高并发请求的核心优化参数
  - 默认值:None(自动创建默认TCPConnector,全局最大连接 100- "TCPConnector核心参数":
    - limit: 全局最大并发连接数,高并发建议根据服务器配置调整
      - 默认: 1000=无限制
    - limit_per_host: 每个域名的最大连接数
      - 默认: 0=使用 limit 值,建议设置10-20
    - ttl_dns_cache: DNS缓存超时(秒)
      - 默认: 3000=禁用缓存,适合动态域名
    - ssl: 是否验证 SSL 证书
      - 默认: True,测试环境可设 False 忽略自签名证书
    - proxy: 全局代理地址

* proxy: "设置全局代理,所有请求默认通过该代理发起"
  - 支持HTTP 代理和SOCKS5 代理,单次请求可单独配置 proxy 覆盖全局
  - 默认值: None

* proxy_auth: "为全局代理配置基础认证(用户名 + 密码),适用于需要账号密码的代理服务器"
  - 默认值:None

* auth: 设置全局 HTTP 基础认证,所有请求自动在请求头添加Authorization: Basic xxx,无需手动拼接认证头
  - 默认值: None

* raise_for_status: "设置自动触发异常"
  - 若响应状态码为4xx(客户端错误)/5xx(服务器错误),会自动调用resp.raise_for_status(),抛出aiohttp.ClientResponseError,无需手动判断状态码
  - 默认值: False

* auto_decompress: "是否自动解压响应内容,支持 gzip/deflate/brotli 三种压缩格式"
  - 默认值: True 开启,无需手动处理解压,直接读取原始内容即可
  - 若需要获取原始压缩数据(如做爬虫数据缓存),可设为 False
3.1.1.1、核心参数单个示例
# base_url
async def main():
    # 配置基础URL为接口根地址
    async with aiohttp.ClientSession(base_url="https://api.httpbin.org") as session:
        # 实际请求:https://api.httpbin.org/get
        async with session.get("/get") as resp:
            print(await resp.json())
        # 实际请求:https://api.httpbin.org/post
        async with session.post("/post", json={"key": "val"}) as resp:
            print(await resp.json())

# headers
async def main():
    # 全局请求头:统一UA+指定接收JSON+携带令牌
    global_headers = {
        "User-Agent": "Python-Aiohttp/3.9.1",
        "Accept": "application/json",
        "X-Token": "abc123def456"
    }
    async with aiohttp.ClientSession(headers=global_headers) as session:
        # 单次请求覆盖全局的Accept,新增Referer(最终头为合并+覆盖)
        async with session.get("/get", headers={"Accept": "*/*", "Referer": "https://xxx.com"}) as resp:
            print(dict(resp.request_info.headers))  # 查看最终发送的请求头

# Cookie
async def main():
    # 全局Cookie:携带登录态uid和token
    global_cookies = {"uid": "10086", "login_token": "xyz789"}
    async with aiohttp.ClientSession(cookies=global_cookies) as session:
        async with session.get("/cookies") as resp:
            print(await resp.json())  # 查看服务端接收到的Cookie

# Timeout
# 简化配置:直接传数字,指定总超时,整个请求从连接到读取完成的总时间
async def main():
    # 全局总超时10秒:所有请求若10秒内未完成则抛超时异常
    async with aiohttp.ClientSession(timeout=10) as session:
        async with session.get("/delay/15") as resp:  # 接口延迟15秒返回
            print(await resp.json())  # 会抛出asyncio.TimeoutError
# 精细化配置:用ClientTimeout分阶段控制(推荐,更灵活)
from aiohttp import ClientTimeout
async def main():
    # 精细化超时配置:连接5秒+读取10秒+总超时30秒
    timeout = ClientTimeout(
        connect=5,       # 建立TCP连接的超时时间
        sock_read=10,    # 从套接字读取响应数据的超时时间
        total=30         # 整个请求的生命周期超时(兜底)
    )
    async with aiohttp.ClientSession(timeout=timeout) as session:
        async with session.get("/delay/8") as resp:
            print(await resp.json())  # 读取超时(sock_read=10秒可正常,delay=12则超时)

# connector  
# 高并发 + 禁用 DNS 缓存
from aiohttp import TCPConnector
async def main():
    # 配置连接池:全局最大200连接,每个域名20连接,禁用DNS缓存
    connector = TCPConnector(
        limit=200,
        limit_per_host=20,
        ttl_dns_cache=0
    )
    async with aiohttp.ClientSession(connector=connector) as session:
        # 高并发请求时,连接池会自动复用TCP连接,减少握手开销
        tasks = [session.get("/get") for _ in range(30)]
        results = await asyncio.gather(*tasks)
        print(f"成功请求{len(results)}次")
# 忽略SSL 证书验证,测试环境专用
async def main():
    # 禁用SSL验证(生产环境禁止使用!)
    connector = TCPConnector(ssl=False)
    async with aiohttp.ClientSession(connector=connector) as session:
        async with session.get("https://自签名证书的域名.com") as resp:
            print(resp.status)

# proxy
async def main():
    async with aiohttp.ClientSession(
        # HTTP代理(本地代理示例,根据实际代理地址修改)
        proxy="http://127.0.0.1:7890"
        # SOCKS5代理:proxy="socks5://127.0.0.1:7890"
    ) as session:
        async with session.get("/ip") as resp:
            print(await resp.json())  # 查看代理后的出口IP

# proxy_auth
from aiohttp import BasicAuth
async def main():
    async with aiohttp.ClientSession(
        proxy="http://127.0.0.1:7890",
        # 代理认证:用户名admin,密码123456
        proxy_auth=BasicAuth(login="admin", password="123456")
    ) as session:
        async with session.get("/ip") as resp:
            print(await resp.json())

# auth
from aiohttp import BasicAuth
async def main():
    async with aiohttp.ClientSession(
        # HTTP基础认证:用户名user,密码pass123
        auth=BasicAuth("user", "pass123")
    ) as session:
        async with session.get("/basic-auth/user/pass123") as resp:
            print(await resp.json())  # 鉴权成功返回{"authenticated":true,"user":"user"}

# raise_for_status
async def main():
    async with aiohttp.ClientSession(raise_for_status=True) as session:
        try:
            # 响应状态码404,会自动抛异常
            async with session.get("/status/404") as resp:
                data = await resp.json()  # 不会执行到这一步
        except aiohttp.ClientResponseError as e:
            print(f"请求失败:{e.status} - {e.message}")  # 捕获并处理异常

# auto_decompress
async def main():
    # 关闭自动解压,获取原始gzip压缩数据
    async with aiohttp.ClientSession(auto_decompress=False) as session:
        async with session.get("/gzip") as resp:
            raw_data = await resp.read()  # 原始二进制压缩数据
            print(f"原始数据长度:{len(raw_data)}")
    
    # 开启自动解压(默认),直接读取解压后的文本
    async with aiohttp.ClientSession(auto_decompress=True) as session:
        async with session.get("/gzip") as resp:
            text_data = await resp.text()  # 已解压的文本
            print(f"解压后文本长度:{len(text_data)}")
3.1.1.1、核心参数组合示例

结合高并发 + 超时控制 + 全局配置 + 代理 + 异常自动触发,可作为生产环境可用的ClientSession配置模板

import asyncio
import aiohttp
from aiohttp import ClientTimeout, TCPConnector, BasicAuth

async def fetch_url(session, url):
    try:
        async with session.get(url) as response:
            return await response.json()
    except Exception as e:
        return f"请求失败: {str(e)}"

async def main():
    # 1. 精细化超时配置
    timeout = ClientTimeout(
        connect=5,     # 建立TCP连接的超时时间
        sock_read=10,  # 从套接字读取响应数据的超时时间
        total=20       # 整个请求的生命周期超时
    )
    # 2. 高并发连接池配置
    connector = TCPConnector(
        limit=200,          # 全局最大并发连接数
        limit_per_host=20,  # 每个域名的最大连接数
        ttl_dns_cache=0     # DNS 缓存超时
    )
    # 3. 全局请求头
    headers = {
        "User-Agent": "Python-Aiohttp/3.9.1",
        "Accept": "application/json",
        "X-App-Version": "1.0.0"
    }
    # 4. 初始化ClientSession
    try:
        async with aiohttp.ClientSession(
            base_url="https://api.example.com",
            headers=headers,
            timeout=timeout,
            connector=connector,
            proxy="http://proxy:8080", # 按需开启代理
            # proxy_auth=BasicAuth("admin", "123456"),  # 代理需要认证时开启
            # auth=BasicAuth("user", "pass123"),        # 接口需要基础认证时开启
            raise_for_status=True,
            auto_decompress=True,
            trust_env=False

        ) as session:
            # 高并发请求10个接口
            tasks = [fetch_url(session, f"/get?num={i}") for i in range(10)]
            results = await asyncio.gather(*tasks, return_exceptions=True)
            return results
    except Exception as eg:
        print(f"Partial failures: {eg}")

if __name__ == '__main__':
    asyncio.run(main())

3.1.2、ClientSession 核心请求方法

GET / POST / PUT / DELETE / HEAD / OPTIONS
所有请求方法的参数结构一致,核心参数通用,仅 HTTP 方法不同
以下以 session.get() 为例详解

# 核心参数:
* url: 请求目标 URL
  - 类型: str/URL

* params: URL查询参数 "GET请求"
  - 类型: dict/list/tuple/str
  - 自动拼接为: "?key1=val1&key2=val2"

* data: 请求体的表单/二进制数据 "POST/PUT等请求"
  - 类型: bytes/dict/FormData

* json: JSON格式请求体
  - 自动序列化,设置 Content-Type: application/json
  - 类型: dict/list

* headers: 单次请求头 "覆盖全局headers"
  - 类型: dict

* cookies: 单次请求 Cookie "新增/覆盖全局CookieJar"
  - 类型: dict

* timeout: 单次请求超时 "覆盖全局timeout"
  - 类型: ClientTimeout/int

* proxy: 代理地址
  - http://127.0.0.1:7890 或者 socks5://127.0.0.1:7890

* files: 上传文件 "POST/PUT常用"
  - 类型: dict

* ssl: 是否开启SSL验证
  - False: 禁用验证,适合自签名证书
  - True: 开启验证,默认
  - 类型: bool/SSLContext

* allow_redirects: 是否允许重定向(3xx 响应)
3.1.2.1、GET 请求
import asyncio
import aiohttp

# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)

async def demo_get(session):
    params = {"name": "张三", "age": 20}  # 查询参数:自动拼接为 ?name=张三&age=20

    # 单次请求头(覆盖全局)
    headers = {"Accept": "application/json"}

    try:
        # 2. 发起 GET 请求,await 等待响应
        async with session.get(
            url="https://httpbin.org/get",
            params=params,
            headers=headers,
            timeout=timeout
        ) as response:
            if response.status == 200:
                text = await response.text()  # 获取文本响应,如 HTML/JSON 字符串
                # json_data = await response.json()  # 如果响应是 JSON,用这个
                # binary_data = await response.read()  # 获取二进制数据(如图片)
                return text
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")

async def main():
    # 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
    async with aiohttp.ClientSession(
            headers={"User-Agent": "aiohttp-demo/1.0"},  # 全局请求头
            timeout=timeout
    ) as session:
        await demo_get(session)

if __name__ == '__main__':
    result = asyncio.run(main())
3.1.2.2、POST 请求
import asyncio
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)

async def demo_post(session):
    # 2.1 JSON 请求体
    json_data = {"name": "张三", "age": 20}
    try:
        async with session.post(
            url="https://httpbin.org/post",
            json=json_data,  # 自动序列化 JSON,设置 Content-Type: application/json
            timeout=timeout
        ) as response:
            if response.status == 200:
                return await response.json()
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")

    # 2.2 表单请求体(设置 Content-Type: application/x-www-form-urlencoded)
    form_data = {"gender": "male", "city": "Beijing"}
    try:
        async with session.post(
            url="https://httpbin.org/post",
            data=form_data,
            timeout=timeout
        ) as response:
            if response.status == 200:
                return await response.json()
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")

    # 2.3 多表单/文件上传(设置 Content-Type: multipart/form-data; boundary=----)
    multipart_data = aiohttp.FormData()
    multipart_data.add_field("username", "user")
    multipart_data.add_field("password", "pass123456")
    multipart_data.add_field("text_field", "测试文本")
    # 上传文件(本地文件路径)
    multipart_data.add_field(
        "file_field",
        open("demo.txt", "rb"),  # 二进制模式打开
        filename="demo.txt",  # 文件名
        content_type="text/plain"  # MIME类型
    )
    try:
        async with session.post(
            url="https://httpbin.org/post",
            data=multipart_data,
            timeout=timeout
        ) as response:
            if response.status == 200:
                return await response.json()
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")

async def main():
    # 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
    async with aiohttp.ClientSession(
            headers={"User-Agent": "aiohttp-demo/1.0"},  # 全局请求头
            timeout=timeout
    ) as session:
        await demo_post(session)
if __name__ == '__main__':
    result = asyncio.run(main())
3.1.2.3、并发请求

多个请求几乎同时发起,总耗时≈最慢的那个请求的耗时,而非所有请求耗时之和

import asyncio
import aiohttp
async def fetch_url(session, url):
    async with session.get(url) as response:
        if response.status != 200:
            raise Exception(f"Error fetch {url}; status code: {response.status}")
        # return await response.text()
        return response.status

async def main():
    url_list = [
        "https://httpbin.org/get?num=1",
        "https://httpbin.org/get?num=2",
        "https://httpbin.org/get?num=3"
    ]
    try:
        async with aiohttp.ClientSession() as session:
            tasks  =[fetch_url(session, url) for url in url_list]
            results = await asyncio.gather(*tasks, return_exceptions=True)
    except Exception as eg:
        print(f"Partial failures: {eg}")

    for url, result in zip(url_list, results):
        print(f"{url}: {result}")
if __name__ == '__main__':
    asyncio.run(main())
3.1.2.4、请求重试(配合 tenacity)
import aiohttp
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
# 重试规则:最多3次,指数退避(1s→2s→4s),仅捕获ClientError
@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=4),
    retry=retry_if_exception_type(aiohttp.ClientError)
)
async def fetch_with_retry(session, url):
    async with session.get(url, timeout=5) as resp:
        resp.raise_for_status()
        return await resp.text()

async def main():
    async with aiohttp.ClientSession() as session:
        try:
            result = await fetch_with_retry(session, "https://httpbin.org/get")
            print("请求成功:", result[:50])
        except aiohttp.ClientError as e:
            print("3次重试后仍失败:", e)
asyncio.run(main())

3.1.3、ClientSession 核心属性

# 核心属性:
* connector: 当前会话连接池管理器,可"查看/动态调整"连接池配置,如临时修改最大连接数

* timeout: 会话的全局超时配置
  - 初始化时传入的 timeout 参数,若传数字会自动转为 ClientTimeout 对象

* cookie_jar: 会话的 Cookie 管理器,负责存储、发送、更新 Cookie,是"保持登录态的核心"
  - session.cookie_jar.update_cookies({"token": "123"}): 手动添加/更新 Cookie(cookies为字典)
  - clear(): 清空所有 Cookie
  - filter_cookies(url): 筛选指定 URL 可发送的 Cookie

* closed: 会话是否已关闭
  - True: 表示关闭,无法再发起请求
  - False: 表示正常运行

* headers: 只读,会话的全局请求头,底层是有序字典,支持"大小写不敏感的键查找"
# base_url
async def main():
    async with aiohttp.ClientSession(base_url="https://api.httpbin.org") as session:
        # 查看基础URL(yarl.URL对象,可转字符串)
        print("基础URL(原始):", session._base_url)
        print("基础URL(字符串):", str(session._base_url))
        # 拼接路径(等价于 session.get("/get"))
        full_url = session._base_url / "get"
        print("拼接后的完整URL:", full_url)

# headers
async def main():
    global_headers = {"User-Agent": "Aiohttp-Demo", "X-Token": "123456"}
    async with aiohttp.ClientSession(headers=global_headers) as session:
        # 查看全局请求头(整体)
        print("全局请求头:", dict(session.headers))
        # 动态修改请求头
        session.headers["User-Agent"] = "Demo-Aiohttp"
        # 按键查找(大小写不敏感)
        print("User-Agent:", session.headers.get("user-agent"))
        print("X-Token:", session.headers["X-Token"])

# timeout
from aiohttp import ClientTimeout
async def main():
    timeout = ClientTimeout(connect=5, sock_read=10, total=30)
    async with aiohttp.ClientSession(timeout=timeout) as session:
        # 查看全局超时配置
        print("全局超时对象:", session.timeout)
        print("连接超时(秒):", session.timeout.connect)
        print("读取超时(秒):", session.timeout.sock_read)
        print("总超时(秒):", session.timeout.total)

# connector
from aiohttp import TCPConnector
async def main():
    connector = TCPConnector(limit=200, limit_per_host=20)
    async with aiohttp.ClientSession(connector=connector) as session:
        # 查看连接池配置
        print("连接池类型:", type(session.connector))
        print("全局最大连接数:", session.connector.limit)
        print("单域名最大连接数:", session.connector.limit_per_host)
        # 动态修改连接池配置(慎用,需确保线程安全)
        session.connector.limit = 150
        print("修改后全局最大连接数:", session.connector.limit)

# closed
async def main():
    session = aiohttp.ClientSession()
    # 初始状态:未关闭
    print("会话是否关闭(初始):", session.closed)
    # 发起请求
    async with session.get("https://api.httpbin.org/get") as resp:
        print("请求状态码:", resp.status)
    # 手动关闭会话
    await session.close()
    print("会话是否关闭(关闭后):", session.closed)
    
    # 尝试对已关闭的会话发起请求(会抛异常)
    try:
        async with session.get("https://api.httpbin.org/get") as resp:
            pass
    except RuntimeError as e:
        print("异常:", e)

# cookie
async def main():
    async with aiohttp.ClientSession() as session:
        # 1. 手动添加Cookie
        session.cookie_jar.update_cookies(
            {"uid": "10086", "login_token": "abc789"},
            url="https://api.httpbin.org"  # 指定Cookie生效的URL(可选)
        )   
        # 2. 查看所有Cookie
        print("会话所有Cookie:", session.cookie_jar._cookies)
        # 3. 筛选指定URL的Cookie
        filtered_cookies = session.cookie_jar.filter_cookies("https://api.httpbin.org")
        print("指定URL的Cookie:", {k: v.value for k, v in filtered_cookies.items()})
        # 4. 发起请求,验证Cookie是否携带
        async with session.get("https://api.httpbin.org/cookies") as resp:
            print("服务端接收的Cookie:", await resp.json())
        # 5. 清空Cookie
        session.cookie_jar.clear()
        print("清空后Cookie:", session.cookie_jar._cookies)

3.2、ClientResponse 响应对象

每次发起请求后,会返回 ClientResponse 对象,封装所有响应数据

# 核心方法: 
"均需 await"
* text(encoding=None): 读取响应文本,自动解码,encoding指定编码,如utf-8
* json(encoding=None, loads=json.loads): 解析JSON响应,失败抛出JSONDecodeError异常
* read(): 读取二进制响应(适合图片/视频/文件)
* release(): 手动释放响应资源
  - async with会自动调用,无需手动
* raise_for_status(): 手动触发 4xx/5xx 响应的异常(HTTPError)
* content.read(n=-1): 流式读取响应
  - n: 指定读取字节数,-1 读取全部,适合大文件
# 核心属性:
"无需 await"
* status: HTTP 响应状态码(200=成功,404=资源未找到,500=服务器错误)
* reason: 状态码描述,200"OK"404"Not Found"
* headers: 响应头,字典格式 如: resp.headers["Content-Type"]
* cookies: 响应返回的Cookie 如: resp.cookies["token"].value
* content_type: 响应内容类型 如: application/json、text/html
* url: 最终请求的 URL(含重定向后的地址)
* history: 重定向历史 如: 301/302跳转记录

3.2.1、流式读取大文件

import aiohttp
import asyncio
async def download_large_file():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/stream/100", timeout=60) as resp:
            # 流式读取(每次1024字节)
            with open("large_file.txt", "wb") as f:
                while True:
                    chunk = await resp.content.read(1024)
                    if not chunk:
                        break
                    f.write(chunk)
    print("大文件下载完成")
asyncio.run(download_large_file())

四、异步 HTTP 服务器

搭建轻量级异步 Web 服务,处理路由、请求、响应、WebSocket 等

4.1、web.Application 核心类

创建异步 Web 应用实例,管理路由、中间件、配置、信号(生命周期事件)

4.1.1、初始化方法:web.Application(**kwargs)

参数名 类型 默认值 含义
middlewares list [] 中间件列表(全局请求 / 响应拦截,如日志、鉴权)
router web.UrlDispatcher UrlDispatcher() 路由管理器(可自定义路由规则)
client_max_size int 1024*1024 最大请求体大小(默认 1MB,超出抛异常)
debug bool False 调试模式(开启后显示详细错误栈)

4.1.2、web.Application的核心方法

方法名 作用
add_routes(routes) 添加路由规则(接收 RouteTableDef 或路由列表)
router.add_get(path, handler) 快捷添加 GET 路由(post/put/delete 同理)
run_app(app, **kwargs) 运行应用(需导入 from aiohttp import web)

4.2、web.Request 请求对象

web.Request 是 aiohttp 服务器端处理请求的核心对象,每个路由处理函数的第一个参数就是该对象,封装了客户端发起的所有请求数据(URL、参数、请求体、头信息、Cookie 等)

4.2.1、web.Request 核心属性 “无需await

4.2.1.1、path 和 path_qs
* path: 请求的路径 "不含域名 和 查询参数"
* path_qs: 请求路径 + 查询参数,完整路径 "不含域名"
# 访问 http://127.0.0.1:8080/user/1001?name=张三
from aiohttp import web
async def handler(request):
    print("path请求路径:", request.path)  # /user/1001
    print("path_as请求路径:", request.path_as)  # /user/1001?name=张三
    return web.Response(text=f"Path: {request.path}")
app = web.Application()
app.add_routes([web.get("/user/{user_id}", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.2、query

URL查询参数(即**?key1=val1&key2=val2**部分),支持大小写不敏感查找

# 常用方法:
* get(key, default=None): 获取单个值,避免键不存在报错
* getall(key): 获取同名多值
# 访问 http://127.0.0.1:8080/query?name=张三&tag=1&tag=2
async def handler(request):
    # 1. 获取单个查询参数
    name = request.query.get("name", "未知用户")
    age = request.query.get("age", 0)  # 字符串类型,需手动转int

    # 2. 获取同名多值(如 ?tag=1&tag=2&tag=3)
    tags = request.query.getall("tag")  # 返回列表:["1", "2", "3"]

    # 3. 转为普通字典(同名多值仅保留第一个)
    query_dict = dict(request.query)  # 核心修正:替代不存在的 to_dict()
    # 等价写法:query_dict = dict(request.query.items())

    # 4. 手动处理同名多值,转为 {key: list} 格式的字典
    query_multi_dict = {}
    for key in request.query.keys():
        query_multi_dict[key] = request.query.getall(key)
    return web.json_response({
        "single_value": {
            "name": name,
            "age": int(age)  # 手动转换类型
        },
        "multi_value": {
            "tags": tags
        },
        "query_dict": query_dict,  # 单值字典
        "query_multi_dict": query_multi_dict  # 多值字典
    })
app = web.Application()
app.add_routes([web.get("/query", handler)])
if __name__ == "__main__":
    web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.3、match_info

路由路径参数(即 /{user_id} 中的动态部分),通过键名获取值

# http://127.0.0.1:8080/user/10086,响应:User ID: 10086
async def handler(request):
    # 获取路由参数({user_id})
    user_id = request.match_info.get("user_id", "0")
    # 直接通过[]获取(键不存在会抛KeyError,慎用)
    # user_id = request.match_info["user_id"]
    return web.Response(text=f"User ID: {user_id}")
app = web.Application()
app.add_routes([web.get("/user/{user_id}", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.4、url

完整的请求 URL(含域名、路径、查询参数),支持拆解各部分

# 常用子属性:
* request.url.host: 域名      如:127.0.0.1
* request.url.port: 端口     如: 8080
* request.url.scheme: 协议   如: http/https
* request.url.query: 查询参数(同request.query)
# 访问 http://127.0.0.1:8080/url?a=1
async def handler(request):
    return web.json_response({
        "full_url": str(request.url),
        "host": request.url.host,
        "port": request.url.port,
        "scheme": request.url.scheme
    })
app = web.Application()
app.add_routes([web.get("/url", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.5、method

获取大写的HTTP 请求方法

# http://127.0.0.1:8080/method,响应:Method: POST
async def handler(request):
    return web.Response(text=f"Method: {request.method}")
app = web.Application()
app.add_routes([
    web.post("/method", handler)
])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.6、headers

获取请求头信息,支持大小写不敏感查找

# http://127.0.0.1:8080/headers
async def handler(request):
    # 获取请求头(大小写不敏感)
    user_agent = request.headers.get("User-Agent", "未知")
    content_type = request.headers.get("Content-Type", "无")
    token = request.headers.get("X-Token", "无")

    return web.json_response({
        "User-Agent": user_agent,
        "Content-Type": content_type,
        "X-Token": token
    })
app = web.Application()
app.add_routes([web.get("/headers", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.7、cookies

客户端携带的 Cookie,通过 key.value 获取值

# 客户端携带Cookie uid=1001;token=abc789 访问,响应:UID: 1001, Token: abc789
async def handler(request):
    # 获取Cookie
    uid = request.cookies.get("uid", "").value if "uid" in request.cookies else "无"
    token = request.cookies.get("token", "").value if "token" in request.cookies else "无"
    return web.Response(text=f"UID: {uid}, Token: {token}")
app = web.Application()
app.add_routes([web.get("/cookies", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.8、remote

客户端的 IP 地址,用于记录访问日志、限制 IP 等

async def handler(request):
    return web.Response(text=f"Your IP: {request.remote}")
app = web.Application()
app.add_routes([web.get("/ip", handler)])
web.run_app(app, host="127.0.0.1", port=8080)

4.2.2、web.Request 核心方法 “必须加await

4.2.2.1、json()

解析 JSON 格式的请求体(Content-Type 需为 application/json),失败抛 JSONDecodeError

# 测试:POST 提交 {"name":"张三","age":20},响应成功;提交非 JSON 数据,返回 400 错误
async def handler(request):
    try:
        # 解析JSON请求体
        data = await request.json()
        name = data.get("name")
        age = data.get("age")
        return web.json_response({"code": 200, "msg": "成功", "data": data})
    except Exception as e:
        return web.json_response({"code": 400, "msg": f"解析失败:{str(e)}"}, status=400)
app = web.Application()
app.add_routes([web.post("/json", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.2.2、post()

解析表单格式的请求体
Content-Type 为 application/x-www-form-urlencodedmultipart/form-data

async def handler(request):
    # 解析表单数据
    form_data = await request.post()
    username = form_data.get("username")
    password = form_data.get("password")
    return web.json_response({"username": username, "password": password})
app = web.Application()
app.add_routes([web.post("/form", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.2.3、files()

读取原始二进制请求体(通用方法,适合非 JSON / 表单的自定义格式,如 protobuf、二进制文件)

async def handler(request):
    # 读取原始二进制请求体
    raw_data = await request.read()
    return web.Response(
        text=f"原始数据长度:{len(raw_data)},内容:{raw_data.decode('utf-8')}",
        status=200
    )
app = web.Application()
app.add_routes([web.post("/raw", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.2.4、content.read(n=-1)

流式读取请求体,适合大文件,避免一次性加载到内存
n 为读取字节数,-1表示读取全部

async def handler(request):
    # 流式读取请求体(每次1024字节)
    total_size = 0
    with open("large_file.bin", "wb") as f:
        while True:
            chunk = await request.content.read(1024)
            if not chunk:
                break
            total_size += len(chunk)
            f.write(chunk)
    return web.Response(text=f"大文件接收完成,总大小:{total_size} 字节")
app = web.Application(client_max_size=1024*1024*100)  # 允许最大100MB请求体
app.add_routes([web.post("/large_upload", handler)])
web.run_app(app, host="127.0.0.1", port=8080)

4.2.3、web.Response() 响应对象

aiohttp 服务端构建 HTTP 响应的基础类,所有路由处理函数最终都需要返回一个 Response 实例(或其子类如 json_response/FileResponse),它封装了响应状态码、响应头、响应体等所有响应相关的信息

参数名 类型 默认值 核心含义 & 使用场景
text str None 响应体为文本内容(自动编码为 bytes,默认 utf-8),适合返回纯文本、HTML 等
body bytes/IOBase None 响应体为二进制数据(优先级高于 text),适合返回自定义二进制、文件流等
status int 200 HTTP 响应状态码(如 200 = 成功,400 = 参数错误,404 = 未找到,500 = 服务器错误)
reason str None 状态码描述(如 404 对应 “Not Found”,默认自动匹配状态码)
headers dict/CIMultiDict None 响应头(如 Content-Type、Set-Cookie、Cache-Control 等)
content_type str None 响应体的 MIME 类型(如 text/html、application/json,优先级高于 headers 中的 Content-Type)
charset str utf-8 文本响应的编码(仅对 text 参数生效)
compress int/bool False 是否压缩响应体(True = 默认压缩级别,6 = 自定义级别,支持 gzip/deflate)
set_cookie str/Tuple None 快捷设置单个 Cookie(替代手动在 headers 中加 Set-Cookie)
set_cookies Iterable () 批量设置多个 Cookie
headers dict None 自定义响应头(如跨域的 Access-Control-Allow-Origin)
chunked bool False 是否启用分块传输编码(Transfer-Encoding: chunked),适合大响应体(如流式返回)
timeout float None 响应发送超时时间(秒)
4.2.3.1、基础文本响应

返回纯文本、HTML 等文本类型响应,核心用 text 参数,自动处理编码和 Content-Type

from aiohttp import web
async def text_response_handler(request):
    # 1. 纯文本响应(默认 Content-Type: text/plain; charset=utf-8)
    # return web.Response(text="Hello, aiohttp!")
    # 2. HTML 响应(指定 content_type 为 text/html)
    html_content = """
    <html>
        <head><title>Test Page</title></head>
        <body><h1>Hello, aiohttp!</h1></body>
    </html>
    """
    return web.Response(
        text=html_content,
        content_type="text/html",  # 关键:指定HTML类型
        status=200  # 状态码默认200
    )
app = web.Application()
app.add_routes([web.get("/text", text_response_handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.3.2、错误响应(4xx/5xx)

返回客户端 / 服务器错误响应,核心是设置 status 状态码和错误信息

async def error_response_handler(request):
    # 400 参数错误
    if "name" not in request.query:
        return web.Response(
            text=json.dumps({"code": 400, "msg": "name参数必传"}, ensure_ascii=False),
            content_type="application/json",
            status=400  # 关键:400状态码
        )
    # 500 服务器错误
    try:
        1 / 0  # 模拟异常
    except Exception as e:
        return web.Response(
            text=json.dumps({"code": 500, "msg": f"服务器错误:{str(e)}"}, ensure_ascii=False),
            content_type="application/json",
            status=500
        )
app.add_routes([web.get("/error", error_response_handler)])
4.2.3.3、流式响应(大文件 / 实时数据)

启用分块传输(chunked=True),流式返回数据(避免一次性加载大文件到内存)

async def stream_response_handler(request):
    # 构建流式响应(生成器返回分块数据)
    async def stream_generator():
        # 模拟逐行返回大文件内容
        with open("large_file.txt", "r", encoding="utf-8") as f:
            for line in f:
                yield line.encode("utf-8")  # 转为二进制分块
    return web.Response(
        body=stream_generator(),  # 流式响应体(生成器)
        content_type="text/plain",
        chunked=True  # 启用分块传输
    )
app.add_routes([web.get("/stream", stream_response_handler)])

4.2.4、web.json_response() 响应对象

参数名 类型 默认值 核心含义 & 使用场景
data dict/list/str/int 无(必填) 要返回的 JSON 数据(支持任意可序列化的 Python 对象)
status int 200 HTTP 响应状态码(如 200 = 成功,400 = 参数错误,500 = 服务器错误)
headers dict/CIMultiDict None 自定义响应头(如跨域头、Token、请求 ID 等)
content_type str application/json JSON 响应的 MIME 类型(默认即可,无需修改)
charset str utf-8 JSON 字符串的编码(确保中文 / 特殊字符正常显示)
dumps Callable json.dumps 自定义 JSON 序列化函数(如用 ujson 提升性能)
**kwargs - - 继承 web.Response 的其他参数(如 reason/compress/set_cookie 等)
4.2.4.1、 基础 JSON 响应
from aiohttp import web
async def basic_json_handler(request):
    # 业务数据(任意可序列化的Python对象)
    response_data = {
        "code": 200,          # 自定义业务码
        "msg": "请求成功",     # 提示信息
        "data": {             # 业务数据体
            "user_id": 10086,
            "username": "张三",
            "age": 25,
            "tags": ["Python", "aiohttp"]
        }
    }
    
    # 构建JSON响应(核心:仅需传入data)
    return web.json_response(
        data=response_data,    # 必填:要序列化的JSON数据
        status=200,            # 可选:HTTP状态码,默认200
        headers={              # 可选:自定义响应头
            "X-Request-Id": "abc123456",  # 请求唯一标识
            "X-App-Version": "1.0.0"      # 应用版本
        }
    )
app = web.Application()
app.add_routes([web.get("/basic-json", basic_json_handler)])
if __name__ == "__main__":
    web.run_app(app, host="127.0.0.1", port=8080)
4.2.4.2、 错误 JSON 响应(4xx/5xx)
# 访问 http://127.0.0.1:8080/error-json(无 username 参数):返回 400 状态码 + 错误 JSON
# 访问 http://127.0.0.1:8080/error-json?username=张三:返回 500 状态码 + 异常信息
async def error_json_handler(request):
    # 模拟参数校验失败
    username = request.query.get("username")
    if not username:
        # 400 参数错误响应
        return web.json_response(
            data={
                "code": 40001,  # 自定义错误码(区分不同错误)
                "msg": "用户名不能为空",
                "data": None
            },
            status=400,  # HTTP状态码:400 Bad Request
            reason="Missing Required Parameter"  # 状态码描述(可选)
        )
    
    # 模拟服务器异常
    try:
        1 / 0  # 故意触发异常
    except Exception as e:
        # 500 服务器错误响应
        return web.json_response(
            data={
                "code": 50001,
                "msg": f"服务器内部错误:{str(e)}",
                "data": None
            },
            status=500  # HTTP状态码:500 Internal Server Error
        )
app.add_routes([web.get("/error-json", error_json_handler)])
4.2.4.2、 自定义 JSON 序列化函数
# 先安装:pip install ujson
import ujson
async def custom_dumps_handler(request):
    data = {"name": "李四", "score": 95.5}
    return web.json_response(
        data=data,
        dumps=ujson.dumps  # 自定义序列化函数(替换默认的json.dumps)
    )
app.add_routes([web.get("/custom-dumps", custom_dumps_handler)])

4.2.5、web.FileResponse() 响应对象

web.Response 的子类,专门用于高效返回文件(如图片、视频、文档、压缩包等)
关键特性:
1、自动推断 MIME 类型:根据文件后缀自动设置 Content-Type(如 .jpg → image/jpeg,.zip → application/zip);
2、断点续传:默认支持 Range 请求(如视频播放时拖拽进度条、大文件断点下载);
3、文件不存在处理:若 path 指向的文件不存在,自动抛出 web.HTTPNotFound 异常;
4、跨平台兼容:支持 Windows/macOS/Linux 的文件路径格式。

参数名 类型 默认值 核心含义 & 使用场景
path str/Path 无(必填) 要返回的文件路径(绝对路径 / 相对路径均可)
status int 200 HTTP 响应状态码(文件存在 = 200,不存在 = 404)
headers dict/CIMultiDict None 自定义响应头(如强制下载的 Content-Disposition
content_type str None 手动指定文件的 MIME 类型(默认自动推断)
chunk_size int 262144 (256KB) 流式传输的分块大小(可根据文件类型调整)
filename str None 下载时显示的文件名(覆盖原文件名)
last_modified datetime None 手动指定文件最后修改时间(默认读取文件元数据)
cache_control str None 缓存控制头(如 max-age=3600,控制浏览器缓存)
allow_range bool True 是否支持断点续传(Range 请求),默认开启
4.2.5.1、基础文件预览(图片 / 视频 / 文档)

返回文件供浏览器直接预览(如图片、HTML、PDF),无需下载

# 访问 http://127.0.0.1:8080/preview-image,浏览器直接显示图片
from aiohttp import web
from pathlib import Path
# 确保文件存在(test.jpg,可替换为自己的文件路径)
FILE_PATH = Path(__file__).parent / "test.jpg"
async def file_preview_handler(request):
    # 构建FileResponse(核心:仅需传入文件路径)
    return web.FileResponse(
        path=FILE_PATH,          # 必填:文件路径(Path对象/字符串均可)
        status=200,              # 可选:默认200
        cache_control="max-age=3600"  # 可选:浏览器缓存1小时
    )
app = web.Application()
app.add_routes([web.get("/preview-image", file_preview_handler)])
if __name__ == "__main__":
    web.run_app(app, host="127.0.0.1", port=8080)
4.2.5.2、文件强制下载(指定文件名)

让浏览器触发文件下载(而非预览),并自定义下载文件名

# 访问 http://127.0.0.1:8080/download-zip,浏览器自动下载文件,文件名显示为 我的压缩包.zip(而非原文件名 test.zip)
from aiohttp import web
from pathlib import Path
async def file_download_handler(request):
    # 原文件:test.zip,下载时显示为 "我的压缩包.zip"
    zip_path = Path(__file__).parent / "input" / "image_0.jpg"
    # 构建下载响应
    return web.FileResponse(
        path=zip_path,
        headers={
            # 强制下载(优先级高于浏览器默认预览行为)
            "Content-Disposition": 'attachment; filename="我的压缩包.zip"'
        }
    )
app = web.Application()
app.add_routes([web.get("/download-zip", file_download_handler)])
if __name__ == "__main__":
    web.run_app(app, host="127.0.0.1", port=8080)
4.2.5.3、大文件流式传输(支持断点续传)

返回超大文件(如 GB 级视频 / 压缩包),利用 FileResponse 的流式传输和断点续传特性
1、浏览器播放视频时,可拖拽进度条(断点续传
2、下载大文件时,中断后可继续下载(无需重新开始
3、内存占用始终为分块大小(1MB),不会加载整个文件到内存

async def large_file_handler(request):
    # 超大文件路径(如 2GB 的视频文件)
    large_file_path = Path(__file__).parent / "large_video.mp4"
    return web.FileResponse(
        path=large_file_path,
        chunk_size=1048576,  # 调整分块大小为 1MB(默认256KB,大文件可调大)
        allow_range=True,    # 开启断点续传(默认True,可省略)
        content_type="video/mp4"  # 手动指定MIME类型(确保视频播放器识别)
    )
app.add_routes([web.get("/large-video", large_file_handler)])
4.2.5.4、处理文件不存在的场景

主动捕获文件不存在的异常,返回自定义 JSON 错误响应

async def safe_file_handler(request):
    file_path = Path(__file__).parent / "non_exist_file.txt"
    try:
        # 尝试返回文件
        return web.FileResponse(path=file_path)
    except web.HTTPNotFound:
        # 文件不存在时,返回自定义JSON错误
        return web.json_response(
            data={
                "code": 40401,
                "msg": "文件不存在",
                "data": None
            },
            status=404
        )
app.add_routes([web.get("/safe-file", safe_file_handler)])
4.2.5.5、限制文件访问(权限校验)
async def auth_file_handler(request):
    # 1. 权限校验:获取请求头中的Token
    token = request.headers.get("X-Token")
    if token != "valid_token_123":
        # 无权限:返回403错误
        return web.json_response(
            data={"code": 40301, "msg": "无访问权限"},
            status=403
        )
    # 2. 权限通过:返回文件
    file_path = Path(__file__).parent / "secret_doc.pdf"
    return web.FileResponse(path=file_path)
app.add_routes([web.get("/auth-file", auth_file_handler)])
4.2.5.5、临时文件清理

若返回的是临时文件,建议在响应完成后清理

import aiofiles.os
async def temp_file_handler(request):
    temp_path = Path("temp_file.txt")
    resp = web.FileResponse(path=temp_path)
    # 响应完成后删除临时文件
    async def cleanup():
        await aiofiles.os.remove(temp_path)
    request.loop.create_task(cleanup())
    return resp

五、客户端 + 服务端 完整案例

5.1、GET请求

客户端:

import asyncio
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)

async def demo_get(session):
    params = {"name": "张三", "age": 20}  # 查询参数:自动拼接为 ?name=张三&age=20
    # 单次请求头(覆盖全局)
    headers = {"Accept": "application/json"}
    try:
        # 2. 发起 GET 请求,await 等待响应
        async with session.get(
            url="http://127.0.0.1:8080/get",
            params=params,
            headers=headers,
            timeout=timeout
        ) as response:
            if response.status == 200:
                return await response.text()  # 获取文本响应,如 HTML/JSON 字符串
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
async def main():
    # 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
    async with aiohttp.ClientSession(
            headers={"User-Agent": "aiohttp-demo/1.0"},  # 全局请求头
            timeout=timeout
    ) as session:
        res = await demo_get(session)
        print("获取到服务器的返回结果:")
        print(res)
if __name__ == '__main__':
    asyncio.run(main())

服务端:

from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
    """
    使用web.request解析请求传递到服务端的参数
    """
    name = request.query.get('name', '未知用户')
    age = request.query.get('age', 0)
    print(f"接收到的参数为:{request.query}")
    return web.Response(
        text=f"请求用户为:{name}, 年龄是:{age}",
        status=200
    )
# 2. 创建应用实例
app = web.Application(
    client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
    [web.get('/get', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
    # 5. 启动服务
    web.run_app(
        app,
        host='127.0.0.1',  # 监听地址
        port=8080,        # 监听端口
        access_log=web.access_logger, # 访问日志记录器
    )

5.2、POST 请求 - 表单请求体

客户端:

import asyncio
import json
import aiohttp

# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)

async def demo_post(session):
    form_data = {"gender": "male", "city": "Beijing"}
    # 单次请求头
    headers = {"Accept": "application/x-www-form-urlencoded"}

    try:
        # 2. 发起 post 请求,await 等待响应
        async with session.post(
            url="http://127.0.0.1:8080/post",
            data=form_data,
            headers=headers,
            timeout=timeout
        ) as response:
            if response.status == 200: # HTTP 状态码判断
                return await response.json()  # 获取服务端返回的 JSON 响应数据
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")

async def main():
    # 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
    async with aiohttp.ClientSession(
            headers={"User-Agent": "aiohttp-demo/1.0"},  # 全局请求头
            timeout=timeout
    ) as session:
        result = await demo_post(session)
        print("获取到服务器的返回结果:")
        print(json.dumps(result, indent=2, ensure_ascii=False))

if __name__ == '__main__':
    asyncio.run(main())

服务端:

from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
    """
    使用web.request解析请求传递到服务端的参数
    案例中客户端发送了 普通表单数据
    """
    # print(request.method)
    # print(request.headers)
    form_data = await request.post()
    gender = form_data.get("gender")
    city = form_data.get("city")

    print(f"异步服务端接收到的参数: gender={gender}, city={city}")

    response_data = {
        "code": "200",
        "msg": "请求成功",
        "data": {
            "gender": gender,
            "city": city
        }
    }
    # 使用json_response响应客户端请求
    return web.json_response(
        data=response_data, # 响应数据
        status=200
    )
# 2. 创建应用实例
app = web.Application(
    debug=False, # 启用调试模式
    client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
    [web.post('/post', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
    # 5. 启动服务
    web.run_app(
        app,
        host='127.0.0.1',  # 监听地址
        port=8080,        # 监听端口
        access_log=web.access_logger, # 访问日志记录器
    )

5.3、POST 请求 - form-data请求体

import asyncio
import json
import aiohttp

# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)

async def demo_post(session):
    multipart_data = aiohttp.FormData()
    multipart_data.add_field("username", "user")
    multipart_data.add_field("password", "pass123456")

    # 单次请求头
    headers = {"Content-Type": "application/x-www-form-urlencoded"}

    try:
        # 2. 发起 post 请求,await 等待响应
        async with session.post(
            url="http://127.0.0.1:8080/post",
            data=multipart_data,
            headers=headers,
            timeout=timeout
        ) as response:
            if response.status == 200: # HTTP 状态码判断
                return await response.json()  # 获取服务端返回的 JSON 响应数据
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")

async def main():
    # 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
    async with aiohttp.ClientSession(
            headers={"User-Agent": "aiohttp-demo/1.0"},  # 全局请求头
            timeout=timeout
    ) as session:
        result = await demo_post(session)
        print("获取到服务器的返回结果:")
        print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == '__main__':
    asyncio.run(main())
from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
    """
    使用web.request解析请求传递到服务端的参数
    案例中客户端发送了 form-data表单数据
    """
    form_data = await request.post()
    username = form_data.get("username")
    password = form_data.get("password")

    print(f"异步服务端接收到的参数: gender={username}, password={password}")

    response_data = {
        "code": "200",
        "msg": "请求成功",
        "data": {
            "username": username,
            "password": password
        }
    }
    # 使用json_response响应客户端请求
    return web.json_response(
        data=response_data, # 响应数据
        status=200
    )

# 2. 创建应用实例
app = web.Application(
    debug=False, # 启用调试模式
    client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
    [web.post('/post', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
    # 5. 启动服务
    web.run_app(
        app,
        host='127.0.0.1',  # 监听地址
        port=8080,        # 监听端口
        access_log=web.access_logger, # 访问日志记录器
    )

5.4、POST 请求 -json请求体

客户端:

import asyncio
import json
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)
async def demo_post(session):
    json_data = {"name": "张三", "age": 20} 

    # 单次请求头
    headers = {"Accept": "application/json"}

    try:
        # 2. 发起 GET 请求,await 等待响应
        async with session.post(
            url="http://127.0.0.1:8080/post",
            json=json_data,
            headers=headers,
            timeout=timeout
        ) as response:
            if response.status == 200: # HTTP 状态码判断
                return await response.json()  # 获取服务端返回的 JSON 响应数据
    except aiohttp.ClientError as e:
        raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")

async def main():
    # 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
    async with aiohttp.ClientSession(
            headers={"User-Agent": "aiohttp-demo/1.0"},  # 全局请求头
            timeout=timeout
    ) as session:
        result = await demo_post(session)
        print("获取到服务器的返回结果:")
        print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == '__main__':
    asyncio.run(main())

服务端:

from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
    """
    使用web.request解析请求传递到服务端的参数
    案例中客户端发送了post请求
    """
    data = await request.json()
    name = data.get('name')
    age = data.get('age')
    print(f"异步服务端接收到的参数: name={name}, age={age}")

    response_data = {
        "code": "200",
        "msg": "请求成功",
        "data": {
            "username": name,
            "user_age": age
        }
    }
    # 使用json_response响应客户端请求
    return web.json_response(
        data=response_data, # 响应数据
        status=200
    )
# 2. 创建应用实例
app = web.Application(
    debug=False, # 启用调试模式
    client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
    [web.get('/post', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
    # 5. 启动服务
    web.run_app(
        app,
        host='127.0.0.1',  # 监听地址
        port=8080,        # 监听端口
        access_log=web.access_logger, # 访问日志记录器
    )

六、WebSocket 异步通信(客户端 + 服务器)

服务器端:

from aiohttp import web
async def websocket_handler(request):
    # 升级为WebSocket连接
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    # 监听消息
    async for msg in ws:
        if msg.type == web.WSMsgType.TEXT:
            # 回复消息
            print(f"服务器收到消息:{msg.data}")
            await ws.send_str(f"thanks, hava a good day!")
            if msg.data == "close":
                await ws.close()
        elif msg.type == web.WSMsgType.ERROR:
            print(f"WebSocket错误:{ws.exception()}")

    print("WebSocket连接关闭")
    return ws
app = web.Application()
app.add_routes([web.get("/ws", websocket_handler)])
web.run_app(app, host="127.0.0.1", port=8080)

客户端:

import aiohttp
import asyncio
async def websocket_client():
    async with aiohttp.ClientSession() as session:
        # 连接WebSocket服务器
        async with session.ws_connect("http://127.0.0.1:8080/ws") as ws:
            # 发送消息
            await ws.send_str("Hello, WebSocket!")
            # 接收回复
            async for msg in ws:
                if msg.type == aiohttp.WSMsgType.TEXT:
                    print(f"客户端收到:{msg.data}")
                    if msg.data == "服务器收到:close":
                        await ws.close()
                        break
                elif msg.type == aiohttp.WSMsgType.ERROR:
                    break
asyncio.run(websocket_client())
Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐