grpc.md

概述

Golang grpc实现原理和 proxy 实现原理

grpc-go 服务端使用介绍及源码分析

protoc

安装

1
2
3
4
5
6
7
# 下载并安装
# https://link.zhihu.com/?target=https%3A//github.com/google/protobuf/releases

go install google.golang.org/protobuf/cmd/protoc-gen-go@v1.28
protoc-gen-go --version
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@v1.2
protoc-gen-go-grpc --version

使用

编写 proto 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
syntax = "proto3"; // 固定语法前缀


option go_package = "."; // 指定生成的Go代码在你项目中的导入路径


package pb; // 包名


// 定义服务
service HelloService {
// SayHello 方法
rpc SayHello (HelloReq) returns (HelloResp) {}
}


// 请求消息
message HelloReq {
string name = 1;
}


// 响应消息
message HelloResp {
string reply = 1
}

该文件以 .proto 作为后缀,扮演着 grpc 客户端与服务端通信交互的接口定义语言(DDL)的角色.

protobuf 的细节内容与底层原理,后续我们单开章节再作介绍,此处能理解其基本用法即可.

上述内容中,抛开前置的固定语法标识外,分为三个核心部分:

定义业务处理服务 HelloService,声明业务方法的名称(SayHello)以及出入参协议(HelloReq/HelloResp)
遵循 protobuf 的风格,分别声明出入参的类型定义:HelloReq 和 HelloResp,其中分别包含了字符串类型的成员字段 name 和 reply

生成 pb.go 文件

1
2
protoc --go_out=. --go-grpc_out=. pb/hello.proto

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
// ...
package proto


import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)


// ...
// 请求消息
type HelloReq struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Age int32 `protobuf:"varint,2,opt,name=age,proto3" json:"age,omitempty"`
}


// ...
// 响应消息
type HelloResp struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields

Reply string `protobuf:"bytes,1,opt,name=reply,proto3" json:"reply,omitempty"`
}

上述代码展示了 pb.go 文件中的内容,核心是基于 .proto 定义的出入参协议,生成对应的 golang 类定义代码.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package proto


import (
context "context"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)


// 基于 .proto 文件生成的客户端框架代码
// 客户端 interface
type HelloServiceClient interface {
// SayHello 方法
SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error)
}


// 客户端实现类
type helloServiceClient struct {
cc grpc.ClientConnInterface
}


// 客户端构造器函数
func NewHelloServiceClient(cc grpc.ClientConnInterface) HelloServiceClient {
return &helloServiceClient{cc}
}


// 客户端请求入口
func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloReq, opts ...grpc.CallOption) (*HelloResp, error) {
out := new(HelloResp)
err := c.cc.Invoke(ctx, "/pb.HelloService/SayHello", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}


// 服务端注册入口
func RegisterHelloServiceServer(s grpc.ServiceRegistrar, srv HelloServiceServer) {
s.RegisterService(&HelloService_ServiceDesc, srv)
}


// 服务端业务方法框架代码
func _HelloService_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(HelloReq)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(HelloServiceServer).SayHello(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/pb.HelloService/SayHello",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(HelloServiceServer).SayHello(ctx, req.(*HelloReq))
}
return interceptor(ctx, in, info, handler)
}


