grpc-go客户端.md

1 客户端代码示例

1.1 main 函数

首先给出 grpc-go 启动客户端的代码示例,核心内容分三块:

  • 调用 grpc.Dial 方法,指定服务端 target,创建 grpc 连接代理对象 ClientConn

  • 调用 proto.NewHelloServiceClient 方法,基于 pb 桩代码构造客户端实例

  • 调用 client.SayHello 方法,真正发起 grpc 请求

    package main

    import (
    “context”
    “fmt”

    “github.com/grpc_demo/proto”

    “google.golang.org/grpc”
    “google.golang.org/grpc/credentials/insecure”
    )
    func main() {
    conn, err := grpc.Dial(“localhost:8093”, grpc.WithTransportCredentials(insecure.NewCredentials()))
    // …
    defer conn.Close()

    client := proto.NewHelloServiceClient(conn)


    resp, err := client.SayHello(context.Background(), &proto.HelloReq{
    Name: “xiaoxuxiansheng”,
    })
    // …


    fmt.Printf(“resp: %+v”, resp)
    }

1.2 proto 文件

对应的 proto 文件定义:

syntax = "proto3"; // 固定语法前缀
option go_package = ".";  // 指定生成的Go代码在你项目中的导入路径
package pb; // 包名


// 定义服务
service HelloService {
    // SayHello 方法
    rpc SayHello (HelloReq) returns (HelloResp) {}
}


// 请求消息
message HelloReq {
    string name = 1;
}


// 响应消息
message HelloResp {
    string reply = 1;
}

2 核心数据结构

下面对 grpc-go 客户端涉及到的几个核心数据结构以及其间的关联做个介绍.

2.1 ClientConn

ClientConn 是广义上的 grpc 连接代理对象,和 grpc 客户端是一对一的关系,内部包含了一个连接池,根据配置可能同时管理多笔连接.
其中几个核心字段包括:

  • target/parsedTarget:对服务端地址信息的封装

  • balancerWrapper:负载均衡器. 初始化时会启动一个守护协程,动态地对 ClientConn 及 Subconn 的状态进行刷新

  • blockingpicker:连接选择器. 在发送请求时,由其最终挑选出使用的 Subconn

  • resolverWrapper:解析器. 负责根据不同的 schema,通过 target 解析出服务端的实际地址

    type ClientConn struct {
    // 连接上下文
    ctx context.Context
    // 上下文终止控制器
    cancel context.CancelFunc


    // 连接的目标地址
    target string
    // 对连接目标地址的封装
    parsedTarget resolver.Target
    // …
    // 连接配置项
    dopts dialOptions
    // 负载均衡器,底层基于 gracefulswitch.balancer
    balancerWrapper *ccBalancerWrapper


    // 连接状态管理器
    csMgr *connectivityStateManager
    // 连接选择器
    blockingpicker *pickerWrapper
    // …
    // 读写互斥锁
    mu sync.RWMutex
    // 解析器
    resolverWrapper *ccResolverWrapper
    // 连接池
    conns map[*addrConn]struct{} // Set to nil on close.
    // …
    }

2.2 ccBalancerWrapper

ccBalancerWrapper是在负载均衡器Balancer的基础上做的封装.
在ccBalancerWrapper被初始化时,会开启一个守护协程,通过监听 updateCh 中到达的事件,对 ClientConn 和 Subconn
的状态进行刷新.

type ccBalancerWrapper struct {
    // 指向所属的 clientConn
    cc *ClientConn


    // 负载均衡器
    balancer        *gracefulswitch.Balancer
    curBalancerName string
  
    // 接收更新事件的 chan
    updateCh *buffer.Unbounded 
    // 接收处理结果的 chan
    resultCh *buffer.Unbounded 
    // ...
}

ccBalancerWrapper 的核心是一个负载均衡器 Balancer 接口,其中包含了几个核心方法:

  • UpdateClientConnState:更新 ClientConn 的连接状态

  • ResolverError:错误后处理

  • UpdateSubConnState:更新子连接 Subconn 状态

  • Close:关闭负载均衡器

    type Balancer interface {
    UpdateClientConnState(ClientConnState) error
    ResolverError(error)
    UpdateSubConnState(SubConn, SubConnState)
    Close()
    }

在默认情况下,grpc客户端框架会为我们提供一个默认的负载均衡器 pickfirstBalancer:

type pickfirstBalancer struct {
    state   connectivity.State
    cc      balancer.ClientConn
    subConn balancer.SubConn
}

