在 Python 中可以使用 aiohttp 库来实现服务端发送 Server-Sent Events (SSE) 和客户端接收 SSE。以下是一个简单的 SSE 客户端和服务端实现示例。

服务端代码 (SSE 发送)

import asyncio
from aiohttp import web

async def sse_handler(request):
    response = web.StreamResponse(
        status=200,
        reason='OK',
        headers={
            'Content-Type': 'text/event-stream',
            'Cache-Control': 'no-cache',
            'Connection': 'keep-alive',
        }
    )

    await response.prepare(request)

    # 持续发送消息
    for i in range(5):
        await response.write(f"data: Hello {i}\n\n".encode('utf-8'))
        await asyncio.sleep(1)  # 每秒发送一条消息

    return response

app = web.Application()
app.router.add_get('/sse', sse_handler)

if __name__ == '__main__':
    web.run_app(app, host='127.0.0.1', port=8080)

客户端代码 (SSE 接收)

import asyncio
import aiohttp

async def sse_client():
    url = 'http://127.0.0.1:8080/sse'
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            async for line in resp.content:
                if line.startswith(b'data:'):
                    message = line.decode('utf-8').strip().lstrip('data: ')
                    print(f"Received message: {message}")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(sse_client())

解释:

  1. 服务端:

    • 使用 aiohttp 创建一个简单的 SSE 服务器,持续发送消息。
    • 服务器通过 StreamResponse 实现 SSE 规范,将消息以 data: 格式推送给客户端。
    • 每秒钟发送一条消息,共发送 5 条。
  2. 客户端:

    • 客户端使用 aiohttp.ClientSession() 连接到 SSE 服务。
    • 通过 async for 循环持续接收消息,并从服务器接收到的内容中提取消息数据。

你可以将上述代码分别保存为不同的文件并运行。首先运行服务端,然后运行客户端来接收消息。

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