// 服务端业务处理服务描述符
var HelloService_ServiceDesc = grpc.ServiceDesc{
ServiceName: "pb.HelloService",
HandlerType: (*HelloServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _HelloService_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "proto/hello.proto",
}

上述代码展示了 grpc.pb.go 文件中的内容,核心内容包括:

  • 基于 .proto 文件生成了客户端的桩代码,后续作为用户使用 grpc 客户端模块的 sdk 入口.
  • 基于 .proto 文件生成了服务端的服务注册桩代码,后续作为用户使用 grpc 服务端模块的 sdk 入口
  • 基于 .proto 文件生成了业务处理服务(pb.HelloService)的描述符,每个描述符内部会建立基于方法名(SayHello)到具体处理函数(_
    HelloService_SayHello_Handler)的映射关系

服务端启动

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main


import (
"context"
"fmt"
"net"


"github.com/grpc_demo/proto"


"google.golang.org/grpc"
)


// 业务处理服务
type HelloService struct {
proto.UnimplementedHelloServiceServer
}


// 实现具体的业务方法逻辑
func (s *HelloService) SayHello(ctx context.Context, req *proto.HelloReq) (*proto.HelloResp, error) {
return &proto.HelloResp{
Reply: fmt.Sprintf("hello name: %s", req.Name),
}, nil
}


func main() {
// 创建 tcp 端口监听器
listener, err := net.Listen("tcp", ":8093")
if err != nil {
panic(err)
}


// 创建 grpc server
server := grpc.NewServer()
// 将自定义的业务处理服务注册到 grpc server 中
proto.RegisterHelloServiceServer(server, &HelloService{})
// 运行 grpc server
if err := server.Serve(listener); err != nil {
panic(err)
}
}

  • 预声明业务处理服务 HelloService,实现好桩文件中定义的业务处理方法 SayHello
  • 调用 net.Listen 方法,创建 tcp 端口监听器
  • 调用 grpc.NewServer 方法,创建一个 grpc server 对象
  • 调用桩文件中预生成好的注册方法 proto.RegisterHelloServiceServer,将 HelloService 注册到 grpc server 对象当中
  • 运行 server.Serve 方法,监听指定的端口,真正启动 grpc server

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import (
"context"
"fmt"


"github.com/grpc_demo/proto"


"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)


func main() {
// 通过指定地址,建立与 grpc 服务端的连接
conn, err := grpc.Dial("localhost:8093", grpc.WithTransportCredentials(insecure.NewCredentials()))
// ...
// 调用 .grpc.pb.go 文件中预生成好的客户端构造器方法,创建 grpc 客户端
client := proto.NewHelloServiceClient(conn)

// 调用 .grpc.pb.go 文件预生成好的客户端请求方法,使用 .pb.go 文件中预生成好的请求参数作为入参,向 grpc 服务端发起请求
resp, err := client.SayHello(context.Background(), &proto.HelloReq{
Name: "xiaoxuxiansheng",
})
// ...
// 打印取得的响应参数
fmt.Printf("resp: %+v", resp)
}

  • 调用 grpc.Dial 方法,与指定地址的 grpc 服务端建立连接
  • 调用桩文件中的方法 proto.NewHelloServiceClient,创建 pb 文件预声明好的 grpc 客户端对象
  • 调用 client.SayHello 方法,发送 grpc 请求,并处理响应结果

3 服务端

3.1 核心数据结构

核心数据结构

在 grpc 服务端领域,自上而下有着三个层次分明的结构:server->service->method

  • 最高级别是 server,是对整个 grpc 服务端的抽象
  • 一个 server 下可以注册挂载多个业务服务 service
  • 一个 service 下存在多个业务处理方法 method

(1)server

type Server struct {
    // 配置项
    opts serverOptions
    // 互斥锁保证并发安全
    mu  sync.Mutex 
    // tcp 端口监听器池
    lis map[net.Listener]bool
    // ...
    // 连接池
    conns    map[string]map[transport.ServerTransport]bool
    serve    bool
    cv       *sync.Cond          
    // 业务服务映射管理  
    services map[string]*serviceInfo // service name -> service info
    // ...
    serveWG            sync.WaitGroup 
    // ...
}

Server 类是对 grpc 服务端的代码实现,其中通过一个名为 services 的 map,记录了由服务名到具体业务服务模块的映射关系.

(2)serviceInfo

type serviceInfo struct {
    // 业务服务类
    serviceImpl interface{
    // 业务方法映射管理  
    methods     map[string]*MethodDesc
    // ...
}

serviceInfo 是某一个具体的业务服务模块,其中通过一个名为 methods 的 map 记录了由方法名到具体方法的映射关系.

(3)MethodDesc

type MethodDesc struct {
    MethodName string
    Handler    methodHandler
}

MethodDesc 是对方法的封装,其中的字段 Handler 是真正的业务处理方法.

(4)methodHandler

type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor UnaryServerInterceptor) (interface{}, error)

methodsHandler 是业务处理方法的类型,其中几个关键入参的含义分别是:

  • srv:业务处理方法从属的业务服务模块
  • dec:进行入参 req 反序列化的闭包函数
  • interceptor:业务处理方法外部包裹的拦截器方法

3.2 创建 server

func NewServer(opt ...ServerOption) *Server {
    opts := defaultServerOptions
    for _, o := range extraServerOptions {
        o.apply(&opts)
    }
    for _, o := range opt {
        o.apply(&opts)
    }
    s := &Server{
        lis:      make(map[net.Listener]bool),
        opts:     opts,
        conns:    make(map[string]map[transport.ServerTransport]bool),
        services: make(map[string]*serviceInfo),
        quit:     grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
        czData:   new(channelzData),
    }
    chainUnaryServerInterceptors(s)
    //...
    s.cv = sync.NewCond(&s.mu)
    // ...   
    return s
}

grpc.NewServer 方法中会创建 server 实例,并调用 chainUnaryServerInterceptors 方法,将一系列拦截器
interceptor 成链,并注入到 ServerOption 当中. 有关拦截器的内容,放在本文第 4 章再作展开.

func chainUnaryServerInterceptors(s *Server) {


    interceptors := s.opts.chainUnaryInts
    if s.opts.unaryInt != nil {
        interceptors = append([]UnaryServerInterceptor{s.opts.unaryInt}, s.opts.chainUnaryInts...)
    }


    var chainedInt UnaryServerInterceptor
    if len(interceptors) == 0 {
        chainedInt = nil
    } else if len(interceptors) == 1 {
        chainedInt = interceptors[0]
    } else {
        chainedInt = chainUnaryInterceptors(interceptors)
    }
    
    s.opts.unaryInt = chainedInt
}

3.3 注册 service

service 注册

创建好 grpc server 后,接下来通过使用桩代码中预生成好的 RegisterXXXServer 方法,业务处理服务 service 模块注入到
server 当中.

func RegisterHelloServiceServer(s grpc.ServiceRegistrar, srv HelloServiceServer) {
    s.RegisterService(&HelloService_ServiceDesc, srv)
}


func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
    // ...
    s.register(sd, ss)
}


func (s *Server) register(sd *ServiceDesc, ss interface{}) {
    s.mu.Lock()
    defer s.mu.Unlock()
    // ...
    info := &serviceInfo{
        serviceImpl: ss,
        methods:     make(map[string]*MethodDesc),
        streams:     make(map[string]*StreamDesc),
        mdata:       sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        info.methods[d.MethodName] = d
    }
    // ...
    s.services[sd.ServiceName] = info
}

注册过程会经历 RegisterHelloServiceServer->Server.RegisterService -> Server.register
的调用链路,把 service 的所有方法注册到 serviceInfo 的 methods map 当中,然后将 service 封装到
serviceInfo 实例中,注册到 server 的 services map 当中

3.4 运行 server

func (s *Server) Serve(lis net.Listener) error {
    // ...


    var tempDelay time.Duration // how long to sleep on accept failure
    for {
        rawConn, err := lis.Accept()
        if err != nil {
            // ...
        }
        // ...
        s.serveWG.Add(1)
        go func() {
            s.handleRawConn(lis.Addr().String(), rawConn)
            s.serveWG.Done()
        }()
    }
}

grpc 运行关系

grpc server 运行的流程,核心是基于 for 循环实现的主动轮询模型,每轮会通过调用 net.Listener.Accept 方法,基于 IO
多路复用 epoll 方式,阻塞等待 grpc 请求的到达.

每当有新的连接到达后,服务端会开启一个 goroutine,调用对应的 Server.handleRawConn 方法对请求进行处理.

3.5 处理请求

func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {
    // ...
    st := s.newHTTP2Transport(rawConn)
    // ...
    go func() {
        s.serveStreams(st)
        s.removeConn(lisAddr, st)
    }()
}

处理请求关系图

在 Server.handleRawConn 方法中,会基于原始的 net.Conn 封装生成一个 HTTP2Transport,然后开启
goroutine 调用 Server.serveStream 方法处理请求.

func (s *Server) serveStreams(st transport.ServerTransport) {
    var wg sync.WaitGroup


    var roundRobinCounter uint32
    st.HandleStreams(func(stream *transport.Stream) {
        go func() {
            defer wg.Done()
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    }, func(ctx context.Context, method string) context.Context {
        // ...
    })
    wg.Wait()
}


func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
    sm := stream.Method()
    // ...
    pos := strings.LastIndex(sm, "/")
    
    service := sm[:pos]
    method := sm[pos+1:]


    srv, knownService := s.services[service]
    if knownService {
        if md, ok := srv.methods[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
        if sd, ok := srv.streams[method]; ok {
            s.processStreamingRPC(t, stream, srv, sd, trInfo)
            return
        }
    }
    // ...
}


func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, info *serviceInfo, md *MethodDesc, trInfo *traceInfo) (err error) {
    // ...
    d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
    // ...
    df := func(v interface{}) error {
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
           // ...
        }
        // ...
    }
    ctx := NewContextWithServerTransportStream(stream.Context(), stream)
    reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
    // ...


    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
        // ...
    }
    // ...
}