与pickfirstBalancer具体的交互流程我们在第3章再作展开.

2.3 ccResolverWrapper

ccResolverWrapper的核心是内置的解析器 resolver.

type ccResolverWrapper struct {
    // 指向所属的 clientConn
    cc         *ClientConn
    resolverMu sync.Mutex
    // 核心成员:内置的解析器
    resolver   resolver.Resolver
    // ...
}

resolver通过Builder构造,对应的Buidler是一个interface,用户也可以提供自己的实现版本:

type Builder interface {
    // 构造解析器 resolver
    Build(target Target, cc ClientConn, opts BuildOptions) (Resolver, error)
    // Scheme returns the scheme supported by this resolver.
    // Scheme is defined at https://github.com/grpc/grpc/blob/master/doc/naming.md.
    Scheme() string
}

resolver本身是一个接口,核心的方法是 ResolveNow:通过 target 解析出实际的客户端地址.

type Resolver interface {
    // 解析 target
    ResolveNow(ResolveNowOptions)
    // 关闭 resolver
    Close()
}

grpc客户端为我们提供了一个默认的resolver:passthroughResolver:

type passthroughResolver struct {
    target resolver.Target
    cc     resolver.ClientConn
}

passthroughResolver在解析 target 时的策略是直接透传不作处理. 具体交互流程见第3章.

2.4 ClientStream

在grpc客户端客户端发起请求时,会首先创建出一个 ClientStream,并依赖其核心方法 SendMsg 和 RecvMsg
进行请求的发送和响应的接受.

type ClientStream interface {
    // 获取元数据
    Header() (metadata.MD, error)
    // 获取上下文
    Context() context.Context
    // 发送消息
    SendMsg(m interface{}) error
    // 接收消息
    RecvMsg(m interface{}) error
}

ClientStream是一个interface,其默认的实现是 clientStream:

type clientStream struct {
    // ...
    cc       *ClientConn
    desc     *StreamDesc


    // 编码模块
    codec baseCodec
    // 压缩模块
    cp    Compressor
    comp  encoding.Compressor
    // ...
    // 上下文
    ctx context.Context 
    // 请求尝试
    attempt *csAttempt
    // ...
}

2.5 csAttempt

csAttempt 代表了一次 grpc 请求尝试,本身是具有生命周期的.

type csAttempt struct {
    ctx        context.Context
    cs         *clientStream
    t          transport.ClientTransport
    s          *transport.Stream
    p          *parser
    pickResult balancer.PickResult


    // 解压模块
    dc        Decompressor
    decomp    encoding.Compressor
    decompSet bool


    // ...
}

3 grpc.Dial

3.1 grpc.Dial

func Dial(target string, opts ...DialOption) (*ClientConn, error) {
    return DialContext(context.Background(), target, opts...)
}
func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *ClientConn, err error) {
    cc := &ClientConn{
        target:            target,
        csMgr:             &connectivityStateManager{},
        conns:             make(map[*addrConn]struct{}),
        dopts:             defaultDialOptions(),
        blockingpicker:    newPickerWrapper(),
        czData:            new(channelzData),
        firstResolveEvent: grpcsync.NewEvent(),
    }
    // ...
    // Determine the resolver to use.
    resolverBuilder, err := cc.parseTargetAndFindResolver()
    // ...
    cc.balancerWrapper = newCCBalancerWrapper(cc, balancer.BuildOptions{
        DialCreds:        credsClone,
        CredsBundle:      cc.dopts.copts.CredsBundle,
        Dialer:           cc.dopts.copts.Dialer,
        Authority:        cc.authority,
        CustomUserAgent:  cc.dopts.copts.UserAgent,
        ChannelzParentID: cc.channelzID,
        Target:           cc.parsedTarget,
    })
    // ...
    rWrapper, err := newCCResolverWrapper(cc, resolverBuilder)
    // ...
    cc.mu.Lock()
    cc.resolverWrapper = rWrapper
    cc.mu.Unlock()
    // ...
    return cc, nil
}

在通过 DialContext 创建 grpc 连接代理 ClientConn 时,核心步骤包括:

  • 创建 ClientConn 实例
  • 调用 ClientConn.parseTargetAndFindResolver 方法,通过 target 中的 schema 获取到对应的解析器构造器 resolverBuilder
  • 调用 newCCBalancerWrapper 方法构造出负载均衡器封装对象 ccBalancerWrapper,在内部会开启守护协程感知和处理 ClientConn 和 Subconn 状态变更的事件
  • 调用 newCCResolverWrapper 方法,内部会调用 resolverBuilder 构造并启动 resolver 实例,同时会通过 ccBalancerWrapper 方法对 ClientConn 的状态进行更新

