Python实现服务端发送 Server-Sent Events (SSE) 和客户端接收 SSE
在 Python 中可以使用aiohttp库来实现服务端发送 Server-Sent Events (SSE) 和客户端接收 SSE。以下是一个简单的 SSE 客户端和服务端实现示例。
·
在 Python 中可以使用 aiohttp
库来实现服务端发送 Server-Sent Events (SSE) 和客户端接收 SSE。以下是一个简单的 SSE 客户端和服务端实现示例。
服务端代码 (SSE 发送)
import asyncio
from aiohttp import web
async def sse_handler(request):
response = web.StreamResponse(
status=200,
reason='OK',
headers={
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
}
)
await response.prepare(request)
# 持续发送消息
for i in range(5):
await response.write(f"data: Hello {i}\n\n".encode('utf-8'))
await asyncio.sleep(1) # 每秒发送一条消息
return response
app = web.Application()
app.router.add_get('/sse', sse_handler)
if __name__ == '__main__':
web.run_app(app, host='127.0.0.1', port=8080)
客户端代码 (SSE 接收)
import asyncio
import aiohttp
async def sse_client():
url = 'http://127.0.0.1:8080/sse'
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
async for line in resp.content:
if line.startswith(b'data:'):
message = line.decode('utf-8').strip().lstrip('data: ')
print(f"Received message: {message}")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(sse_client())
解释:
-
服务端:
- 使用
aiohttp
创建一个简单的 SSE 服务器,持续发送消息。 - 服务器通过
StreamResponse
实现 SSE 规范,将消息以data:
格式推送给客户端。 - 每秒钟发送一条消息,共发送 5 条。
- 使用
-
客户端:
- 客户端使用
aiohttp.ClientSession()
连接到 SSE 服务。 - 通过
async for
循环持续接收消息,并从服务器接收到的内容中提取消息数据。
- 客户端使用
你可以将上述代码分别保存为不同的文件并运行。首先运行服务端,然后运行客户端来接收消息。
更多推荐
所有评论(0)