你真的懂微服务的RPC吗?
拦截器的作用,是在执行核心业务方法的前后,创造出一个统一的切片,来执行所有业务方法锁共有的通用逻辑。此外我们还能够通过这部分通用逻辑的执行结果,来判断是否需要熔断当前的执行链路,以起到拦截的效果有关grpc拦截器的内容,其实和gin框架中的handlersChain是异曲同工的。req:业务处理方法的请求参数info:当前所属的业务服务servicehandler:真正的业务处理方法// 前处理校
背景介绍
rpc
rpc,全称remote process call(远程过程调用),是微服务架构下的一种通信模式。这种通信模式下,一台服务器在调用远程机器的接口时,能够获得像调用本地方法一样的良好体验
rpc通常对标的是restful风格的http调用方式,rpc相较于http的优势所在:
- rpc调用基于sdk方式,调用方法和出入参协议固定,stub文件本身还能起到接口文档的作用,很大程度上优化了通信双方约定协议达成共识的成本。
- rpc在传输层协议tcp基础之上,可以由实现框架自定义填充应用层协议细节,理论上存在着更高的上线。
事务往往具有多面性,一些优点在转换视角后可能也会成为对应的劣势,因此从另一个角度看,rpc相较于http存在如下缺点:
- 基于sdk方式调用,灵活度低,开发成本高,更多的适合用于系统内部模块之间的通信交互,不适合对外
- 用户自定义实现应用层协议,下限水平也很不稳定
grpc - go
rpc领域中的一座大山是google的开源框架grpc。框架本身基于C++实现,但对应于几个主流语言也有相应的实现版本。
grpc-go是基于go语言实现的grpc框架,go语言本身也是google实现的,因此两者的契合度良好
grpc-go以http2作为应用层协议,使用protobuf作为数据序列化协议以及接口定义语言
protobuf开源地址:https://github.com/protocolbuffers/protobuf-go
grpc-go开源地址:https://github.com/grpc/grpc-go
grpc - go使用教程
前置准备
首先需要把依赖的grpc-go插件提前准备好,分为如下几步:
- 安装grpc
go get google.golang.org/grpc@latest
- 安装protocol buffer
根据操作系统型号,下载安装好对应版本的protobuf应用:
https://github.com/google/protobuf/releases
需要将protocol执行文件所在的目录添加到环境变量$PATH当中,安装完成后,可以通过查看protobuf版本指令(下文可能简称pb),检验是否安装成功:
protoc --version
- 安装protocolbuf -> pb.go插件
go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
该插件的作用是,能够基于.proto文件一键生成_pb.go文件,对应内容为通信请求/响应参数的对象模型。
go install指令默认会将插件安装到GOPATH/bin目录下,需要确保GOPATH/bin目录下,需要确保GOPATH/bin目录下,需要确保GOPATH/bin路径被添加到环境路径$PATH中
安装完成后,可以通过查看插件版本指令,校验安装是否成功
protoc-gen-go --version
- 安装protobuf -> grpc.pb.go插件
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
该插件的作用是,能够基于.proto文件生成_grpc.pb.go,对应内容为通信服务框架代码
安装完成后,可以通过查看插件版本指令,校验安装是否成功
protoc-gen-go-grpc --version
pb桩文件
1. 编写protobuf文件
syntax = "proto3"; // 固定语法前缀
option go_package = "."; // 指定生成的Go代码在你项目中的导入路径(告诉proto编译器,生成的go代码应该放在哪个包中)
package pb; // 包名,其它Go文件可以通过包名pb导入
// 定义服务
// 定义了一个gRPC服务,名为HelloService
// 有一个RPC方法,名为SayHello
// 调用方式:
// 客户端发送HelloReq消息
// 服务端返回HelloResp消息
service HelloService{
// SayHello方法
rpc SayHello(HelloReq)returns(HelloResp){}
}
// 请求参数
// 消息名:HelloReq
// 包含一个字段:name,类型为string
// =1是字段编号(不能重复,用于二进制编码)
message HelloReq{
string name =1;
}
// 响应消息
// 消息名:HelloResp
// 包含一个字段:reply,类型为string
message HelloResp{
string reply = 1;
}
2. 生成pb.go文件
通过使用插件,可以在.proto文件基础上,一键生成对用的go代码
protoc --go_out=. --go-grpc_out=. pb/hello.proto
–go_out:指定pb.go文件的生成位置
–go-grpc_out:指定grpc.pb.go文件的生成位置
pb/hello.proto:这是指定了.proto文件的所在位置
执行上述指令后,会生成pb.go和grpc.pb.go两个文件
// ...
package proto
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
// ...
// 请求消息
type HelloReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Age int32 `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"`
}
// ...
// 响应消息
type HelloResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Reply string `protobuf:"bytes,1,opt,name=reply,proto3" json:"reply,omitempty"`
}
上述代码展示了pb.go文件中的内容,核心是基于.proto定义的出入参协议,生成对应的golang类定义代码
package proto
import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// 基于 .proto 文件生成的客户端框架代码
// 客户端 interface
type HelloServiceClient interface {
// SayHello 方法
SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error)
}
// 客户端实现类
type helloServiceClient struct {
cc grpc.ClientConnInterface
}
// 客户端构造器函数
func NewHelloServiceClient(cc grpc.ClientConnInterface) HelloServiceClient {
return &helloServiceClient{cc}
}
// 客户端请求入口
func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error) {
out := new(HelloResp)
err := c.cc.Invoke(ctx, "/pb.HelloService/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// 服务端注册入口
func RegisterHelloServiceServer(s grpc.ServiceRegistrar, srv HelloServiceServer) {
s.RegisterService(&HelloService_ServiceDesc, srv)
}
// 服务端业务方法框架代码
func _HelloService_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HelloServiceServer).SayHello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.HelloService/SayHello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HelloServiceServer).SayHello(ctx, req.(*HelloReq))
}
return interceptor(ctx, in, info, handler)
}
// 服务端业务处理服务描述符
var HelloService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.HelloService",
HandlerType: (*HelloServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _HelloService_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "proto/hello.proto",
}
上述代码展示了grpc.pb.go文件中的内容,核心内容包括:
- 基于.proto文件生成了客户端的桩代码,后续作为用户使用grpc客户端模块的sdk入口
- 基于.proto文件生成了服务端的服务注册桩代码,后续作为用户使用grpc服务端模块的sdk入口
- 基于.proto文件生成了业务处理服务(pb.HelloService)的描述符,每个描述符内部会建立基于方法名(SayHello)到具体处理函数(_HelloService_SayHello_Handler)的映射关系。
服务端
服务端启动代码示例如下:
package main
import (
"context"
"fmt"
"net"
"github.com/grpc_demo/proto"
"google.golang.org/grpc"
)
// 业务处理服务
type HelloService struct {
proto.UnimplementedHelloServiceServer
}
// 实现具体的业务方法逻辑
func (s *HelloService) SayHello(ctx context.Context, req *proto.HelloReq) (*proto.HelloResp, error) {
return &proto.HelloResp{
Reply: fmt.Sprintf("hello name: %s", req.Name),
}, nil
}
func main() {
// 创建 tcp 端口监听器
listener, err := net.Listen("tcp", ":8093")
if err != nil {
panic(err)
}
// 创建 grpc server
server := grpc.NewServer()
// 将自定义的业务处理服务注册到 grpc server 中
proto.RegisterHelloServiceServer(server, &HelloService{})
// 运行 grpc server
if err := server.Serve(listener); err != nil {
panic(err)
}
}
- 预声明业务处理服务HelloService,实现好桩文件中定义的业务处理方法SayHello
- 调用net.Listen方法,创建tcp端口监听器
- 调用grpc.NewServer方法,创建一个grpc server对象
- 调用桩文件中预生成好的注册方法proto.RegisterHelloServiceServer,将HelloService注册到grpc server对象中
- 运行server.Serve方法,监听指定的端口,真正启动grpc server
客户端
客户端启动代码示例如下:
import (
"context"
"fmt"
"github.com/grpc_demo/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func main() {
// 通过指定地址,建立与 grpc 服务端的连接
conn, err := grpc.Dial("localhost:8093", grpc.WithTransportCredentials(insecure.NewCredentials()))
// ...
// 调用 .grpc.pb.go 文件中预生成好的客户端构造器方法,创建 grpc 客户端
client := proto.NewHelloServiceClient(conn)
// 调用 .grpc.pb.go 文件预生成好的客户端请求方法,使用 .pb.go 文件中预生成好的请求参数作为入参,向 grpc 服务端发起请求
resp, err := client.SayHello(context.Background(), &proto.HelloReq{
Name: "xiaoxuxiansheng",
})
// ...
// 打印取得的响应参数
fmt.Printf("resp: %+v", resp)
}
客户端代码中完成的核心步骤包括:
- 调用grpc.Dial方法,与指定地址的grpc服务端建立连接
- 调用桩文件中的方法proto.NewHelliServiceClient,创建pb文件预声明好的grpc客户端对象
- 调用client.SayHello方法,发送grpc请求,并处理响应结果
服务端
核心数据结构
在grpc服务端领域,自上而下有着三个层次分明的结构
server->service->method
- 最高级别是server,是对整个grpc服务端的抽象
- 一个server下可以注册挂载多个业务服务service
- 一个service下存在多个业务处理方法method
1. server
type Server struct {
// 配置项
opts serverOptions
// 互斥锁保证并发安全
mu sync.Mutex
// tcp 端口监听器池
lis map[net.Listener]bool
// ...
// 连接池
conns map[string]map[transport.ServerTransport]bool
serve bool
cv *sync.Cond
// 业务服务映射管理
services map[string]*serviceInfo // service name -> service info
// ...
serveWG sync.WaitGroup
// ...
}
Server类是对grpc服务端的代码实现,其中通过一个名为services的map,记录了由服务名到具体业务服务模块的映射关系。
2. serviceInfo
type serviceInfo struct {
// 业务服务类
serviceImpl interface{}
// 业务方法映射管理
methods map[string]*MethodDesc
// ...
}
serviceInfo是某一个具体的业务模块,其中通过一个名为methods的map记录了由方法名到具体方法的映射关系。
3. MethodDesc
type MethodDesc struct {
MethodName string
Handler methodHandler
}
MethodDesc是对方法的封装,其中的字段Handler是真正的业务处理方法
4. methodHandler
type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)
methodsHandler是业务处理方法的类型,其中几个关键入参的含义分别是:
- srv:业务处理方法从属的业务服务模块
- dec:进行入参req反序列化的闭包函数
- interceptor:业务处理方法外部包裹的拦截器方法
创建server
func NewServer(opt ...ServerOption) *Server {
opts := defaultServerOptions
for _, o := range extraServerOptions {
o.apply(&opts)
}
for _, o := range opt {
o.apply(&opts)
}
s := &Server{
lis: make(map[net.Listener]bool),
opts: opts,
conns: make(map[string]map[transport.ServerTransport]bool),
services: make(map[string]*serviceInfo),
quit: grpcsync.NewEvent(),
done: grpcsync.NewEvent(),
czData: new(channelzData),
}
chainUnaryServerInterceptors(s)
//...
s.cv = sync.NewCond(&s.mu)
// ...
return s
}
grpc.NewServer方法会创建server实例,并调用chainUnaryServerInterceptors方法,将一系列拦截器interceptor成链,并注入到ServerOption当中。
chainUnaryServerInterceptors方法如下:
func chainUnaryServerInterceptors(s *Server) {
interceptors := s.opts.chainUnaryInts
if s.opts.unaryInt != nil {
interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
}
var chainedInt UnaryServerInterceptor
if len(interceptors) == 0 {
chainedInt = nil
} else if len(interceptors) == 1 {
chainedInt = interceptors[0]
} else {
chainedInt = chainUnaryInterceptors(interceptors)
}
s.opts.unaryInt = chainedInt
}
注册service
创建好grpc server后,接下来通过使用桩代码中预生成好的RegisterXXXServer方法,将业务处理服务service模块注入到server当中
func RegisterHelloServiceServer(s grpc.ServiceRegistrar,srv HelloServiceServer) {
s.RegisterService(&HelloService_ServiceDesc, srv)
}
func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
// ...
s.register(sd, ss)
}
func (s *Server) register(sd *ServiceDesc, ss interface{}) {
s.mu.Lock()
defer s.mu.Unlock()
// ...
info := &serviceInfo{
serviceImpl: ss,
methods: make(map[string]*MethodDesc),
streams: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
info.methods[d.MethodName] = d
}
// ...
s.services[sd.ServiceName] = info
}
注册过程中会经历RegisterHelloServiceServer->Server.RegisterService->Server.Register的调用链路,把service的所有方法注册到serviceInfo的methods map当中,然后将service封装到serviceInfo实例中,注册到server的services map当中
运行server
func (s *Server) Serve(lis net.Listener) error {
// ...
var tempDelay time.Duration // how long to sleep on accept failure
for {
rawConn, err := lis.Accept()
if err != nil {
// ...
}
// ...
s.serveWG.Add(1)
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
}
}
grpc server运行的流程,核心是基于for循环实现的主动轮询模型,每轮会通过调用net.Listener.Accept方法,基于IO多路复用epoll方式,阻塞等待grpc请求的到达
每当有新的连接到达后,服务端会开启一个goroutine,调用对应的Server.handleRawConn方法对请求进行处理
处理请求
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
// ...
st := s.newHTTP2Transport(rawConn)
// ...
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
}
在Server.handleRawConn方法中,会基于原始的net.Conn封装生成一个HTTP2Transport,然后开启goroutine调用Server.serveStream方法请求处理请求
- 首先调用serveStreams方法
func (s *Server) serveStreams(st transport.ServerTransport) {
var wg sync.WaitGroup
var roundRobinCounter uint32
st.HandleStreams(func(stream *transport.Stream) {
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
}, func(ctx context.Context, method string) context.Context {
// ...
})
wg.Wait()
}
- 接着调用handleStream方法
func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
sm := stream.Method() // 获取方法全名
// ...
pos := strings.LastIndex(sm, "/") // 找到最后一个"/"的位置
service := sm[:pos] // 服务名:/package.service
method := sm[pos+1:] // 方法名:method
// 查找服务
// s.services:服务器注册的所有服务映射
// 键:服务名(如/pb.HelloService)
// 值:service结构体,包含该服务的方法信息
srv, knownService := s.services[service]
// 查找方法并分发
if knownService {
// 查找一元RPC方法
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
// 查找流式RPC方法
if sd, ok := srv.streams[method]; ok {
s.processStreamingRPC(t, stream, srv, sd, trInfo)
return
}
}
// ...
}
经过上述调用链之后,最终会在processXXX方法中通过recvAndDecompress读取到请求内容字节流,然后通过闭包函数
df封装好反序列请求参数的逻辑,继而调用md.Handler方法处理请求,最终通过Server.sendResponse方法将响应结果进行返回
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
// ...
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
// ...
df := func(v interface{}) error {
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
// ...
}
// ...
}
ctx := NewContextWithServerTransportStream(stream.Context(), stream)
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
// ...
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
// ...
}
// ...
}
以本文介绍的helloService为例,客户端调用SayHello方法后,服务端对应的md.Handler正是.proto文件生成的位于.grpc.pb.go文件中的桩方法_HelloService_SayHello_Handler
在该桩方法内部,包含的执行步骤如下:
- 调用闭包函数dec,将请求内容反序列化到请求入参in当中
- 将业务处理方法HelloServiceServer.SayHello闭包封装到一个UnaryHandler当中
- 调用interceptor方法,分别执行拦截器和handler的处理逻辑
拦截器
原理介绍
拦截器的作用,是在执行核心业务方法的前后,创造出一个统一的切片,来执行所有业务方法锁共有的通用逻辑。此外我们还能够通过这部分通用逻辑的执行结果,来判断是否需要熔断当前的执行链路,以起到拦截的效果
有关grpc拦截器的内容,其实和gin框架中的handlersChain是异曲同工的。
对于一个拦截器函数的具体定义:
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中几个入参的含义分别为:
- req:业务处理方法的请求参数
- info:当前所属的业务服务service
- handler:真正的业务处理方法
因此一个拦截器函数的使用模式应该是:
var myInterceptor1 = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
// 前处理校验
if err := preLogicCheck();err != nil{
// 前处理校验不通过,则拦截,不调用业务方法直接返回
return nil,err
}
// 前处理校验通过,正常调用业务方法
resp, err = handle(ctx,req)
if err != nil{
return nil,err
}
// 后置处理校验
if err := postLogicCheck();err != nil{
// 后置处理校验不通过,则拦截结果,包装错误返回
return nil,err
}
// 正常返回结果
return resp,nil
}
拦截器链
func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
}
}
func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
if curr == len(interceptors)-1 {
return finalHandler
}
return func(ctx context.Context, req interface{}) (interface{}, error) {
return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
}
}
首先,chainUnaryInterceptors方法会将一系列拦截器interceptor成链,并返回首枚interceptor供ServerOption接收设置
其中,拦截器成链的关键在于getChainUnaryHandler方法中,其中会闭包调用拦截器数组的首枚拦截器函数,接下来依次用下一枚拦截器对业务方法handler进行包裹,封装成一个新的"handler"供当前拦截器使用。
操作实践
下面展示一下grpc拦截器链的实操例子:
- 依次声明拦截器1 myInterceptor1 和 拦截器2 myInterceptor2,会在调用业务方法 handler 前后分别打印一行内容
- 在创建 grpc server 时,将两个拦截器基于 option 注入
- 通过客户端请求服务端,通过输出日志观察拦截器运行效果
更多推荐


所有评论(0)