3.2 ClientConn.parseTargetAndFindResolver

func (cc *ClientConn) parseTargetAndFindResolver() (resolver.Builder, error) {
    // ...
    var rb resolver.Builder
    parsedTarget, err := parseTarget(cc.target)
    
    rb = cc.getResolver(parsedTarget.URL.Scheme)
    if rb != nil {
        cc.parsedTarget = parsedTarget
        return rb, nil
    }
    // ...
}

ClientConn.parseTargetAndFindResolver 方法通过 target 中的 schema,会获取到对应的
resolverBuilder,后续用于构建出能解析出服务端地址的 resolver.

在 grpc-go 的 resolver 包下,会通过一个全局 map 实现 schema 到 resolverBuilder 的映射,同时会暴露出注册方法
Register,供用户自定义实现特定 schema 下的 resolverBuilder 和 resolver 并注入到 map 中.

var (
    m = make(map[string]Builder)
    defaultScheme = "passthrough"
)


func Register(b Builder) {
    m[b.Scheme()] = b
}


func Get(scheme string) Builder {
    if b, ok := m[scheme]; ok {
        return b
    }
    return nil
}

grpc-go 中,默认的 resolverBuilder 和对应的 resolver 是 passthrough 类型,这类 resolver
的解析策略是对 target 直接透传,不作解析处理.

其中,passthroughBuilder.Build 方法中,会执行 passthroughResolver.start
方法一键启动解析器,这部分逻辑我们放到 3.4 小节中,在 passthroughBuilder.Build 方法真正被调用时再作展开.

const scheme = "passthrough"


type passthroughBuilder struct{}


func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    // ...
    r := &passthroughResolver{
        target: target,
        cc:     cc,
    }
    r.start()
    return r, nil
}


type passthroughResolver struct {
    target resolver.Target
    cc     resolver.ClientConn
}


func (r *passthroughResolver) start() {
    r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}})
}


func (*passthroughResolver) ResolveNow(o resolver.ResolveNowOptions){}

3.3 newCCBalancerWrapper

func newCCBalancerWrapper(cc *ClientConn, bopts balancer.BuildOptions) *ccBalancerWrapper {
    ccb := &ccBalancerWrapper{
        cc:       cc,
        updateCh: buffer.NewUnbounded(),
        resultCh: buffer.NewUnbounded(),
        closed:   grpcsync.NewEvent(),
        done:     grpcsync.NewEvent(),
    }
    go ccb.watcher()
    ccb.balancer = gracefulswitch.NewBalancer(ccb, bopts)
    return ccb
}

func (ccb *ccBalancerWrapper) watcher() {
    for {
        select {
        case u := <-ccb.updateCh.Get():
            ccb.updateCh.Load()
            if ccb.closed.HasFired() {
                break
            }
            switch update := u.(type) {
            case *ccStateUpdate:
                ccb.handleClientConnStateChange(update.ccs)
            case *scStateUpdate:
                ccb.handleSubConnStateChange(update)
            case *exitIdleUpdate:
                ccb.handleExitIdle()
            case *resolverErrorUpdate:
                ccb.handleResolverError(update.err)
            case *switchToUpdate:
                ccb.handleSwitchTo(update.name)
            case *subConnUpdate:
                ccb.handleRemoveSubConn(update.acbw)
            default:
                logger.Errorf("ccBalancerWrapper.watcher: unknown update %+v, type %T", update, update)
            }
        case <-ccb.closed.Done():
        }


        // ...
    }
}

newCCBalancerWrapper 方法构造了 ccBalancer 实例,然后调用 ccBalancerWrapper.watcher
方法开启守护协程,分别监听 ClientConn 状态变更(ccStateUpdate)、Subconn
状态变更(scStateUpdate)、设定负载均衡器
balancer(switchToUpdate)、连接移除(subConnUpdate)等事件,并分别进行处理.

在 newCCBalancerWrapper 方法中,还调用了 gracefulswitch.NewBalancer
构造了内置负载均衡器的外壳,但真正的负载均衡器 Balancer 此时还未注入,注入实际会在 3.4 小节,resovler 启动的链路当中.

3.4 newCCResolverWrapper