stream 调用链

接下来一连建立了 Server.serveStreams -> http2Server.HandleStreams ->
http2Server.operateHeaders -> http2Server.handleStream ->
Server.processUnaryRPC 的方法调用链:

  • 在 Server.handleStream 方法中,会拆解来自客户端的请求路径 ${service}/${method},通过”/“ 前段得到 service 名称,通过 “/“
    后端得到 method 名称,并分别映射到对应的业务服务和业务方法

  • 在 Server.processUnaryRPC 方法中,会通过 recvAndDecompress 读取到请求内容字节流,然后通过闭包函数 df 封装好反序列请求参数的逻辑,继而调用
    md.Handler 方法处理请求,最终通过 Server.sendResponse 方法将响应结果进行返回

    func _HelloService_SayHello_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
    in := new(HelloReq)
    if err := dec(in); err != nil {
    return nil, err
    }
    if interceptor == nil {
    return srv.(HelloServiceServer).SayHello(ctx, in)
    }
    info := &grpc.UnaryServerInfo{
    Server: srv,
    FullMethod: “/pb.HelloService/SayHello”,
    }
    handler := func(ctx context.Context, req interface{}) (interface{}, error) {
    return srv.(HelloServiceServer).SayHello(ctx, req.(*HelloReq))
    }
    return interceptor(ctx, in, info, handler)
    }

