Python msgpack-rpc 模块 getattr 深度解析
1.2 MessagePack 技术栈MessagePack(简称 msgpack)是一种高效的二进制序列化格式,由 Sadayuki Furuhashi 于 2011 年创建。它的核心优势在于:MessagePack 核心特性高效二进制编码零拷贝序列化流式处理支持固定格式 Header自描述类型小整数优化直接内存操作避免临时对象内存池重用二、深度剖析:msgpack-rpc 的架构设计2.1 m
·
一、追根溯源:从 RPC 演进到 msgpack-rpc
1.1 RPC 技术发展时间轴
timeline
title RPC 技术演进历程
section 早期阶段 (1980s-1990s)
Sun RPC (1985) : 基于 XDR 的经典实现
CORBA (1991) : 跨语言对象模型
DCOM (1996) : Microsoft 分布式组件
section Web 服务时代 (2000s)
XML-RPC (1998) : 基于 XML 的简单协议
SOAP (1998) : 企业级 Web 服务
gRPC (2015) : Google 高性能框架
section 轻量级时代 (2010s+)
MessagePack (2011) : 高效二进制序列化
msgpack-rpc (2011) : 轻量级 RPC 协议
JSON-RPC 2.0 (2013) : JSON 标准协议
1.2 MessagePack 技术栈
MessagePack(简称 msgpack)是一种高效的二进制序列化格式,由 Sadayuki Furuhashi 于 2011 年创建。它的核心优势在于:
- 体积小:比 JSON 小 20-50%
- 速度快:解析速度比 JSON 快 10-100 倍
- 跨语言:支持 50+ 种编程语言
- 类型丰富:支持扩展类型和二进制数据
二、深度剖析:msgpack-rpc 的架构设计
2.1 msgpack-rpc 协议架构
2.2 __getattr__ 的动态代理机制
__getattr__ 是 Python 的魔法方法,当访问不存在的属性时被调用。msgpack-rpc 利用这一特性实现了动态的远程方法调用:
class Client:
def __init__(self, transport, packer=None, unpacker=None):
self._transport = transport
self._next_msgid = 0
self._responses = {}
self._pending = []
def __getattr__(self, name):
"""
动态创建远程方法调用
当访问 client.remote_method 时,如果属性不存在,
则调用 __getattr__ 返回一个闭包函数
"""
def remote_method(*args):
# 创建消息ID
msgid = self._next_msgid
self._next_msgid += 1
# 构建请求消息 [type, msgid, method, params]
request = [REQUEST, msgid, name, args]
# 序列化并发送
data = self._packer.pack(request)
self._transport.send(data)
# 创建并返回 Future 对象
future = Future(msgid)
self._responses[msgid] = future
return future
return remote_method
2.3 请求-响应时序图
三、核心实现:完整的 msgpack-rpc 示例
3.1 完整的服务器实现
#!/usr/bin/env python3
"""
msgpack-rpc 服务器示例
演示动态方法注册和异步处理机制
"""
import msgpack
import socket
import threading
import struct
import time
from typing import Any, Dict, List, Optional, Callable
from concurrent.futures import ThreadPoolExecutor
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RPCServer:
"""
MessagePack-RPC 服务器实现
特性:
1. 支持动态方法注册
2. 异步请求处理
3. 连接池管理
4. 错误处理与超时控制
"""
MSG_REQUEST = 0
MSG_RESPONSE = 1
MSG_NOTIFY = 2
def __init__(self, host: str = "127.0.0.1", port: int = 18800,
max_workers: int = 10):
"""
初始化 RPC 服务器
Args:
host: 监听主机地址
port: 监听端口
max_workers: 线程池最大工作线程数
"""
self.host = host
self.port = port
self.methods = {}
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self.running = False
self.server_socket = None
def register_method(self, name: str, func: Callable) -> None:
"""
注册 RPC 方法
Args:
name: 方法名称
func: 可调用对象
"""
self.methods[name] = func
logger.info(f"注册方法: {name}")
def register(self, name: Optional[str] = None):
"""
方法注册装饰器
Args:
name: 方法名称,如果为None则使用函数名
Returns:
装饰器函数
"""
def decorator(func):
method_name = name or func.__name__
self.register_method(method_name, func)
return func
return decorator
def _handle_request(self, msgid: int, method: str, params: List[Any],
client_socket: socket.socket) -> None:
"""
处理单个 RPC 请求
Args:
msgid: 消息ID
method: 方法名
params: 参数列表
client_socket: 客户端套接字
"""
try:
if method not in self.methods:
error = {"code": -32601, "message": f"方法未找到: {method}"}
response = [self.MSG_RESPONSE, msgid, error, None]
else:
func = self.methods[method]
result = func(*params)
response = [self.MSG_RESPONSE, msgid, None, result]
except Exception as e:
error = {"code": -32000, "message": str(e)}
response = [self.MSG_RESPONSE, msgid, error, None]
logger.error(f"执行方法 {method} 时出错: {e}")
# 发送响应
try:
packed = msgpack.packb(response, use_bin_type=True)
length = struct.pack(">I", len(packed))
client_socket.sendall(length + packed)
except Exception as e:
logger.error(f"发送响应失败: {e}")
def _handle_client(self, client_socket: socket.socket, address: tuple) -> None:
"""
处理客户端连接
Args:
client_socket: 客户端套接字
address: 客户端地址 (ip, port)
"""
logger.info(f"客户端连接: {address}")
try:
while self.running:
# 读取消息长度
length_data = client_socket.recv(4)
if not length_data:
break
length = struct.unpack(">I", length_data)[0]
# 读取消息体
data = b""
while len(data) < length:
chunk = client_socket.recv(length - len(data))
if not chunk:
break
data += chunk
if len(data) < length:
break
# 解析消息
try:
message = msgpack.unpackb(data, raw=False)
except Exception as e:
logger.error(f"消息解析失败: {e}")
continue
msg_type = message[0]
if msg_type == self.MSG_REQUEST:
# 请求消息: [type, msgid, method, params]
_, msgid, method, params = message
logger.info(f"收到请求: msgid={msgid}, method={method}")
# 提交到线程池处理
self.executor.submit(
self._handle_request, msgid, method, params, client_socket
)
elif msg_type == self.MSG_NOTIFY:
# 通知消息: [type, method, params]
_, method, params = message
logger.info(f"收到通知: method={method}")
if method in self.methods:
self.executor.submit(self.methods[method], *params)
except (ConnectionResetError, BrokenPipeError):
logger.info(f"客户端断开连接: {address}")
except Exception as e:
logger.error(f"处理客户端时出错: {e}")
finally:
client_socket.close()
logger.info(f"关闭连接: {address}")
def start(self) -> None:
"""
启动 RPC 服务器
"""
self.server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server_socket.bind((self.host, self.port))
self.server_socket.listen(10)
self.running = True
logger.info(f"服务器启动在 {self.host}:{self.port}")
try:
while self.running:
client_socket, address = self.server_socket.accept()
thread = threading.Thread(
target=self._handle_client,
args=(client_socket, address),
daemon=True
)
thread.start()
except KeyboardInterrupt:
logger.info("接收到中断信号")
except Exception as e:
logger.error(f"服务器错误: {e}")
finally:
self.stop()
def stop(self) -> None:
"""
停止 RPC 服务器
"""
self.running = False
if self.server_socket:
self.server_socket.close()
self.executor.shutdown(wait=True)
logger.info("服务器已停止")
class CalculatorService:
"""计算器服务实现"""
def add(self, a: float, b: float) -> float:
"""加法运算"""
return a + b
def subtract(self, a: float, b: float) -> float:
"""减法运算"""
return a - b
def multiply(self, a: float, b: float) -> float:
"""乘法运算"""
return a * b
def divide(self, a: float, b: float) -> float:
"""除法运算"""
if b == 0:
raise ValueError("除数不能为零")
return a / b
def fibonacci(self, n: int) -> int:
"""计算斐波那契数列"""
if n <= 0:
return 0
elif n == 1:
return 1
a, b = 0, 1
for _ in range(2, n + 1):
a, b = b, a + b
return b
def main():
"""主函数"""
# 创建服务器实例
server = RPCServer(port=18800, max_workers=5)
# 创建服务实例
calculator = CalculatorService()
# 注册服务方法
server.register_method("add", calculator.add)
server.register_method("subtract", calculator.subtract)
server.register_method("multiply", calculator.multiply)
server.register_method("divide", calculator.divide)
server.register_method("fibonacci", calculator.fibonacci)
# 使用装饰器注册方法
@server.register("echo")
def echo_message(message: str) -> str:
"""回显消息"""
return f"Echo: {message}"
@server.register()
def get_server_info() -> Dict:
"""获取服务器信息"""
return {
"name": "msgpack-rpc-server",
"version": "1.0.0",
"methods": list(server.methods.keys()),
"timestamp": time.time()
}
# 启动服务器
server.start()
if __name__ == "__main__":
main()
3.2 完整的客户端实现
#!/usr/bin/env python3
"""
msgpack-rpc 客户端示例
演示 __getattr__ 动态代理和异步调用
"""
import msgpack
import socket
import struct
import time
import threading
from typing import Any, Optional, Dict, List, Callable
from concurrent.futures import Future as ConcurrentFuture
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RPCFuture:
"""
RPC 异步结果封装
提供 get() 和 wait() 方法等待结果返回
支持超时和异常处理
"""
def __init__(self, msgid: int):
self.msgid = msgid
self._result = None
self._error = None
self._event = threading.Event()
self._callbacks = []
def set_result(self, result: Any) -> None:
"""设置结果值"""
self._result = result
self._event.set()
for callback in self._callbacks:
try:
callback(result)
except Exception as e:
logger.error(f"回调函数执行失败: {e}")
def set_error(self, error: Dict) -> None:
"""设置错误"""
self._error = error
self._event.set()
def get(self, timeout: Optional[float] = None) -> Any:
"""
获取结果,阻塞直到结果返回或超时
Args:
timeout: 超时时间(秒),None 表示无限等待
Returns:
远程调用结果
Raises:
TimeoutError: 等待超时
RuntimeError: RPC 调用错误
"""
if not self._event.wait(timeout):
raise TimeoutError(f"等待结果超时 (msgid={self.msgid})")
if self._error:
raise RuntimeError(f"RPC 错误: {self._error}")
return self._result
def add_done_callback(self, callback: Callable) -> None:
"""添加完成回调"""
if self._event.is_set():
try:
callback(self._result if not self._error else None)
except Exception as e:
logger.error(f"回调函数执行失败: {e}")
else:
self._callbacks.append(callback)
def done(self) -> bool:
"""检查是否完成"""
return self._event.is_set()
class RPCClient:
"""
MessagePack-RPC 客户端实现
通过 __getattr__ 实现动态方法代理
支持同步和异步调用
"""
MSG_REQUEST = 0
MSG_RESPONSE = 1
MSG_NOTIFY = 2
def __init__(self, host: str = "127.0.0.1", port: int = 18800,
timeout: float = 30.0):
"""
初始化 RPC 客户端
Args:
host: 服务器地址
port: 服务器端口
timeout: 连接超时时间(秒)
"""
self.host = host
self.port = port
self.timeout = timeout
self.socket = None
self.next_msgid = 1
self.pending_futures = {}
self.response_handler = None
self.running = False
self.lock = threading.Lock()
def connect(self) -> None:
"""连接到服务器"""
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.settimeout(self.timeout)
self.socket.connect((self.host, self.port))
self.running = True
# 启动响应处理线程
self.response_handler = threading.Thread(
target=self._receive_responses,
daemon=True
)
self.response_handler.start()
logger.info(f"已连接到服务器 {self.host}:{self.port}")
def disconnect(self) -> None:
"""断开连接"""
self.running = False
if self.socket:
self.socket.close()
self.socket = None
logger.info("已断开服务器连接")
def __enter__(self):
"""上下文管理器入口"""
self.connect()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""上下文管理器退出"""
self.disconnect()
def __getattr__(self, name: str) -> Callable:
"""
动态方法代理
当访问 client.method_name 时,如果属性不存在,
则返回一个闭包函数用于远程调用
Args:
name: 方法名称
Returns:
远程调用函数
"""
def remote_method(*args, **kwargs):
"""
远程方法调用闭包
将本地方法调用转换为 RPC 请求
支持位置参数和关键字参数
"""
# 合并参数
all_args = list(args)
if kwargs:
all_args.append(kwargs)
# 生成消息ID
with self.lock:
msgid = self.next_msgid
self.next_msgid += 1
# 构建请求消息
request = [self.MSG_REQUEST, msgid, name, all_args]
# 发送请求
future = self._send_request(request, msgid)
# 返回 Future 对象
return future
# 设置方法名,便于调试
remote_method.__name__ = f"remote_{name}"
return remote_method
def _send_request(self, request: List, msgid: int) -> RPCFuture:
"""
发送 RPC 请求
Args:
request: 请求消息
msgid: 消息ID
Returns:
RPCFuture 对象
"""
# 创建 Future
future = RPCFuture(msgid)
self.pending_futures[msgid] = future
try:
# 序列化消息
data = msgpack.packb(request, use_bin_type=True)
length = struct.pack(">I", len(data))
# 发送数据
self.socket.sendall(length + data)
logger.debug(f"发送请求: msgid={msgid}, method={request[2]}")
except Exception as e:
# 发送失败,移除 Future 并设置错误
self.pending_futures.pop(msgid, None)
future.set_error({"code": -32000, "message": f"发送失败: {str(e)}"})
logger.error(f"发送请求失败: {e}")
return future
def notify(self, method: str, *args, **kwargs) -> None:
"""
发送通知消息(不需要响应)
Args:
method: 方法名
args: 位置参数
kwargs: 关键字参数
"""
# 合并参数
all_args = list(args)
if kwargs:
all_args.append(kwargs)
# 构建通知消息
notify_msg = [self.MSG_NOTIFY, method, all_args]
try:
# 序列化并发送
data = msgpack.packb(notify_msg, use_bin_type=True)
length = struct.pack(">I", len(data))
self.socket.sendall(length + data)
logger.debug(f"发送通知: method={method}")
except Exception as e:
logger.error(f"发送通知失败: {e}")
def _receive_responses(self) -> None:
"""
接收服务器响应的后台线程
持续监听来自服务器的响应消息
解析后将结果设置到对应的 Future
"""
buffer = b""
expected_length = None
while self.running and self.socket:
try:
# 接收数据
chunk = self.socket.recv(4096)
if not chunk:
logger.warning("连接被服务器关闭")
break
buffer += chunk
# 处理完整的数据包
while buffer:
# 读取消息长度
if expected_length is None and len(buffer) >= 4:
length_data = buffer[:4]
expected_length = struct.unpack(">I", length_data)[0]
buffer = buffer[4:]
# 读取消息体
if expected_length is not None and len(buffer) >= expected_length:
message_data = buffer[:expected_length]
buffer = buffer[expected_length:]
expected_length = None
# 处理消息
self._process_response(message_data)
else:
break
except socket.timeout:
continue
except (ConnectionResetError, BrokenPipeError):
logger.error("连接中断")
break
except Exception as e:
logger.error(f"接收响应时出错: {e}")
break
def _process_response(self, data: bytes) -> None:
"""
处理单个响应消息
Args:
data: 原始消息数据
"""
try:
message = msgpack.unpackb(data, raw=False)
msg_type = message[0]
if msg_type == self.MSG_RESPONSE:
# 响应消息: [type, msgid, error, result]
_, msgid, error, result = message
# 查找对应的 Future
future = self.pending_futures.pop(msgid, None)
if future:
if error:
future.set_error(error)
logger.warning(f"RPC 错误: msgid={msgid}, error={error}")
else:
future.set_result(result)
logger.debug(f"收到响应: msgid={msgid}")
else:
logger.warning(f"未找到对应的 Future: msgid={msgid}")
except Exception as e:
logger.error(f"处理响应消息失败: {e}")
def demo_sync_calls():
"""演示同步调用"""
print("\n=== 同步调用演示 ===")
with RPCClient("127.0.0.1", 18800) as client:
try:
# 调用远程方法(同步等待)
result = client.add(10, 20).get()
print(f"10 + 20 = {result}")
# 多个连续调用
results = []
for i in range(5):
future = client.fibonacci(i + 1)
results.append(future.get())
print(f"斐波那契数列前5项: {results}")
# 获取服务器信息
info = client.get_server_info().get()
print(f"服务器信息: {info}")
# 错误处理
try:
result = client.divide(10, 0).get()
except RuntimeError as e:
print(f"捕获到预期错误: {e}")
except Exception as e:
print(f"调用失败: {e}")
def demo_async_calls():
"""演示异步调用"""
print("\n=== 异步调用演示 ===")
with RPCClient("127.0.0.1", 18800) as client:
# 发起多个异步调用
futures = []
operations = [
("加法", client.add, (100, 200)),
("减法", client.subtract, (500, 300)),
("乘法", client.multiply, (12, 12)),
("除法", client.divide, (100, 5)),
]
for name, method, args in operations:
future = method(*args)
futures.append((name, future))
print(f"已发起 {name} 调用 (msgid={future.msgid})")
# 等待所有调用完成
print("\n等待结果...")
for name, future in futures:
try:
result = future.get(timeout=5)
print(f"{name}: {result}")
except TimeoutError:
print(f"{name}: 超时")
except RuntimeError as e:
print(f"{name}: 错误 - {e}")
def demo_callback_pattern():
"""演示回调模式"""
print("\n=== 回调模式演示 ===")
def on_calculated(result, operation=None):
"""计算结果回调"""
print(f"[回调] {operation}: {result}")
with RPCClient("127.0.0.1", 18800) as client:
# 发起调用并设置回调
future1 = client.multiply(25, 4)
future1.add_done_callback(
lambda r: on_calculated(r, "25 × 4")
)
future2 = client.fibonacci(10)
future2.add_done_callback(
lambda r: on_calculated(r, "fibonacci(10)")
)
# 等待一段时间让回调执行
import time
time.sleep(1)
def demo_batch_operations():
"""演示批量操作"""
print("\n=== 批量操作演示 ===")
with RPCClient("127.0.0.1", 18800) as client:
import time
# 批量计算斐波那契数列
n = 10
start_time = time.time()
futures = []
for i in range(1, n + 1):
future = client.fibonacci(i)
futures.append(future)
# 收集结果
results = []
for i, future in enumerate(futures, 1):
try:
result = future.get(timeout=10)
results.append((i, result))
except Exception as e:
results.append((i, f"错误: {e}"))
elapsed = time.time() - start_time
print(f"计算 fibonacci(1..{n}) 耗时: {elapsed:.3f}秒")
print(f"结果: {results}")
def demo_notification():
"""演示通知消息"""
print("\n=== 通知消息演示 ===")
with RPCClient("127.0.0.1", 18800) as client:
# 发送通知(不需要响应)
client.notify("echo", "这是一个通知消息")
print("已发送通知消息")
# 等待一下让服务器处理
import time
time.sleep(0.5)
def main():
"""主函数"""
print("msgpack-rpc 客户端演示")
print("=" * 50)
try:
# 测试连接
client = RPCClient("127.0.0.1", 18800)
client.connect()
# 演示各种调用模式
demo_sync_calls()
demo_async_calls()
demo_callback_pattern()
demo_batch_operations()
demo_notification()
client.disconnect()
except ConnectionRefusedError:
print("错误: 无法连接到服务器,请确保服务器正在运行")
except Exception as e:
print(f"错误: {e}")
if __name__ == "__main__":
main()
3.3 Makefile 构建配置
# msgpack-rpc 示例项目 Makefile
# 编译和执行 Python RPC 示例
.PHONY: all server client test clean
# 默认目标
all: deps server client
# 安装依赖
deps:
@echo "安装 Python 依赖..."
pip install msgpack==1.0.5
pip install msgpack-rpc-python==0.4.1
# 启动服务器
server:
@echo "启动 RPC 服务器..."
python3 server.py &
# 启动客户端
client:
@echo "等待服务器启动..."
sleep 2
@echo "启动 RPC 客户端..."
python3 client.py
# 运行测试
test:
@echo "运行测试..."
python3 -m pytest test_rpc.py -v
# 清理
clean:
@echo "清理进程..."
pkill -f "python3 server.py" || true
@echo "完成清理"
# 同时启动服务器和客户端(不同终端)
run-server:
@echo "启动 RPC 服务器 (端口: 18800)..."
python3 server.py
run-client:
@echo "启动 RPC 客户端..."
python3 client.py
# 性能测试
benchmark:
@echo "运行性能基准测试..."
python3 benchmark.py
# 检查代码规范
lint:
@echo "检查代码规范..."
flake8 *.py
mypy *.py --ignore-missing-imports
四、设计原理深度剖析
4.1 __getattr__ 的设计哲学
__getattr__ 在 msgpack-rpc 中的运用体现了 Python 的"鸭子类型"哲学:
4.2 动态代理的优势与权衡
优势:
- 灵活性:无需预定义接口,支持动态方法发现
- 简洁性:客户端代码与本地调用几乎无差别
- 扩展性:服务端新增方法,客户端自动可用
权衡:
- 类型安全:缺乏静态类型检查
- IDE支持:代码补全和文档提示有限
- 错误发现:运行时才能发现方法不存在错误
4.3 消息处理状态机
五、高级应用场景
5.1 微服务架构中的 RPC 调用
"""
微服务架构中的 RPC 应用
演示服务发现和负载均衡
"""
class ServiceDiscovery:
"""服务发现客户端"""
def __init__(self, registry_url: str):
self.registry = RPCClient(registry_url)
self.service_cache = {}
self.balancer = RoundRobinBalancer()
def get_service(self, service_name: str) -> RPCClient:
"""获取服务实例"""
if service_name not in self.service_cache:
# 从注册中心获取服务地址
instances = self.registry.get_instances(service_name).get()
self.service_cache[service_name] = instances
# 负载均衡选择实例
instance = self.balancer.select(self.service_cache[service_name])
return RPCClient(instance.host, instance.port)
def call_service(self, service_name: str, method: str, *args):
"""调用远程服务"""
client = self.get_service(service_name)
remote_method = getattr(client, method)
return remote_method(*args)
# 使用示例
discovery = ServiceDiscovery("registry:18800")
result = discovery.call_service("user-service", "get_user", 123)
5.2 异步流式处理
"""
流式 RPC 处理
支持大文件传输和流式计算
"""
class StreamRPCClient(RPCClient):
"""支持流式传输的 RPC 客户端"""
def upload_file(self, filename: str, chunk_size: int = 8192):
"""流式上传文件"""
def generate_chunks():
with open(filename, 'rb') as f:
while chunk := f.read(chunk_size):
yield chunk
# 发送开始通知
self.notify("upload_start", filename)
# 流式发送数据
for i, chunk in enumerate(generate_chunks()):
self.notify("upload_chunk", i, chunk)
# 发送结束通知
self.notify("upload_end", filename)
def stream_process(self, method: str, data_stream):
"""流式处理数据"""
# 创建流式会话
session_id = str(uuid.uuid4())
# 发送数据流
for chunk in data_stream:
future = getattr(self, f"{method}_chunk")(session_id, chunk)
yield future.get()
# 获取最终结果
future = getattr(self, f"{method}_finalize")(session_id)
yield future.get()
六、性能优化建议
6.1 连接池管理
class ConnectionPool:
"""RPC 连接池"""
def __init__(self, host: str, port: int, max_size: int = 10):
self.host = host
self.port = port
self.max_size = max_size
self.pool = queue.Queue(max_size)
self.current_size = 0
self.lock = threading.Lock()
def get_connection(self) -> RPCClient:
"""获取连接"""
try:
return self.pool.get_nowait()
except queue.Empty:
with self.lock:
if self.current_size < self.max_size:
client = RPCClient(self.host, self.port)
client.connect()
self.current_size += 1
return client
# 等待其他连接释放
return self.pool.get()
def release_connection(self, client: RPCClient):
"""释放连接"""
self.pool.put(client)
def __enter__(self):
return self.get_connection()
def __exit__(self, exc_type, exc_val, exc_tb):
self.release_connection(self)
6.2 批量请求优化
class BatchClient(RPCClient):
"""支持批量请求的客户端"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.batch_queue = []
self.batch_size = kwargs.get('batch_size', 100)
self.batch_interval = kwargs.get('batch_interval', 0.1)
def batch_call(self, method: str, *args):
"""批量调用"""
future = RPCFuture(len(self.batch_queue))
self.batch_queue.append((future, method, args))
# 触发批量发送
if len(self.batch_queue) >= self.batch_size:
self._flush_batch()
return future
def _flush_batch(self):
"""发送批量请求"""
if not self.batch_queue:
return
batch_requests = []
for future, method, args in self.batch_queue:
msgid = future.msgid
request = [self.MSG_REQUEST, msgid, method, args]
batch_requests.append((msgid, request))
# 批量发送
batch_data = msgpack.packb(batch_requests, use_bin_type=True)
self.socket.sendall(struct.pack(">I", len(batch_data)) + batch_data)
self.batch_queue.clear()
七、总结
通过对 msgpack-rpc 模块中 __getattr__ 机制的深度解析,我们可以看到:
- 动态代理模式:
__getattr__实现了透明的远程方法调用,使 RPC 调用像本地调用一样自然 - 异步处理机制:通过 Future 模式实现了非阻塞调用,支持回调和超时控制
- 协议设计精巧:基于 MessagePack 的高效序列化,设计了简洁的消息格式
- 工程实践完善:提供了完整的连接管理、错误处理和性能优化方案
这种设计体现了 Python 的哲学——“简单胜于复杂”,通过动态特性简化了分布式调用的复杂度,同时保持了足够的灵活性和性能。
在实际应用中,msgpack-rpc 适用于:
- 内部微服务通信
- 实时数据处理管道
- 游戏服务器通信
- IoT 设备控制
- 需要高性能跨语言通信的场景
通过合理运用 __getattr__ 的动态代理能力,开发者可以构建出既简洁又强大的分布式系统。
更多推荐


所有评论(0)