func newCCResolverWrapper(cc *ClientConn, rb resolver.Builder) (*ccResolverWrapper, error) {
    ccr := &ccResolverWrapper{
        cc:   cc,
        done: grpcsync.NewEvent(),
    }


    // ...


    var err error
    ccr.resolverMu.Lock()
    defer ccr.resolverMu.Unlock()
    ccr.resolver, err = rb.Build(cc.parsedTarget, ccr, rbo)
    // ...
    return ccr, nil
}

newCCResolverWrapper 方法构造了 ccResolverWrapper 实例,但真正的核心逻辑是根据传入的 resolverBuilder
构造器出对应的 resolver 然后注入到 ccResolverWrapper 当中.

grpc-go 客户端默认的 resovlerBuilder 和 resolver 是 passthrough 类型,下面我们再一次展开

passthroughBuilder.Build 方法来看:

func (*passthroughBuilder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
    // ...
    r := &passthroughResolver{
        target: target,
        cc:     cc,
    }
    r.start()
    return r, nil
}

在方法中,构造了一个 passthroughResolver 实例,并在返回前调用了 passthroughResolver.start 方法启动了解析器.

func (r *passthroughResolver) start() {
    r.cc.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: r.target.Endpoint()}}})
}

此处直接透传了用户传入的服务端地址 target.Endpoint,经历了 ccResolverWrapper.UpdateState ->
ClientConn.updateResolverState 的调用链路,然后分别通过 ccBalancerWrapper.switchTo 和
ccBalancerWrapper.updateClientConnState 方法,通过 updateCh 向 ccBalancerWrapper
传递了设定 balancer 和更新 ClientConn 的事件.

func (ccr *ccResolverWrapper) UpdateState(s resolver.State) error {
    // ...
    ccr.curState = s
    if err := ccr.cc.updateResolverState(ccr.curState, nil); err == balancer.ErrBadResolverState {
        return balancer.ErrBadResolverState
    }
    return nil
}
func (cc *ClientConn) updateResolverState(s resolver.State, err error) error {
    // ...
    cc.maybeApplyDefaultServiceConfig(s.Addresses)
    // ...    
    uccsErr := bw.updateClientConnState(&balancer.ClientConnState{ResolverState: s, BalancerConfig: balCfg})
    // ...
}

3.4.1 ccBalancerWrapper.switchTo

func (cc *ClientConn) maybeApplyDefaultServiceConfig(addrs []resolver.Address) {
    // ...
    cc.applyServiceConfigAndBalancer(emptyServiceConfig, &defaultConfigSelector{emptyServiceConfig}, addrs)
}
const PickFirstBalancerName = "pick_first"


func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector, addrs []resolver.Address) {
    // 倘若未通过 config 设定,则默认的 balancer 是 pickFirst
    newBalancerName = PickFirstBalancerName
    // ...
    cc.balancerWrapper.switchTo(newBalancerName)
}
func (ccb *ccBalancerWrapper) switchTo(name string) {
    ccb.updateCh.Put(&switchToUpdate{name: name})
}

接下来在 ccBalancerWrapper 的守护协程中,一旦接收到 swtichToUpdate 类型的消息后,会执行
ccBalancerWrapper.handleSwitchTo 执行负载均衡器 Balancer 的设定.

func (ccb *ccBalancerWrapper) watcher() {
    for {
        select {
        case u := <-ccb.updateCh.Get():
            ccb.updateCh.Load()
            // ...
            switch update := u.(type) {
            // ...
            case *switchToUpdate:
                ccb.handleSwitchTo(update.name)
            // ...
            }
        // ...
        }
        // ...
    }
}
func (ccb *ccBalancerWrapper) handleSwitchTo(name string) {
    // ...
    builder := balancer.Get(name)
    if builder == nil {
        // ...
        builder = newPickfirstBuilder()
    } 
    if err := ccb.balancer.SwitchTo(builder); err != nil {
        // ...
    }
    ccb.curBalancerName = builder.Name()
}

在 ccBalancerWrapper.handleSwitchTo 方法中,由于传入的 name 为 pick_first,因此会构造出对应的
pickerFirst balancer 并注入到 ccBalancerWrapper 中.

func (gsb *Balancer) SwitchTo(builder balancer.Builder) error {
    // ...
    bw := &balancerWrapper{
        gsb: gsb,
        lastState: balancer.State{
            ConnectivityState: connectivity.Connecting,
            Picker:            base.NewErrPicker(balancer.ErrNoSubConnAvailable),
        },
        subconns: make(map[balancer.SubConn]bool),
    }
    // ...
    if gsb.balancerCurrent == nil {
        gsb.balancerCurrent = bw
    } else {
        gsb.balancerPending = bw
    }
    // ...
    newBalancer := builder.Build(bw, gsb.bOpts)
    // ...
    // ...
    bw.Balancer = newBalancer
    return nil
}

