python3从入门到精通(七): 异步HTTP框架之aiohttp模块
是Python中的 异步HTTP框架替代同步的,用于异步发送HTTP请求(GET/POST)快速搭建异步Web服务,替代部分同步的Flask/Django的场景,核心优势:异步非阻塞,能在单线程中处理大量并发请求,大幅提升 IO 密集型任务的效率。比如同时请求 100 个接口,无需等待前一个完成。
一、aiohttp 模块
aiohttp 是Python中 基于asyncio实现 的 异步HTTP框架,主要有两大核心能力:
异步 HTTP 客户端: 替代同步的requests,用于异步发送HTTP请求(GET/POST),适合高并发的网络请求场景,如爬虫、API调用
异步 HTTP 服务器: 快速搭建异步Web服务,替代部分同步的Flask/Django的场景,适合IO密集型的Web应用
核心优势:异步非阻塞,能在单线程中处理大量并发请求,大幅提升 IO 密集型任务的效率。比如同时请求 100 个接口,无需等待前一个完成。适合高并发网络请求 / 轻量级 Web 服务
二、安装aiohttp
# 核心库
pip install aiohttp
# 安装加速依赖(可选,提升性能)
pip install aiohttp[speedups]
三、异步 HTTP 客户端
作用是 发起异步 HTTP/HTTPS 请求(GET/POST/PUT 等),管理连接池、Cookie、超时等
* aiohttp.ClientSession(): 必须用 async with 管理,自动关闭连接,避免资源泄露
* aiohttp.ClientTimeout(total=10): 设置超时,避免请求无限等待,total=10 表示总超时 10 秒
# 注意事项:
1. 避免在异步函数中调用同步阻塞函数,如 time.sleep()、requests.get(),会阻塞整个事件循环,改用 asyncio.sleep()
2. 复用 ClientSession:
ClientSession 是"可复用"的,不要为每个请求创建新的 ClientSession,一个业务流程中只创建一个ClientSession,利用连接池复用 TCP 连接,避免频繁创建 / 关闭导致的性能损耗
3. 异常处理:
即使开启raise_for_status,仍需捕获ClientError通用异常、网络异常、TimeoutError超时异常、JSON解析等
4. 必配 timeout:
无论场景如何,必须配置超时,防止请求挂起阻塞事件循环
3.1、ClientSession 会话管理
ClientSession 是 aiohttp的核心 ,所有请求都通过它发起,且必须配合**async/await**使用。ClientSession 是客户端的核心入口,用于管理 HTTP 会话的生命周期:
(1)内置连接池,复用 TCP 连接,减少握手开销
(2)自动维护 Cookie、请求头headers
(3)统一管理超时、代理、SSL 等配置
(4)必须通过 async with 异步上下文管理器使用,自动释放资源
3.1.1、初始化方法:ClientSession.__init__() 的核心参数
# 核心参数:
* base_url: "设置全局基础 URL"
- 作用: 所有请求的路径会自动拼接在该地址后,避免重复书写相同的域名/接口前缀,简化代码
- 默认值: None
* headers: "设置全局请求头,所有请求默认携带"
- 若单次请求单独配置headers,会覆盖全局同名请求头(非同名则合并)
- 典型场景: 统一设置 User-Agent、Accept、Content-Type、Token 等通用头
- 默认值: None
* cookies: "设置全局 Cookie,所有请求默认携带"
- ClientSession会"自动维护"响应返回的 Cookie,无需手动更新
- 典型场景: 请求需要登录态的接口,提前传入登录后的 Cookie
- 默认值: None
* timeout: "设置全局超时规则,控制请求的生命周期"
- 支持"简化配置"(直接传数字,总超时)和"精细化配置"(ClientTimeout,分阶段超时),单次请求可单独配置超时覆盖全局
- "ClientTimeout""精细化配置参数":
- connect=5: 建立TCP连接的超时时间
- sock_read=10: 从套接字读取响应数据的超时时间,None=使用total值
- sock_connect: 套接字连接超时,None=使用connect值
- total=30: 整个请求的生命周期超时,None=无限制
- "实际开发必须配置",避免因服务端无响应导致事件循环阻塞
- 默认值: sentinel 默认不超时,易导致请求无限挂起
* connector: "配置底层连接池,控制最大并发连接数、每个域名最大连接数、DNS 缓存、SSL 验证等"
- 高并发请求的核心优化参数
- 默认值:None(自动创建默认TCPConnector,全局最大连接 100)
- "TCPConnector核心参数":
- limit: 全局最大并发连接数,高并发建议根据服务器配置调整
- 默认: 100,0=无限制
- limit_per_host: 每个域名的最大连接数
- 默认: 0=使用 limit 值,建议设置10-20
- ttl_dns_cache: DNS缓存超时(秒)
- 默认: 300,0=禁用缓存,适合动态域名
- ssl: 是否验证 SSL 证书
- 默认: True,测试环境可设 False 忽略自签名证书
- proxy: 全局代理地址
* proxy: "设置全局代理,所有请求默认通过该代理发起"
- 支持HTTP 代理和SOCKS5 代理,单次请求可单独配置 proxy 覆盖全局
- 默认值: None
* proxy_auth: "为全局代理配置基础认证(用户名 + 密码),适用于需要账号密码的代理服务器"
- 默认值:None
* auth: 设置全局 HTTP 基础认证,所有请求自动在请求头添加Authorization: Basic xxx,无需手动拼接认证头
- 默认值: None
* raise_for_status: "设置自动触发异常"
- 若响应状态码为4xx(客户端错误)/5xx(服务器错误),会自动调用resp.raise_for_status(),抛出aiohttp.ClientResponseError,无需手动判断状态码
- 默认值: False
* auto_decompress: "是否自动解压响应内容,支持 gzip/deflate/brotli 三种压缩格式"
- 默认值: True 开启,无需手动处理解压,直接读取原始内容即可
- 若需要获取原始压缩数据(如做爬虫数据缓存),可设为 False
3.1.1.1、核心参数单个示例
# base_url
async def main():
# 配置基础URL为接口根地址
async with aiohttp.ClientSession(base_url="https://api.httpbin.org") as session:
# 实际请求:https://api.httpbin.org/get
async with session.get("/get") as resp:
print(await resp.json())
# 实际请求:https://api.httpbin.org/post
async with session.post("/post", json={"key": "val"}) as resp:
print(await resp.json())
# headers
async def main():
# 全局请求头:统一UA+指定接收JSON+携带令牌
global_headers = {
"User-Agent": "Python-Aiohttp/3.9.1",
"Accept": "application/json",
"X-Token": "abc123def456"
}
async with aiohttp.ClientSession(headers=global_headers) as session:
# 单次请求覆盖全局的Accept,新增Referer(最终头为合并+覆盖)
async with session.get("/get", headers={"Accept": "*/*", "Referer": "https://xxx.com"}) as resp:
print(dict(resp.request_info.headers)) # 查看最终发送的请求头
# Cookie
async def main():
# 全局Cookie:携带登录态uid和token
global_cookies = {"uid": "10086", "login_token": "xyz789"}
async with aiohttp.ClientSession(cookies=global_cookies) as session:
async with session.get("/cookies") as resp:
print(await resp.json()) # 查看服务端接收到的Cookie
# Timeout
# 简化配置:直接传数字,指定总超时,整个请求从连接到读取完成的总时间
async def main():
# 全局总超时10秒:所有请求若10秒内未完成则抛超时异常
async with aiohttp.ClientSession(timeout=10) as session:
async with session.get("/delay/15") as resp: # 接口延迟15秒返回
print(await resp.json()) # 会抛出asyncio.TimeoutError
# 精细化配置:用ClientTimeout分阶段控制(推荐,更灵活)
from aiohttp import ClientTimeout
async def main():
# 精细化超时配置:连接5秒+读取10秒+总超时30秒
timeout = ClientTimeout(
connect=5, # 建立TCP连接的超时时间
sock_read=10, # 从套接字读取响应数据的超时时间
total=30 # 整个请求的生命周期超时(兜底)
)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get("/delay/8") as resp:
print(await resp.json()) # 读取超时(sock_read=10秒可正常,delay=12则超时)
# connector
# 高并发 + 禁用 DNS 缓存
from aiohttp import TCPConnector
async def main():
# 配置连接池:全局最大200连接,每个域名20连接,禁用DNS缓存
connector = TCPConnector(
limit=200,
limit_per_host=20,
ttl_dns_cache=0
)
async with aiohttp.ClientSession(connector=connector) as session:
# 高并发请求时,连接池会自动复用TCP连接,减少握手开销
tasks = [session.get("/get") for _ in range(30)]
results = await asyncio.gather(*tasks)
print(f"成功请求{len(results)}次")
# 忽略SSL 证书验证,测试环境专用
async def main():
# 禁用SSL验证(生产环境禁止使用!)
connector = TCPConnector(ssl=False)
async with aiohttp.ClientSession(connector=connector) as session:
async with session.get("https://自签名证书的域名.com") as resp:
print(resp.status)
# proxy
async def main():
async with aiohttp.ClientSession(
# HTTP代理(本地代理示例,根据实际代理地址修改)
proxy="http://127.0.0.1:7890"
# SOCKS5代理:proxy="socks5://127.0.0.1:7890"
) as session:
async with session.get("/ip") as resp:
print(await resp.json()) # 查看代理后的出口IP
# proxy_auth
from aiohttp import BasicAuth
async def main():
async with aiohttp.ClientSession(
proxy="http://127.0.0.1:7890",
# 代理认证:用户名admin,密码123456
proxy_auth=BasicAuth(login="admin", password="123456")
) as session:
async with session.get("/ip") as resp:
print(await resp.json())
# auth
from aiohttp import BasicAuth
async def main():
async with aiohttp.ClientSession(
# HTTP基础认证:用户名user,密码pass123
auth=BasicAuth("user", "pass123")
) as session:
async with session.get("/basic-auth/user/pass123") as resp:
print(await resp.json()) # 鉴权成功返回{"authenticated":true,"user":"user"}
# raise_for_status
async def main():
async with aiohttp.ClientSession(raise_for_status=True) as session:
try:
# 响应状态码404,会自动抛异常
async with session.get("/status/404") as resp:
data = await resp.json() # 不会执行到这一步
except aiohttp.ClientResponseError as e:
print(f"请求失败:{e.status} - {e.message}") # 捕获并处理异常
# auto_decompress
async def main():
# 关闭自动解压,获取原始gzip压缩数据
async with aiohttp.ClientSession(auto_decompress=False) as session:
async with session.get("/gzip") as resp:
raw_data = await resp.read() # 原始二进制压缩数据
print(f"原始数据长度:{len(raw_data)}")
# 开启自动解压(默认),直接读取解压后的文本
async with aiohttp.ClientSession(auto_decompress=True) as session:
async with session.get("/gzip") as resp:
text_data = await resp.text() # 已解压的文本
print(f"解压后文本长度:{len(text_data)}")
3.1.1.1、核心参数组合示例
结合高并发 + 超时控制 + 全局配置 + 代理 + 异常自动触发,可作为生产环境可用的ClientSession配置模板
import asyncio
import aiohttp
from aiohttp import ClientTimeout, TCPConnector, BasicAuth
async def fetch_url(session, url):
try:
async with session.get(url) as response:
return await response.json()
except Exception as e:
return f"请求失败: {str(e)}"
async def main():
# 1. 精细化超时配置
timeout = ClientTimeout(
connect=5, # 建立TCP连接的超时时间
sock_read=10, # 从套接字读取响应数据的超时时间
total=20 # 整个请求的生命周期超时
)
# 2. 高并发连接池配置
connector = TCPConnector(
limit=200, # 全局最大并发连接数
limit_per_host=20, # 每个域名的最大连接数
ttl_dns_cache=0 # DNS 缓存超时
)
# 3. 全局请求头
headers = {
"User-Agent": "Python-Aiohttp/3.9.1",
"Accept": "application/json",
"X-App-Version": "1.0.0"
}
# 4. 初始化ClientSession
try:
async with aiohttp.ClientSession(
base_url="https://api.example.com",
headers=headers,
timeout=timeout,
connector=connector,
proxy="http://proxy:8080", # 按需开启代理
# proxy_auth=BasicAuth("admin", "123456"), # 代理需要认证时开启
# auth=BasicAuth("user", "pass123"), # 接口需要基础认证时开启
raise_for_status=True,
auto_decompress=True,
trust_env=False
) as session:
# 高并发请求10个接口
tasks = [fetch_url(session, f"/get?num={i}") for i in range(10)]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
except Exception as eg:
print(f"Partial failures: {eg}")
if __name__ == '__main__':
asyncio.run(main())
3.1.2、ClientSession 核心请求方法
GET / POST / PUT / DELETE / HEAD / OPTIONS所有请求方法的参数结构一致,核心参数通用,仅 HTTP 方法不同
以下以 session.get() 为例详解
# 核心参数:
* url: 请求目标 URL
- 类型: str/URL
* params: URL查询参数 "GET请求"
- 类型: dict/list/tuple/str
- 自动拼接为: "?key1=val1&key2=val2"
* data: 请求体的表单/二进制数据 "POST/PUT等请求"
- 类型: bytes/dict/FormData
* json: JSON格式请求体
- 自动序列化,设置 Content-Type: application/json
- 类型: dict/list
* headers: 单次请求头 "覆盖全局headers"
- 类型: dict
* cookies: 单次请求 Cookie "新增/覆盖全局CookieJar"
- 类型: dict
* timeout: 单次请求超时 "覆盖全局timeout"
- 类型: ClientTimeout/int
* proxy: 代理地址
- http://127.0.0.1:7890 或者 socks5://127.0.0.1:7890
* files: 上传文件 "POST/PUT常用"
- 类型: dict
* ssl: 是否开启SSL验证
- False: 禁用验证,适合自签名证书
- True: 开启验证,默认
- 类型: bool/SSLContext
* allow_redirects: 是否允许重定向(3xx 响应)
3.1.2.1、GET 请求
import asyncio
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)
async def demo_get(session):
params = {"name": "张三", "age": 20} # 查询参数:自动拼接为 ?name=张三&age=20
# 单次请求头(覆盖全局)
headers = {"Accept": "application/json"}
try:
# 2. 发起 GET 请求,await 等待响应
async with session.get(
url="https://httpbin.org/get",
params=params,
headers=headers,
timeout=timeout
) as response:
if response.status == 200:
text = await response.text() # 获取文本响应,如 HTML/JSON 字符串
# json_data = await response.json() # 如果响应是 JSON,用这个
# binary_data = await response.read() # 获取二进制数据(如图片)
return text
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
async def main():
# 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
async with aiohttp.ClientSession(
headers={"User-Agent": "aiohttp-demo/1.0"}, # 全局请求头
timeout=timeout
) as session:
await demo_get(session)
if __name__ == '__main__':
result = asyncio.run(main())
3.1.2.2、POST 请求
import asyncio
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)
async def demo_post(session):
# 2.1 JSON 请求体
json_data = {"name": "张三", "age": 20}
try:
async with session.post(
url="https://httpbin.org/post",
json=json_data, # 自动序列化 JSON,设置 Content-Type: application/json
timeout=timeout
) as response:
if response.status == 200:
return await response.json()
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
# 2.2 表单请求体(设置 Content-Type: application/x-www-form-urlencoded)
form_data = {"gender": "male", "city": "Beijing"}
try:
async with session.post(
url="https://httpbin.org/post",
data=form_data,
timeout=timeout
) as response:
if response.status == 200:
return await response.json()
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
# 2.3 多表单/文件上传(设置 Content-Type: multipart/form-data; boundary=----)
multipart_data = aiohttp.FormData()
multipart_data.add_field("username", "user")
multipart_data.add_field("password", "pass123456")
multipart_data.add_field("text_field", "测试文本")
# 上传文件(本地文件路径)
multipart_data.add_field(
"file_field",
open("demo.txt", "rb"), # 二进制模式打开
filename="demo.txt", # 文件名
content_type="text/plain" # MIME类型
)
try:
async with session.post(
url="https://httpbin.org/post",
data=multipart_data,
timeout=timeout
) as response:
if response.status == 200:
return await response.json()
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
async def main():
# 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
async with aiohttp.ClientSession(
headers={"User-Agent": "aiohttp-demo/1.0"}, # 全局请求头
timeout=timeout
) as session:
await demo_post(session)
if __name__ == '__main__':
result = asyncio.run(main())
3.1.2.3、并发请求
多个请求几乎同时发起,总耗时≈最慢的那个请求的耗时,而非所有请求耗时之和
import asyncio
import aiohttp
async def fetch_url(session, url):
async with session.get(url) as response:
if response.status != 200:
raise Exception(f"Error fetch {url}; status code: {response.status}")
# return await response.text()
return response.status
async def main():
url_list = [
"https://httpbin.org/get?num=1",
"https://httpbin.org/get?num=2",
"https://httpbin.org/get?num=3"
]
try:
async with aiohttp.ClientSession() as session:
tasks =[fetch_url(session, url) for url in url_list]
results = await asyncio.gather(*tasks, return_exceptions=True)
except Exception as eg:
print(f"Partial failures: {eg}")
for url, result in zip(url_list, results):
print(f"{url}: {result}")
if __name__ == '__main__':
asyncio.run(main())
3.1.2.4、请求重试(配合 tenacity)
import aiohttp
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
# 重试规则:最多3次,指数退避(1s→2s→4s),仅捕获ClientError
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=4),
retry=retry_if_exception_type(aiohttp.ClientError)
)
async def fetch_with_retry(session, url):
async with session.get(url, timeout=5) as resp:
resp.raise_for_status()
return await resp.text()
async def main():
async with aiohttp.ClientSession() as session:
try:
result = await fetch_with_retry(session, "https://httpbin.org/get")
print("请求成功:", result[:50])
except aiohttp.ClientError as e:
print("3次重试后仍失败:", e)
asyncio.run(main())
3.1.3、ClientSession 核心属性
# 核心属性:
* connector: 当前会话连接池管理器,可"查看/动态调整"连接池配置,如临时修改最大连接数
* timeout: 会话的全局超时配置
- 初始化时传入的 timeout 参数,若传数字会自动转为 ClientTimeout 对象
* cookie_jar: 会话的 Cookie 管理器,负责存储、发送、更新 Cookie,是"保持登录态的核心"
- session.cookie_jar.update_cookies({"token": "123"}): 手动添加/更新 Cookie(cookies为字典)
- clear(): 清空所有 Cookie
- filter_cookies(url): 筛选指定 URL 可发送的 Cookie
* closed: 会话是否已关闭
- True: 表示关闭,无法再发起请求
- False: 表示正常运行
* headers: 只读,会话的全局请求头,底层是有序字典,支持"大小写不敏感的键查找"
# base_url
async def main():
async with aiohttp.ClientSession(base_url="https://api.httpbin.org") as session:
# 查看基础URL(yarl.URL对象,可转字符串)
print("基础URL(原始):", session._base_url)
print("基础URL(字符串):", str(session._base_url))
# 拼接路径(等价于 session.get("/get"))
full_url = session._base_url / "get"
print("拼接后的完整URL:", full_url)
# headers
async def main():
global_headers = {"User-Agent": "Aiohttp-Demo", "X-Token": "123456"}
async with aiohttp.ClientSession(headers=global_headers) as session:
# 查看全局请求头(整体)
print("全局请求头:", dict(session.headers))
# 动态修改请求头
session.headers["User-Agent"] = "Demo-Aiohttp"
# 按键查找(大小写不敏感)
print("User-Agent:", session.headers.get("user-agent"))
print("X-Token:", session.headers["X-Token"])
# timeout
from aiohttp import ClientTimeout
async def main():
timeout = ClientTimeout(connect=5, sock_read=10, total=30)
async with aiohttp.ClientSession(timeout=timeout) as session:
# 查看全局超时配置
print("全局超时对象:", session.timeout)
print("连接超时(秒):", session.timeout.connect)
print("读取超时(秒):", session.timeout.sock_read)
print("总超时(秒):", session.timeout.total)
# connector
from aiohttp import TCPConnector
async def main():
connector = TCPConnector(limit=200, limit_per_host=20)
async with aiohttp.ClientSession(connector=connector) as session:
# 查看连接池配置
print("连接池类型:", type(session.connector))
print("全局最大连接数:", session.connector.limit)
print("单域名最大连接数:", session.connector.limit_per_host)
# 动态修改连接池配置(慎用,需确保线程安全)
session.connector.limit = 150
print("修改后全局最大连接数:", session.connector.limit)
# closed
async def main():
session = aiohttp.ClientSession()
# 初始状态:未关闭
print("会话是否关闭(初始):", session.closed)
# 发起请求
async with session.get("https://api.httpbin.org/get") as resp:
print("请求状态码:", resp.status)
# 手动关闭会话
await session.close()
print("会话是否关闭(关闭后):", session.closed)
# 尝试对已关闭的会话发起请求(会抛异常)
try:
async with session.get("https://api.httpbin.org/get") as resp:
pass
except RuntimeError as e:
print("异常:", e)
# cookie
async def main():
async with aiohttp.ClientSession() as session:
# 1. 手动添加Cookie
session.cookie_jar.update_cookies(
{"uid": "10086", "login_token": "abc789"},
url="https://api.httpbin.org" # 指定Cookie生效的URL(可选)
)
# 2. 查看所有Cookie
print("会话所有Cookie:", session.cookie_jar._cookies)
# 3. 筛选指定URL的Cookie
filtered_cookies = session.cookie_jar.filter_cookies("https://api.httpbin.org")
print("指定URL的Cookie:", {k: v.value for k, v in filtered_cookies.items()})
# 4. 发起请求,验证Cookie是否携带
async with session.get("https://api.httpbin.org/cookies") as resp:
print("服务端接收的Cookie:", await resp.json())
# 5. 清空Cookie
session.cookie_jar.clear()
print("清空后Cookie:", session.cookie_jar._cookies)
3.2、ClientResponse 响应对象
每次发起请求后,会返回 ClientResponse 对象,封装所有响应数据
# 核心方法:
"均需 await"
* text(encoding=None): 读取响应文本,自动解码,encoding指定编码,如utf-8
* json(encoding=None, loads=json.loads): 解析JSON响应,失败抛出JSONDecodeError异常
* read(): 读取二进制响应(适合图片/视频/文件)
* release(): 手动释放响应资源
- async with会自动调用,无需手动
* raise_for_status(): 手动触发 4xx/5xx 响应的异常(HTTPError)
* content.read(n=-1): 流式读取响应
- n: 指定读取字节数,-1 读取全部,适合大文件
# 核心属性:
"无需 await"
* status: HTTP 响应状态码(200=成功,404=资源未找到,500=服务器错误)
* reason: 状态码描述,200→"OK",404→"Not Found"
* headers: 响应头,字典格式 如: resp.headers["Content-Type"]
* cookies: 响应返回的Cookie 如: resp.cookies["token"].value
* content_type: 响应内容类型 如: application/json、text/html
* url: 最终请求的 URL(含重定向后的地址)
* history: 重定向历史 如: 301/302跳转记录
3.2.1、流式读取大文件
import aiohttp
import asyncio
async def download_large_file():
async with aiohttp.ClientSession() as session:
async with session.get("https://httpbin.org/stream/100", timeout=60) as resp:
# 流式读取(每次1024字节)
with open("large_file.txt", "wb") as f:
while True:
chunk = await resp.content.read(1024)
if not chunk:
break
f.write(chunk)
print("大文件下载完成")
asyncio.run(download_large_file())
四、异步 HTTP 服务器
搭建轻量级异步 Web 服务,处理路由、请求、响应、WebSocket 等
4.1、web.Application 核心类
创建异步 Web 应用实例,管理路由、中间件、配置、信号(生命周期事件)
4.1.1、初始化方法:web.Application(**kwargs)
| 参数名 | 类型 | 默认值 | 含义 |
|---|---|---|---|
| middlewares | list | [] | 中间件列表(全局请求 / 响应拦截,如日志、鉴权) |
| router | web.UrlDispatcher | UrlDispatcher() | 路由管理器(可自定义路由规则) |
| client_max_size | int | 1024*1024 | 最大请求体大小(默认 1MB,超出抛异常) |
| debug | bool | False | 调试模式(开启后显示详细错误栈) |
4.1.2、web.Application的核心方法
| 方法名 | 作用 |
|---|---|
| add_routes(routes) | 添加路由规则(接收 RouteTableDef 或路由列表) |
| router.add_get(path, handler) | 快捷添加 GET 路由(post/put/delete 同理) |
| run_app(app, **kwargs) | 运行应用(需导入 from aiohttp import web) |
4.2、web.Request 请求对象
web.Request 是 aiohttp 服务器端处理请求的核心对象,每个路由处理函数的第一个参数就是该对象,封装了客户端发起的所有请求数据(URL、参数、请求体、头信息、Cookie 等)
4.2.1、web.Request 核心属性 “无需await”
4.2.1.1、path 和 path_qs
* path: 请求的路径 "不含域名 和 查询参数"
* path_qs: 请求路径 + 查询参数,完整路径 "不含域名"
# 访问 http://127.0.0.1:8080/user/1001?name=张三
from aiohttp import web
async def handler(request):
print("path请求路径:", request.path) # /user/1001
print("path_as请求路径:", request.path_as) # /user/1001?name=张三
return web.Response(text=f"Path: {request.path}")
app = web.Application()
app.add_routes([web.get("/user/{user_id}", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.2、query
URL查询参数(即**?key1=val1&key2=val2**部分),支持大小写不敏感查找
# 常用方法:
* get(key, default=None): 获取单个值,避免键不存在报错
* getall(key): 获取同名多值
# 访问 http://127.0.0.1:8080/query?name=张三&tag=1&tag=2
async def handler(request):
# 1. 获取单个查询参数
name = request.query.get("name", "未知用户")
age = request.query.get("age", 0) # 字符串类型,需手动转int
# 2. 获取同名多值(如 ?tag=1&tag=2&tag=3)
tags = request.query.getall("tag") # 返回列表:["1", "2", "3"]
# 3. 转为普通字典(同名多值仅保留第一个)
query_dict = dict(request.query) # 核心修正:替代不存在的 to_dict()
# 等价写法:query_dict = dict(request.query.items())
# 4. 手动处理同名多值,转为 {key: list} 格式的字典
query_multi_dict = {}
for key in request.query.keys():
query_multi_dict[key] = request.query.getall(key)
return web.json_response({
"single_value": {
"name": name,
"age": int(age) # 手动转换类型
},
"multi_value": {
"tags": tags
},
"query_dict": query_dict, # 单值字典
"query_multi_dict": query_multi_dict # 多值字典
})
app = web.Application()
app.add_routes([web.get("/query", handler)])
if __name__ == "__main__":
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.3、match_info
路由路径参数(即 /{user_id} 中的动态部分),通过键名获取值
# http://127.0.0.1:8080/user/10086,响应:User ID: 10086
async def handler(request):
# 获取路由参数({user_id})
user_id = request.match_info.get("user_id", "0")
# 直接通过[]获取(键不存在会抛KeyError,慎用)
# user_id = request.match_info["user_id"]
return web.Response(text=f"User ID: {user_id}")
app = web.Application()
app.add_routes([web.get("/user/{user_id}", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.4、url
完整的请求 URL(含域名、路径、查询参数),支持拆解各部分
# 常用子属性:
* request.url.host: 域名 如:127.0.0.1
* request.url.port: 端口 如: 8080
* request.url.scheme: 协议 如: http/https
* request.url.query: 查询参数(同request.query)
# 访问 http://127.0.0.1:8080/url?a=1
async def handler(request):
return web.json_response({
"full_url": str(request.url),
"host": request.url.host,
"port": request.url.port,
"scheme": request.url.scheme
})
app = web.Application()
app.add_routes([web.get("/url", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.5、method
获取大写的HTTP 请求方法
# http://127.0.0.1:8080/method,响应:Method: POST
async def handler(request):
return web.Response(text=f"Method: {request.method}")
app = web.Application()
app.add_routes([
web.post("/method", handler)
])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.6、headers
获取请求头信息,支持大小写不敏感查找
# http://127.0.0.1:8080/headers
async def handler(request):
# 获取请求头(大小写不敏感)
user_agent = request.headers.get("User-Agent", "未知")
content_type = request.headers.get("Content-Type", "无")
token = request.headers.get("X-Token", "无")
return web.json_response({
"User-Agent": user_agent,
"Content-Type": content_type,
"X-Token": token
})
app = web.Application()
app.add_routes([web.get("/headers", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.7、cookies
客户端携带的 Cookie,通过 key.value 获取值
# 客户端携带Cookie uid=1001;token=abc789 访问,响应:UID: 1001, Token: abc789
async def handler(request):
# 获取Cookie
uid = request.cookies.get("uid", "").value if "uid" in request.cookies else "无"
token = request.cookies.get("token", "").value if "token" in request.cookies else "无"
return web.Response(text=f"UID: {uid}, Token: {token}")
app = web.Application()
app.add_routes([web.get("/cookies", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.1.8、remote
客户端的 IP 地址,用于记录访问日志、限制 IP 等
async def handler(request):
return web.Response(text=f"Your IP: {request.remote}")
app = web.Application()
app.add_routes([web.get("/ip", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.2、web.Request 核心方法 “必须加await”
4.2.2.1、json()
解析 JSON 格式的请求体(Content-Type 需为 application/json),失败抛 JSONDecodeError
# 测试:POST 提交 {"name":"张三","age":20},响应成功;提交非 JSON 数据,返回 400 错误
async def handler(request):
try:
# 解析JSON请求体
data = await request.json()
name = data.get("name")
age = data.get("age")
return web.json_response({"code": 200, "msg": "成功", "data": data})
except Exception as e:
return web.json_response({"code": 400, "msg": f"解析失败:{str(e)}"}, status=400)
app = web.Application()
app.add_routes([web.post("/json", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.2.2、post()
解析表单格式的请求体
Content-Type 为 application/x-www-form-urlencoded 或 multipart/form-data
async def handler(request):
# 解析表单数据
form_data = await request.post()
username = form_data.get("username")
password = form_data.get("password")
return web.json_response({"username": username, "password": password})
app = web.Application()
app.add_routes([web.post("/form", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.2.3、files()
读取原始二进制请求体(通用方法,适合非 JSON / 表单的自定义格式,如 protobuf、二进制文件)
async def handler(request):
# 读取原始二进制请求体
raw_data = await request.read()
return web.Response(
text=f"原始数据长度:{len(raw_data)},内容:{raw_data.decode('utf-8')}",
status=200
)
app = web.Application()
app.add_routes([web.post("/raw", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.2.4、content.read(n=-1)
流式读取请求体,适合大文件,避免一次性加载到内存n 为读取字节数,-1表示读取全部
async def handler(request):
# 流式读取请求体(每次1024字节)
total_size = 0
with open("large_file.bin", "wb") as f:
while True:
chunk = await request.content.read(1024)
if not chunk:
break
total_size += len(chunk)
f.write(chunk)
return web.Response(text=f"大文件接收完成,总大小:{total_size} 字节")
app = web.Application(client_max_size=1024*1024*100) # 允许最大100MB请求体
app.add_routes([web.post("/large_upload", handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.3、web.Response() 响应对象
aiohttp 服务端构建 HTTP 响应的基础类,所有路由处理函数最终都需要返回一个 Response 实例(或其子类如 json_response/FileResponse),它封装了响应状态码、响应头、响应体等所有响应相关的信息
| 参数名 | 类型 | 默认值 | 核心含义 & 使用场景 |
|---|---|---|---|
text |
str |
None |
响应体为文本内容(自动编码为 bytes,默认 utf-8),适合返回纯文本、HTML 等 |
body |
bytes/IOBase |
None |
响应体为二进制数据(优先级高于 text),适合返回自定义二进制、文件流等 |
status |
int |
200 |
HTTP 响应状态码(如 200 = 成功,400 = 参数错误,404 = 未找到,500 = 服务器错误) |
reason |
str |
None |
状态码描述(如 404 对应 “Not Found”,默认自动匹配状态码) |
headers |
dict/CIMultiDict |
None |
响应头(如 Content-Type、Set-Cookie、Cache-Control 等) |
content_type |
str |
None |
响应体的 MIME 类型(如 text/html、application/json,优先级高于 headers 中的 Content-Type) |
charset |
str |
utf-8 |
文本响应的编码(仅对 text 参数生效) |
compress |
int/bool |
False |
是否压缩响应体(True = 默认压缩级别,6 = 自定义级别,支持 gzip/deflate) |
set_cookie |
str/Tuple |
None |
快捷设置单个 Cookie(替代手动在 headers 中加 Set-Cookie) |
set_cookies |
Iterable |
() |
批量设置多个 Cookie |
headers |
dict |
None |
自定义响应头(如跨域的 Access-Control-Allow-Origin) |
chunked |
bool |
False |
是否启用分块传输编码(Transfer-Encoding: chunked),适合大响应体(如流式返回) |
timeout |
float |
None |
响应发送超时时间(秒) |
4.2.3.1、基础文本响应
返回纯文本、HTML 等文本类型响应,核心用 text 参数,自动处理编码和 Content-Type
from aiohttp import web
async def text_response_handler(request):
# 1. 纯文本响应(默认 Content-Type: text/plain; charset=utf-8)
# return web.Response(text="Hello, aiohttp!")
# 2. HTML 响应(指定 content_type 为 text/html)
html_content = """
<html>
<head><title>Test Page</title></head>
<body><h1>Hello, aiohttp!</h1></body>
</html>
"""
return web.Response(
text=html_content,
content_type="text/html", # 关键:指定HTML类型
status=200 # 状态码默认200
)
app = web.Application()
app.add_routes([web.get("/text", text_response_handler)])
web.run_app(app, host="127.0.0.1", port=8080)
4.2.3.2、错误响应(4xx/5xx)
返回客户端 / 服务器错误响应,核心是设置 status 状态码和错误信息
async def error_response_handler(request):
# 400 参数错误
if "name" not in request.query:
return web.Response(
text=json.dumps({"code": 400, "msg": "name参数必传"}, ensure_ascii=False),
content_type="application/json",
status=400 # 关键:400状态码
)
# 500 服务器错误
try:
1 / 0 # 模拟异常
except Exception as e:
return web.Response(
text=json.dumps({"code": 500, "msg": f"服务器错误:{str(e)}"}, ensure_ascii=False),
content_type="application/json",
status=500
)
app.add_routes([web.get("/error", error_response_handler)])
4.2.3.3、流式响应(大文件 / 实时数据)
启用分块传输(chunked=True),流式返回数据(避免一次性加载大文件到内存)
async def stream_response_handler(request):
# 构建流式响应(生成器返回分块数据)
async def stream_generator():
# 模拟逐行返回大文件内容
with open("large_file.txt", "r", encoding="utf-8") as f:
for line in f:
yield line.encode("utf-8") # 转为二进制分块
return web.Response(
body=stream_generator(), # 流式响应体(生成器)
content_type="text/plain",
chunked=True # 启用分块传输
)
app.add_routes([web.get("/stream", stream_response_handler)])
4.2.4、web.json_response() 响应对象
| 参数名 | 类型 | 默认值 | 核心含义 & 使用场景 |
|---|---|---|---|
data |
dict/list/str/int |
无(必填) | 要返回的 JSON 数据(支持任意可序列化的 Python 对象) |
status |
int |
200 |
HTTP 响应状态码(如 200 = 成功,400 = 参数错误,500 = 服务器错误) |
headers |
dict/CIMultiDict |
None |
自定义响应头(如跨域头、Token、请求 ID 等) |
content_type |
str |
application/json |
JSON 响应的 MIME 类型(默认即可,无需修改) |
charset |
str |
utf-8 |
JSON 字符串的编码(确保中文 / 特殊字符正常显示) |
dumps |
Callable |
json.dumps |
自定义 JSON 序列化函数(如用 ujson 提升性能) |
**kwargs |
- | - | 继承 web.Response 的其他参数(如 reason/compress/set_cookie 等) |
4.2.4.1、 基础 JSON 响应
from aiohttp import web
async def basic_json_handler(request):
# 业务数据(任意可序列化的Python对象)
response_data = {
"code": 200, # 自定义业务码
"msg": "请求成功", # 提示信息
"data": { # 业务数据体
"user_id": 10086,
"username": "张三",
"age": 25,
"tags": ["Python", "aiohttp"]
}
}
# 构建JSON响应(核心:仅需传入data)
return web.json_response(
data=response_data, # 必填:要序列化的JSON数据
status=200, # 可选:HTTP状态码,默认200
headers={ # 可选:自定义响应头
"X-Request-Id": "abc123456", # 请求唯一标识
"X-App-Version": "1.0.0" # 应用版本
}
)
app = web.Application()
app.add_routes([web.get("/basic-json", basic_json_handler)])
if __name__ == "__main__":
web.run_app(app, host="127.0.0.1", port=8080)
4.2.4.2、 错误 JSON 响应(4xx/5xx)
# 访问 http://127.0.0.1:8080/error-json(无 username 参数):返回 400 状态码 + 错误 JSON
# 访问 http://127.0.0.1:8080/error-json?username=张三:返回 500 状态码 + 异常信息
async def error_json_handler(request):
# 模拟参数校验失败
username = request.query.get("username")
if not username:
# 400 参数错误响应
return web.json_response(
data={
"code": 40001, # 自定义错误码(区分不同错误)
"msg": "用户名不能为空",
"data": None
},
status=400, # HTTP状态码:400 Bad Request
reason="Missing Required Parameter" # 状态码描述(可选)
)
# 模拟服务器异常
try:
1 / 0 # 故意触发异常
except Exception as e:
# 500 服务器错误响应
return web.json_response(
data={
"code": 50001,
"msg": f"服务器内部错误:{str(e)}",
"data": None
},
status=500 # HTTP状态码:500 Internal Server Error
)
app.add_routes([web.get("/error-json", error_json_handler)])
4.2.4.2、 自定义 JSON 序列化函数
# 先安装:pip install ujson
import ujson
async def custom_dumps_handler(request):
data = {"name": "李四", "score": 95.5}
return web.json_response(
data=data,
dumps=ujson.dumps # 自定义序列化函数(替换默认的json.dumps)
)
app.add_routes([web.get("/custom-dumps", custom_dumps_handler)])
4.2.5、web.FileResponse() 响应对象
web.Response 的子类,专门用于高效返回文件(如图片、视频、文档、压缩包等)
关键特性:
1、自动推断 MIME 类型:根据文件后缀自动设置 Content-Type(如 .jpg → image/jpeg,.zip → application/zip);
2、断点续传:默认支持 Range 请求(如视频播放时拖拽进度条、大文件断点下载);
3、文件不存在处理:若 path 指向的文件不存在,自动抛出 web.HTTPNotFound 异常;
4、跨平台兼容:支持 Windows/macOS/Linux 的文件路径格式。
| 参数名 | 类型 | 默认值 | 核心含义 & 使用场景 |
|---|---|---|---|
path |
str/Path |
无(必填) | 要返回的文件路径(绝对路径 / 相对路径均可) |
status |
int |
200 |
HTTP 响应状态码(文件存在 = 200,不存在 = 404) |
headers |
dict/CIMultiDict |
None |
自定义响应头(如强制下载的 Content-Disposition) |
content_type |
str |
None |
手动指定文件的 MIME 类型(默认自动推断) |
chunk_size |
int |
262144 (256KB) |
流式传输的分块大小(可根据文件类型调整) |
filename |
str |
None |
下载时显示的文件名(覆盖原文件名) |
last_modified |
datetime |
None |
手动指定文件最后修改时间(默认读取文件元数据) |
cache_control |
str |
None |
缓存控制头(如 max-age=3600,控制浏览器缓存) |
allow_range |
bool |
True |
是否支持断点续传(Range 请求),默认开启 |
4.2.5.1、基础文件预览(图片 / 视频 / 文档)
返回文件供浏览器直接预览(如图片、HTML、PDF),无需下载
# 访问 http://127.0.0.1:8080/preview-image,浏览器直接显示图片
from aiohttp import web
from pathlib import Path
# 确保文件存在(test.jpg,可替换为自己的文件路径)
FILE_PATH = Path(__file__).parent / "test.jpg"
async def file_preview_handler(request):
# 构建FileResponse(核心:仅需传入文件路径)
return web.FileResponse(
path=FILE_PATH, # 必填:文件路径(Path对象/字符串均可)
status=200, # 可选:默认200
cache_control="max-age=3600" # 可选:浏览器缓存1小时
)
app = web.Application()
app.add_routes([web.get("/preview-image", file_preview_handler)])
if __name__ == "__main__":
web.run_app(app, host="127.0.0.1", port=8080)
4.2.5.2、文件强制下载(指定文件名)
让浏览器触发文件下载(而非预览),并自定义下载文件名
# 访问 http://127.0.0.1:8080/download-zip,浏览器自动下载文件,文件名显示为 我的压缩包.zip(而非原文件名 test.zip)
from aiohttp import web
from pathlib import Path
async def file_download_handler(request):
# 原文件:test.zip,下载时显示为 "我的压缩包.zip"
zip_path = Path(__file__).parent / "input" / "image_0.jpg"
# 构建下载响应
return web.FileResponse(
path=zip_path,
headers={
# 强制下载(优先级高于浏览器默认预览行为)
"Content-Disposition": 'attachment; filename="我的压缩包.zip"'
}
)
app = web.Application()
app.add_routes([web.get("/download-zip", file_download_handler)])
if __name__ == "__main__":
web.run_app(app, host="127.0.0.1", port=8080)
4.2.5.3、大文件流式传输(支持断点续传)
返回超大文件(如 GB 级视频 / 压缩包),利用 FileResponse 的流式传输和断点续传特性
1、浏览器播放视频时,可拖拽进度条(断点续传)
2、下载大文件时,中断后可继续下载(无需重新开始)
3、内存占用始终为分块大小(1MB),不会加载整个文件到内存
async def large_file_handler(request):
# 超大文件路径(如 2GB 的视频文件)
large_file_path = Path(__file__).parent / "large_video.mp4"
return web.FileResponse(
path=large_file_path,
chunk_size=1048576, # 调整分块大小为 1MB(默认256KB,大文件可调大)
allow_range=True, # 开启断点续传(默认True,可省略)
content_type="video/mp4" # 手动指定MIME类型(确保视频播放器识别)
)
app.add_routes([web.get("/large-video", large_file_handler)])
4.2.5.4、处理文件不存在的场景
主动捕获文件不存在的异常,返回自定义 JSON 错误响应
async def safe_file_handler(request):
file_path = Path(__file__).parent / "non_exist_file.txt"
try:
# 尝试返回文件
return web.FileResponse(path=file_path)
except web.HTTPNotFound:
# 文件不存在时,返回自定义JSON错误
return web.json_response(
data={
"code": 40401,
"msg": "文件不存在",
"data": None
},
status=404
)
app.add_routes([web.get("/safe-file", safe_file_handler)])
4.2.5.5、限制文件访问(权限校验)
async def auth_file_handler(request):
# 1. 权限校验:获取请求头中的Token
token = request.headers.get("X-Token")
if token != "valid_token_123":
# 无权限:返回403错误
return web.json_response(
data={"code": 40301, "msg": "无访问权限"},
status=403
)
# 2. 权限通过:返回文件
file_path = Path(__file__).parent / "secret_doc.pdf"
return web.FileResponse(path=file_path)
app.add_routes([web.get("/auth-file", auth_file_handler)])
4.2.5.5、临时文件清理
若返回的是临时文件,建议在响应完成后清理
import aiofiles.os
async def temp_file_handler(request):
temp_path = Path("temp_file.txt")
resp = web.FileResponse(path=temp_path)
# 响应完成后删除临时文件
async def cleanup():
await aiofiles.os.remove(temp_path)
request.loop.create_task(cleanup())
return resp
五、客户端 + 服务端 完整案例
5.1、GET请求
客户端:
import asyncio
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)
async def demo_get(session):
params = {"name": "张三", "age": 20} # 查询参数:自动拼接为 ?name=张三&age=20
# 单次请求头(覆盖全局)
headers = {"Accept": "application/json"}
try:
# 2. 发起 GET 请求,await 等待响应
async with session.get(
url="http://127.0.0.1:8080/get",
params=params,
headers=headers,
timeout=timeout
) as response:
if response.status == 200:
return await response.text() # 获取文本响应,如 HTML/JSON 字符串
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
async def main():
# 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
async with aiohttp.ClientSession(
headers={"User-Agent": "aiohttp-demo/1.0"}, # 全局请求头
timeout=timeout
) as session:
res = await demo_get(session)
print("获取到服务器的返回结果:")
print(res)
if __name__ == '__main__':
asyncio.run(main())
服务端:
from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
"""
使用web.request解析请求传递到服务端的参数
"""
name = request.query.get('name', '未知用户')
age = request.query.get('age', 0)
print(f"接收到的参数为:{request.query}")
return web.Response(
text=f"请求用户为:{name}, 年龄是:{age}",
status=200
)
# 2. 创建应用实例
app = web.Application(
client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
[web.get('/get', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
# 5. 启动服务
web.run_app(
app,
host='127.0.0.1', # 监听地址
port=8080, # 监听端口
access_log=web.access_logger, # 访问日志记录器
)
5.2、POST 请求 - 表单请求体
客户端:
import asyncio
import json
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)
async def demo_post(session):
form_data = {"gender": "male", "city": "Beijing"}
# 单次请求头
headers = {"Accept": "application/x-www-form-urlencoded"}
try:
# 2. 发起 post 请求,await 等待响应
async with session.post(
url="http://127.0.0.1:8080/post",
data=form_data,
headers=headers,
timeout=timeout
) as response:
if response.status == 200: # HTTP 状态码判断
return await response.json() # 获取服务端返回的 JSON 响应数据
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
async def main():
# 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
async with aiohttp.ClientSession(
headers={"User-Agent": "aiohttp-demo/1.0"}, # 全局请求头
timeout=timeout
) as session:
result = await demo_post(session)
print("获取到服务器的返回结果:")
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == '__main__':
asyncio.run(main())
服务端:
from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
"""
使用web.request解析请求传递到服务端的参数
案例中客户端发送了 普通表单数据
"""
# print(request.method)
# print(request.headers)
form_data = await request.post()
gender = form_data.get("gender")
city = form_data.get("city")
print(f"异步服务端接收到的参数: gender={gender}, city={city}")
response_data = {
"code": "200",
"msg": "请求成功",
"data": {
"gender": gender,
"city": city
}
}
# 使用json_response响应客户端请求
return web.json_response(
data=response_data, # 响应数据
status=200
)
# 2. 创建应用实例
app = web.Application(
debug=False, # 启用调试模式
client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
[web.post('/post', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
# 5. 启动服务
web.run_app(
app,
host='127.0.0.1', # 监听地址
port=8080, # 监听端口
access_log=web.access_logger, # 访问日志记录器
)
5.3、POST 请求 - form-data请求体
import asyncio
import json
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)
async def demo_post(session):
multipart_data = aiohttp.FormData()
multipart_data.add_field("username", "user")
multipart_data.add_field("password", "pass123456")
# 单次请求头
headers = {"Content-Type": "application/x-www-form-urlencoded"}
try:
# 2. 发起 post 请求,await 等待响应
async with session.post(
url="http://127.0.0.1:8080/post",
data=multipart_data,
headers=headers,
timeout=timeout
) as response:
if response.status == 200: # HTTP 状态码判断
return await response.json() # 获取服务端返回的 JSON 响应数据
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
async def main():
# 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
async with aiohttp.ClientSession(
headers={"User-Agent": "aiohttp-demo/1.0"}, # 全局请求头
timeout=timeout
) as session:
result = await demo_post(session)
print("获取到服务器的返回结果:")
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == '__main__':
asyncio.run(main())
from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
"""
使用web.request解析请求传递到服务端的参数
案例中客户端发送了 form-data表单数据
"""
form_data = await request.post()
username = form_data.get("username")
password = form_data.get("password")
print(f"异步服务端接收到的参数: gender={username}, password={password}")
response_data = {
"code": "200",
"msg": "请求成功",
"data": {
"username": username,
"password": password
}
}
# 使用json_response响应客户端请求
return web.json_response(
data=response_data, # 响应数据
status=200
)
# 2. 创建应用实例
app = web.Application(
debug=False, # 启用调试模式
client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
[web.post('/post', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
# 5. 启动服务
web.run_app(
app,
host='127.0.0.1', # 监听地址
port=8080, # 监听端口
access_log=web.access_logger, # 访问日志记录器
)
5.4、POST 请求 -json请求体
客户端:
import asyncio
import json
import aiohttp
# 全局超时配置
timeout = aiohttp.ClientTimeout(total=10)
async def demo_post(session):
json_data = {"name": "张三", "age": 20}
# 单次请求头
headers = {"Accept": "application/json"}
try:
# 2. 发起 GET 请求,await 等待响应
async with session.post(
url="http://127.0.0.1:8080/post",
json=json_data,
headers=headers,
timeout=timeout
) as response:
if response.status == 200: # HTTP 状态码判断
return await response.json() # 获取服务端返回的 JSON 响应数据
except aiohttp.ClientError as e:
raise aiohttp.ClientError(f"Error fetching {response.url}: {str(e)}")
async def main():
# 1. 创建 ClientSession(异步上下文管理器),自动管理连接池
async with aiohttp.ClientSession(
headers={"User-Agent": "aiohttp-demo/1.0"}, # 全局请求头
timeout=timeout
) as session:
result = await demo_post(session)
print("获取到服务器的返回结果:")
print(json.dumps(result, indent=2, ensure_ascii=False))
if __name__ == '__main__':
asyncio.run(main())
服务端:
from aiohttp import web
# 1. 定义请求处理函数
async def handle(request):
"""
使用web.request解析请求传递到服务端的参数
案例中客户端发送了post请求
"""
data = await request.json()
name = data.get('name')
age = data.get('age')
print(f"异步服务端接收到的参数: name={name}, age={age}")
response_data = {
"code": "200",
"msg": "请求成功",
"data": {
"username": name,
"user_age": age
}
}
# 使用json_response响应客户端请求
return web.json_response(
data=response_data, # 响应数据
status=200
)
# 2. 创建应用实例
app = web.Application(
debug=False, # 启用调试模式
client_max_size=2*1024*1024, # 最大客户端请求体大小为2MB
)
# 3. 注册路由
app.add_routes(
[web.get('/post', handle)] # GET方法访问根路径时调用handle函数
)
if __name__ == '__main__':
# 5. 启动服务
web.run_app(
app,
host='127.0.0.1', # 监听地址
port=8080, # 监听端口
access_log=web.access_logger, # 访问日志记录器
)
六、WebSocket 异步通信(客户端 + 服务器)
服务器端:
from aiohttp import web
async def websocket_handler(request):
# 升级为WebSocket连接
ws = web.WebSocketResponse()
await ws.prepare(request)
# 监听消息
async for msg in ws:
if msg.type == web.WSMsgType.TEXT:
# 回复消息
print(f"服务器收到消息:{msg.data}")
await ws.send_str(f"thanks, hava a good day!")
if msg.data == "close":
await ws.close()
elif msg.type == web.WSMsgType.ERROR:
print(f"WebSocket错误:{ws.exception()}")
print("WebSocket连接关闭")
return ws
app = web.Application()
app.add_routes([web.get("/ws", websocket_handler)])
web.run_app(app, host="127.0.0.1", port=8080)
客户端:
import aiohttp
import asyncio
async def websocket_client():
async with aiohttp.ClientSession() as session:
# 连接WebSocket服务器
async with session.ws_connect("http://127.0.0.1:8080/ws") as ws:
# 发送消息
await ws.send_str("Hello, WebSocket!")
# 接收回复
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
print(f"客户端收到:{msg.data}")
if msg.data == "服务器收到:close":
await ws.close()
break
elif msg.type == aiohttp.WSMsgType.ERROR:
break
asyncio.run(websocket_client())
更多推荐

所有评论(0)