以本文介绍的 helloService 为例,客户端调用 SayHello 方法后,服务端对应的 md.Handler 正是 .proto 文件生成的位于
.grpc.pb.go 文件中的桩方法 _HelloService_SayHello_Handler.

在该桩方法内部中,包含的执行步骤如下:

  • 调用闭包函数 dec,将请求内容反序列化到请求入参 in 当中
  • 将业务处理方法 HelloServiceServer.SayHello 闭包封装到一个 UnaryHandler 当中
  • 调用 intercetor 方法,分别执行拦截器和 handler 的处理逻辑

4 拦截器

有关 grpc 中拦截器 interceptor 部分的内容理解起来比较费脑,我们单开一章来展开聊聊.

4.1 原理介绍

拦截器的作用,是在执行核心业务方法的前后,创造出一个统一的切片,来执行所有业务方法锁共有的通用逻辑.
此外,我们还能够通过这部分通用逻辑的执行结果,来判断是否需要熔断当前的执行链路,以起到所谓的”拦截“效果.

有关 grpc 拦截器的内容,其实和 gin 框架中的 handlersChain 是异曲同工的. 在我之前分享的文章 ”解析 Gin 框架底层原理“ 的第
5 章内容中有作详细介绍,大家不妨引用对比,以此来触类旁通,加深理解.