其中 pickerFirst balancer 定义如下:

const PickFirstBalancerName = "pick_first"


type pickfirstBuilder struct{}


func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
    return &pickfirstBalancer{cc: cc}
}


func (*pickfirstBuilder) Name() string {
    return PickFirstBalancerName
}


type pickfirstBalancer struct {
    state   connectivity.State
    cc      balancer.ClientConn
    subConn balancer.SubConn
}

3.4.2 ccBalancerWrapper.updateClientConnState

重新回到 ClientConn.updateResolverState 方法中,在处理完 ccBalancerWrapper.switchTo
分支完成负载均衡器 Balancer 的创建和设置后,会调用ccBalancerWrapper.updateClientConnState 方法对
ClientConn 的状态进行更新.

func (ccb *ccBalancerWrapper) updateClientConnState(ccs *balancer.ClientConnState) error {
    ccb.updateCh.Put(&ccStateUpdate{ccs: ccs})
    // ...
}

ccBalancerWrapper 的守护协程接收到 ccStateUpdate 消息后,会调用
ccBalancerWrapper.handleClientConnStateChange 方法对 ClientConn 的状态进行更新.

func (ccb *ccBalancerWrapper) watcher() {
    for {
        select {
        case u := <-ccb.updateCh.Get():
            ccb.updateCh.Load()
            // ...
            switch update := u.(type) {
            case *ccStateUpdate:
                ccb.handleClientConnStateChange(update.ccs)
            // ...
            }
        // ...
        }
        // ...
    }
}
func (ccb *ccBalancerWrapper) handleClientConnStateChange(ccs *balancer.ClientConnState) {
    // ...
    ccb.resultCh.Put(ccb.balancer.UpdateClientConnState(*ccs))
}
func (gsb *Balancer) UpdateClientConnState(state balancer.ClientConnState) error {
    // ...
    return balToUpdate.UpdateClientConnState(state)
}

此处会一路走到负载均衡器 Balancer 的 UpdateClientConnState 方法中. 对应的 Balancer 为 pickFirst
类型,我们展开对应的方法源码:

func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
    // ...
    subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
    // ...
    b.subConn = subConn
    b.state = connectivity.Idle
    b.cc.UpdateState(balancer.State{
        ConnectivityState: connectivity.Connecting,
        Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
    })
    b.subConn.Connect()
    return nil
}

pickfirstBalancer.UpdateClientConnState 方法的核心步骤包括:

  • 调用 ccBalancerWrapper.NewSubConn 方法创建了子连接 subconn
  • 调用了 ccBalancerWrapper.UpdateState 方法,
  • 调用了 subconn.Connect 方法,建立子连接

(1)ccBalancerWrapper.NewSubConn

func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (balancer.SubConn, error) {
    // ...
    ac, err := ccb.cc.newAddrConn(addrs, opts)
    // ...
    acbw := &acBalancerWrapper{ac: ac, producers: make(map[balancer.ProducerBuilder]*refCountedProducer)}
    acbw.ac.mu.Lock()
    ac.acbw = acbw
    acbw.ac.mu.Unlock()
    return acbw, nil
}

在 ccBalancerWrapper.NewSubConn 方法中,基于 target 的 address 构造了连接 addrConn,并将其封装到
balancer.SubConn 具体实现类 acBalancerWrapper 中进行返回.

type acBalancerWrapper struct {
    mu        sync.Mutex
    ac        *addrConn
    // ...
}
type addrConn struct {
    ctx    context.Context
    cancel context.CancelFunc


    cc     *ClientConn
    dopts  dialOptions
    acbw   balancer.SubConn
    // ...
    transport transport.ClientTransport // The current transport.


    mu      sync.Mutex
    curAddr resolver.Address  
    addrs   []resolver.Address 
    state connectivity.State


    // ...
}
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
    ac := &addrConn{
        state:        connectivity.Idle,
        cc:           cc,
        addrs:        addrs,
        scopts:       opts,
        dopts:        cc.dopts,
        czData:       new(channelzData),
        resetBackoff: make(chan struct{}),
    }
    // ...
    cc.conns[ac] = struct{}{}
    return ac, nil
}

在构造 addrConn 实例的过程中会将其添加到 ClientConn 的连接池集合 ClientConn.conns 当中.

(2)ccBalancerWrapper.UpdateState

在 ccBalancerWrapper.UpdateState 方法中,会对连接选择器 picker 进行设定,然后调用
connectivityStateManager.updateState 方法将连接状态设定为 connecting 连接中.

func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
    // ...
    ccb.cc.blockingpicker.updatePicker(s.Picker)
    ccb.cc.csMgr.updateState(s.ConnectivityState)
}
func (pw *pickerWrapper) updatePicker(p balancer.Picker) {
    pw.mu.Lock()
    if pw.done {
        pw.mu.Unlock()
        return
    }
    pw.picker = p
    // pw.blockingCh should never be nil.
    close(pw.blockingCh)
    pw.blockingCh = make(chan struct{})
    pw.mu.Unlock()
}

此处设定的连接选择器 picker 的类型默认为 pickFirst picker. 其中内置了唯一一个 pickResult,在 pick
时会固定返回这个结果.

type picker struct {
    result balancer.PickResult
    err    error
}


func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
    return p.result, p.err
}

(3)pickfirstBalancer.UpdateClientConnState

最后,pickfirstBalancer.UpdateClientConnState 方法还会调用 acBalancerWrapper.Connect
方法启动子连接. 其中会异步调用 addrConn.connect 方法

func (acbw *acBalancerWrapper) Connect() {
    acbw.mu.Lock()
    defer acbw.mu.Unlock()
    go acbw.ac.connect()
}
func (ac *addrConn) connect() error {
    ac.mu.Lock()
    // ...
    ac.updateConnectivityState(connectivity.Connecting, nil)
    // ...
    ac.resetTransport()
    return nil
}

其中会首先通过 addrConn.updateConnectivityState 暂时先将子连接状态设置为 connecting ,后续
addrConn.resetTransport 方法中,经由 addrConn.tryAllAddrs ->
addrConn.createTransport -> addrConn.startHealthCheck ->
ClientConn.handleSubConnStateChange -> ccBalancerWrapper.updateSubConnState
的方法链路,会向 ccBalancerWrapper.updateCh 中发送一条 scStateUpdate 类型的消息,将子连接 subconn
状态更新为 ready 添加到 picker 的 pickerResult 当中.

func (ac *addrConn) resetTransport() {
    // ...
    addrs := ac.addrs
    // ...
    if err := ac.tryAllAddrs(addrs, connectDeadline); err != nil {
        //  ...
    }
    // ...
}
func (ac *addrConn) tryAllAddrs(addrs []resolver.Address, connectDeadline time.Time) error {
    var firstConnErr error
    for _, addr := range addrs {
        // ...
        err := ac.createTransport(addr, copts, connectDeadline)
        if err == nil {
            return nil
        }
        // ...
    }
    // ...
}
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
    addr.ServerName = ac.cc.getServerName(addr)
    hctx, hcancel := context.WithCancel(ac.ctx)
    // ...


    newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, addr, copts, onClose)
    // ...
    ac.curAddr = addr
    ac.transport = newTr
    ac.startHealthCheck(hctx) 
    return nil
}
func (ac *addrConn) startHealthCheck(ctx context.Context) {
    var healthcheckManagingState bool
    defer func() {
        if !healthcheckManagingState {
            ac.updateConnectivityState(connectivity.Ready, nil)
        }
    }()
    // ...
    healthCheckFunc := ac.cc.dopts.healthCheckFunc
    if healthCheckFunc == nil {
        // ...
        return
    }


    // ...
}
func (ac *addrConn) updateConnectivityState(s connectivity.State, lastErr error) {
    // ...
    ac.state = s
    // ...
    ac.cc.handleSubConnStateChange(ac.acbw, s, lastErr)
}
func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivity.State, err error) {
    cc.balancerWrapper.updateSubConnState(sc, s, err)
}
func (ccb *ccBalancerWrapper) updateSubConnState(sc balancer.SubConn, s connectivity.State, err error) {
    // ...
    ccb.updateCh.Put(&scStateUpdate{
        sc:    sc,
        state: s,
        err:   err,
    })
}

ccBalancerWrapper 的守护协程中,监听到 scStateUpdate 事件到达后,会经历
ccBalancerWrapper.handleSubConnStateChange ->
gracefulswitch.Balancer.UpdateSubConnState ->
pickfirstBalancer.UpdateSubConnState -> ccBalancerWrapper.UpdateState
的方法链路,完成对 pickfirstPicker 中 pickerResult 的更新.