下面我们看看 grpc 中对于一个拦截器函数的具体定义:

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

其中几个入参的含义分别为:

  • req:业务处理方法的请求参数
  • info:当前所属的业务服务 service
  • handler:真正的业务处理方法

因此一个拦截器函数的使用模式应该是:

var myInterceptor1 = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    // 前处理校验
    if err := preLogicCheck();err != nil{
       // 前处理校验不通过,则拦截,不调用业务方法直接返回
       return nil,err 
    }
    
     // 前处理校验通过,正常调用业务方法
     resp, err = handle(ctx,req)
     if err != nil{
         return nil,err 
     }
     
      // 后置处理校验
      if err := postLogicCheck();err != nil{
         // 后置处理校验不通过,则拦截结果,包装错误返回
         return nil,err 
      }
      
      // 正常返回结果
      return resp,nil 
}

4.2 拦截器链

拦截器链

func chainUnaryInterceptors(interceptors []UnaryServerInterceptor) UnaryServerInterceptor {
    return func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (interface{}, error) {
        return interceptors[0](ctx, req, info, getChainUnaryHandler(interceptors, 0, info, handler))
    }
}


func getChainUnaryHandler(interceptors []UnaryServerInterceptor, curr int, info *UnaryServerInfo, finalHandler UnaryHandler) UnaryHandler {
    if curr == len(interceptors)-1 {
        return finalHandler
    }
    return func(ctx context.Context, req interface{}) (interface{}, error) {
        return interceptors[curr+1](ctx, req, info, getChainUnaryHandler(interceptors, curr+1, info, finalHandler))
    }
}

首先,chainUnaryInterceptors 方法会将一系列拦截器 interceptor 成链,并返回首枚interceptor 供
ServerOption 接收设置.

其中,拦截器成链的关键在于 getChainUnaryHandler 方法中,其中会闭包调用拦截器数组的首枚拦截器函数,接下来依次用下一枚拦截器对业务方法
handler 进行包裹,封装成一个新的 ”handler“ 供当前拦截器使用.

拦截器链关系

4.3 操作实践

下面展示一下 grpc 拦截器链的实操例子.

  • 依次声明拦截器1 myInterceptor1 和 拦截器2 myInterceptor2,会在调用业务方法 handler 前后分别打印一行内容

  • 在创建 grpc server 时,将两个拦截器基于 option 注入

  • 通过客户端请求服务端,通过输出日志观察拦截器运行效果

    var myInterceptor1 = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    fmt.Printf(“interceptor1 preprocess, req: %+v\n”, req)
    resp, err = handler(ctx, req)
    fmt.Printf(“interceptor1 postprocess, req: %+v\n”, resp)
    return
    }

    var myInterceptor2 = func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
    fmt.Printf(“interceptor2 preprocess, req: %+v\n”, req)
    resp, err = handler(ctx, req)
    fmt.Printf(“interceptor2 postprocess, resp: %+v\n”, resp)
    return
    }

    func (s *Server) SayHello(ctx context.Context, req *proto.HelloReq) (*proto.HelloResp, error) {
    fmt.Println(“core handle logic……”)
    return &proto.HelloResp{
    Reply: fmt.Sprintf(“hello name: %s”, req.Name),
    }, nil
    }

    func main() {
    listener, err := net.Listen(“tcp”, “:8093”)
    if err != nil {
    panic(err)
    }


    server := grpc.NewServer(grpc.ChainUnaryInterceptor(myInterceptor1, myInterceptor2))
    proto.RegisterHelloServiceServer(server, &Server{})


    if err := server.Serve(listener); err != nil {
    panic(err)
    }
    }

拦截器输出


grpc.md
https://abrance.github.io/2024/04/30/domain/network/协议/grpc/
Author
xiaoy
Posted on
April 30, 2024
Licensed under