func (ccb *ccBalancerWrapper) watcher() {
    for {
        select {
        case u := <-ccb.updateCh.Get():
            ccb.updateCh.Load()
            // ...
            switch update := u.(type) {
            // ...
            case *scStateUpdate:
                ccb.handleSubConnStateChange(update)
            // ...
            }
        case <-ccb.closed.Done():
        }
        // ...
    }
}
func (ccb *ccBalancerWrapper) handleSubConnStateChange(update *scStateUpdate) {
    ccb.balancer.UpdateSubConnState(update.sc, balancer.SubConnState{ConnectivityState: update.state, ConnectionError: update.err})
}
func (gsb *Balancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
    // ...
    balToUpdate.UpdateSubConnState(sc, state)
}
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
    // ...


    switch state.ConnectivityState {
    case connectivity.Ready:
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{result: balancer.PickResult{SubConn: subConn}},
        })
    case connectivity.Connecting:
        b.cc.UpdateState(balancer.State{
            ConnectivityState: state.ConnectivityState,
            Picker:            &picker{err: balancer.ErrNoSubConnAvailable},
        })
    // ...
    }
}
func (ccb *ccBalancerWrapper) UpdateState(s balancer.State) {
    // ...
    ccb.cc.blockingpicker.updatePicker(s.Picker)
    ccb.cc.csMgr.updateState(s.ConnectivityState)
}

到这里为止,通过 grpc.Dial 方法构造 ClientConn 对象,为客户端请求进行前置准备的主线链路就已经梳理完毕.

4 grpc.Invoke

4.1 ClientConn.Invoke

在grpc-go客户端通过pb桩代码发起请求时,内部会调用 ClientConn.Invoke 方法,核心步骤分为三部分:

  • 调用 newClientStream 方法构造一个 clientStream 用于与服务端的通信交互

  • 调用 clientStream.SendMsg 方法发送请求

  • 调用 clientStream.RecvMsg 方法接收响应

    func (c *helloServiceClient) SayHello(ctx context.Context, in *HelloReq, opts …grpc.CallOption) (*HelloResp, error) {
    out := new(HelloResp)
    err := c.cc.Invoke(ctx, “/pb.HelloService/SayHello”, in, out, opts…)
    // …
    return out, nil
    }
    func (cc *ClientConn) Invoke(ctx context.Context, method string, args, reply interface{}, opts …CallOption) error {
    // …
    return invoke(ctx, method, args, reply, cc, opts…)
    }
    func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts …CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts…)
    // …
    if err := cs.SendMsg(req); err != nil {
    return err
    }
    return cs.RecvMsg(reply)
    }

4.2 newClientStream

4.2.1 newClientStreamWithParams

func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
    // ...
    var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {
        return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)
    }
    // ...
    return newStream(ctx, func() {})
}
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {
    // ...
    // 初始化压缩模块
    var cp Compressor
    // ...
    
    cs := &clientStream{
        callHdr:      callHdr,
        ctx:          ctx,
        methodConfig: &mc,
        opts:         opts,
        callInfo:     c,
        cc:           cc,
        desc:         desc,
        codec:        c.codec,
        cp:           cp,
        comp:         comp,
        cancel:       cancel,
        firstAttempt: true,
        onCommit:     onCommit,
    }
      
    // ...
    // 获取 csAttempt 的闭包函数
    op := func(a *csAttempt) error {
        if err := a.getTransport(); err != nil {
            return err
        }
        if err := a.newStream(); err != nil {
            return err
        }
        // ...
        cs.attempt = a
        return nil
    }
    if err := cs.withRetry(op, func() { cs.bufferForRetryLocked(0, op) }); err != nil {
        return nil, err
    }


    // ...
    return cs, nil
}

4.2.2 clientStream.withRetry

在 clientStream.withRetry 方法中,会针对 op 方法进行重试,直到处理成功或者返回的错误类型为 io.EOF.

func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) error {
    // ...
    for {
        // ...
        err := op(a)
        // ...
        if err == io.EOF {
            <-a.s.Done()
        }
        // 处理完成
        if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
            onSuccess()
            cs.mu.Unlock()
            return err
        }
        // ...
    }
}

4.2.3 csAttempt.getTransport

在 csAttempt.getTransport 方法链路中,最终会通过 pickFirstPicker.pick 方法获取到对应的
pickResult,拿到用于请求的的子连接 subconn.

此处呼应了本文 3.4.2 小节第(3)部分,在构建 resolver 过程中,就提前准备并添加到 pickFirstPicker 中的
pickResult.

在获取到子连接 subconn 后,会调用其中的 addrConn.getReadyTranport 获取到通信器 transport.
这部分在后续单开的通信篇再作展开.

func (a *csAttempt) getTransport() error {
    cs := a.cs


    var err error
    a.t, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
    // ...
    return nil
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
    return cc.blockingpicker.pick(ctx, failfast, balancer.PickInfo{
        Ctx:            ctx,
        FullMethodName: method,
    })
}
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
    for {
        // ...
        ch = pw.blockingCh
        p := pw.picker
        pw.mu.Unlock()


        pickResult, err := p.Pick(info)
        // ...


        acw, ok := pickResult.SubConn.(*acBalancerWrapper)
        // ...
        if t := acw.getAddrConn().getReadyTransport(); t != nil {
            // ...
            return t, pickResult, nil
        }
        // ...
        
    }
}
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
    return p.result, p.err
}

4.2.4 csAttempt.newStream

在 csAttempt.newStream 方法中,会通过 ClientTransport.NewStream 方法创建一个 rpc stream
用于后续的请求通信. 这部分放在后续单开的通信篇中再作展开.

func (a *csAttempt) newStream() error {
    cs := a.cs
    // ...
    // 构造一个 grpc stream
    s, err := a.t.NewStream(a.ctx, cs.callHdr)
    // ...
    a.s = s
    a.p = &parser{r: s}
    return nil
}

4.3 clientStream.SendMsg

发送请求是基于 clientStream.SendMsg -> csAttempt.ClientTransport.sendMsg
来执行的,有关通信模块的细节,我们后续单开一个通信篇再作展开.

func (cs *clientStream) SendMsg(m interface{}) (err error) {
    // ...
    // 消息前处理,包括编码、压缩等细节
    hdr, payload, data, err := prepareMsg(m, cs.codec, cs.cp, cs.comp)
    // ...
    // 通过 csAttempt 发送请求
    op := func(a *csAttempt) error {
        return a.sendMsg(m, hdr, payload, data)
    }
    err = cs.withRetry(op, func() { cs.bufferForRetryLocked(len(hdr)+len(payload), op) })
    // ...
    return err
}
func (a *csAttempt) sendMsg(m interface{}, hdr, payld, data []byte) error {
    cs := a.cs
    // ...
    if err := a.t.Write(a.s, hdr, payld, &transport.Options{Last: !cs.desc.ClientStreams}); err != nil {
        // ...
    }
    // ...
    return nil
}

4.4 clientStream.RecvMsg

grpc-go客户端接收来自服务端的响应参数时,基于 clientStream.RecvMsg -> csAttempt.recvMsg -> recv
的方法链路完成,recv 方法内部会先通过 recvAndDecompress 方法接收响应并进行解压,然后调用 baseCodec.Unmarshal
方法,遵循特定的协议对响应进行反序列化.

这部分通信和编码相关的细节,我们后续单开通信篇章再作展开.

func (cs *clientStream) RecvMsg(m interface{}) error {
    // 通过 csAttempt 接收响应
    err := cs.withRetry(func(a *csAttempt) error {
        return a.recvMsg(m, recvInfo)
    }, cs.commitAttemptLocked)
    // ...
    return err
}
func (a *csAttempt) recvMsg(m interface{}, payInfo *payloadInfo) (err error) {
    cs := a.cs
    // ...
    err = recv(a.p, cs.codec, a.s, a.dc, m, *cs.callInfo.maxReceiveMessageSize, payInfo, a.decomp)
    // ...
}
func recv(p *parser, c baseCodec, s *transport.Stream, dc Decompressor, m interface{}, maxReceiveMessageSize int, payInfo *payloadInfo, compressor encoding.Compressor) error {
    d, err := recvAndDecompress(p, s, dc, maxReceiveMessageSize, payInfo, compressor)
    // ...
    if err := c.Unmarshal(d, m); err != nil {
        return status.Errorf(codes.Internal, "grpc: failed to unmarshal the received message: %v", err)
    }
    // ...
}

5 小结

本文从grpc-go客户端视角出发,沿着 grpc.Dial 和 grpc.Invoke
两条主线进行了源码走读,整体来说代码量偏大,分析的部分较少,更多是在梳理客户端的主流程框架,为后续grpc-go通信篇的展开打好基础.


grpc-go客户端.md
https://abrance.github.io/2024/05/11/mdstorage/domain/network/协议/grpc-go客户端/
Author
xiaoy
Posted on
May 11, 2024
Licensed under