Correct way to perform a reconnect with gRPC client

ghz ⋅ 1 year ago

Question

I have a Go gRPC client connected to a gRPC server running in a different pod in my k8s cluster.

It's working well, receiving and processing requests.

I am now wondering how best to implement resiliency in the event that the gRPC server pod gets recycled.

As far as I can tell, the clientconn.go code in grpc-go should handle the reconnection automatically, but I just cannot get it to work, and I fear my implementation is incorrect in the first place.

Calling code from main:

```go
go func() {
	if err := gRPCClient.ProcessRequests(); err != nil {
		log.WithError(err).Error("Error while processing requests")
		// do something here??
	}
}()
```

My code in the gRPCClient wrapper module:

```go
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	for {
		request, err := grpcclient.stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			// When the pod is recycled, this is what's hit, with err:
			// rpc error: code = Unavailable desc = transport is closing
			//
			// What is the correct pattern for recovery here, so that we can wait
			// for the connection and continue processing requests once more?
			// Should I return err here and somehow restart the ProcessRequests()
			// goroutine in the main function?
			break
		}

		// The happy path: process the request that was received.
		log.Info("Request received")
		_ = request
	}

	return nil
}

func (grpcclient *gRPCClient) Close() {
	// This is called soon after the connection drops.
	grpcclient.conn.Close()
}
```

EDIT: Emin Laletovic answered my question elegantly below and gets it most of the way there. I had to make a few changes to the waitUntilReady function:

```go
func (grpcclient *gRPCClient) waitUntilReady() bool {
	// How long to wait for the connection to be restored before giving up.
	timeoutDuration := 300 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeoutDuration)
	defer cancel()

	currentState := grpcclient.conn.GetState()
	stillConnecting := true

	for currentState != connectivity.Ready && stillConnecting {
		// Returns true when the state has changed from currentState, false on timeout.
		stillConnecting = grpcclient.conn.WaitForStateChange(ctx, currentState)
		currentState = grpcclient.conn.GetState()
		log.WithFields(log.Fields{"state": currentState, "timeout": timeoutDuration}).Info("Attempting reconnection. State has changed to:")
	}

	if !stillConnecting {
		log.Error("Connection attempt has timed out.")
		return false
	}

	return true
}
```

Answer

The underlying RPC connection is re-established automatically by clientconn.go, but that doesn't mean the stream is also handled automatically.

A stream, once broken, whether because the RPC connection went down or for some other reason, cannot reconnect on its own. You need to open a new stream (by calling the streaming RPC again) once the RPC connection is back up.
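A minimal, standard-library-only sketch of that rule follows. Stream, receiveAll, fakeStream and the open function are hypothetical stand-ins: Stream plays the role of a generated streaming client, and open plays the role of calling the streaming RPC on the existing *grpc.ClientConn (real code would also wait for the connection to be Ready before reopening). The loop never calls Recv again on a stream that has returned an error; it drops it and asks open for a fresh one, while io.EOF still ends processing normally.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Stream is a hypothetical stand-in for a generated gRPC streaming client.
type Stream interface {
	Recv() (string, error)
}

// errUnavailable mimics the error seen when the server pod is recycled.
var errUnavailable = errors.New("rpc error: code = Unavailable desc = transport is closing")

// receiveAll handles messages until the server ends the stream with io.EOF.
// Any other error means the stream is dead, so it is dropped and open is
// called again for a brand-new stream, at most maxOpens times in total.
func receiveAll(open func() (Stream, error), maxOpens int) ([]string, error) {
	var handled []string
	for opens := 0; opens < maxOpens; opens++ {
		stream, err := open() // real code: wait for the conn to be Ready first
		if err != nil {
			continue
		}
		for {
			msg, err := stream.Recv()
			if err == io.EOF {
				return handled, nil // clean end of stream
			}
			if err != nil {
				break // never call Recv on a broken stream again
			}
			handled = append(handled, msg)
		}
	}
	return handled, errors.New("gave up: too many broken streams")
}

// fakeStream replays scripted messages, then returns end.
type fakeStream struct {
	msgs []string
	end  error
}

func (f *fakeStream) Recv() (string, error) {
	if len(f.msgs) == 0 {
		return "", f.end
	}
	msg := f.msgs[0]
	f.msgs = f.msgs[1:]
	return msg, nil
}

// demo simulates the server pod being recycled in the middle of a stream.
func demo() ([]string, error) {
	streams := []*fakeStream{
		{msgs: []string{"a", "b"}, end: errUnavailable}, // first stream breaks
		{msgs: []string{"c"}, end: io.EOF},               // replacement ends cleanly
	}
	next := 0
	open := func() (Stream, error) {
		s := streams[next]
		next++
		return s, nil
	}
	return receiveAll(open, 3)
}

func main() {
	handled, err := demo()
	fmt.Println(handled, err) // [a b c] <nil>
}
```

The retry budget (maxOpens) is an illustrative choice; the answer's own code bounds retries with a timeout on the connection instead.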

Pseudo-code for waiting until the RPC connection is in the READY state and then establishing a new stream might look something like this:

```go
// Assumes gRPCClient has the fields conn *grpc.ClientConn, reconnect chan bool
// and done chan bool, and that GetStream() returns a wrapper holding a newly
// opened stream.
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.reconnect:
			if !grpcclient.waitUntilReady() {
				return errors.New("failed to establish a connection within the defined timeout")
			}
			go grpcclient.process()
		case <-grpcclient.done:
			return nil
		}
	}
}

func (grpcclient *gRPCClient) process() {
	reqclient := GetStream() // always get a new stream
	for {
		request, err := reqclient.stream.Recv()
		if err == io.EOF {
			grpcclient.done <- true
			return
		}
		if err != nil {
			grpcclient.reconnect <- true
			return
		}

		// The happy path: process the request that was received.
		log.Info("Request received")
		_ = request
	}
}

func (grpcclient *gRPCClient) waitUntilReady() bool {
	// Define how long to wait for the connection to be restored before giving up.
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
	defer cancel()
	// Note: this returns as soon as the state changes away from Ready (see the EDIT below).
	return grpcclient.conn.WaitForStateChange(ctx, connectivity.Ready)
}
```

EDIT:

Revisiting the code above, a couple of mistakes need correcting. The WaitForStateChange function waits for the connection state to change away from the passed state; it does not wait for the connection to change into the passed state.
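A small standard-library-only simulation makes the difference visible. State, fakeConn, buggyWait and defaultTimeout are hypothetical stand-ins for grpc-go's connectivity.State and *grpc.ClientConn; fakeConn only mimics the documented contract of WaitForStateChange (return true once the state differs from the one passed in, false when the context expires). Passing Ready while the connection is in TransientFailure returns true immediately even though nothing is ready. The corrected loop, like the one in the question's EDIT, passes the current state each time and re-checks after every change.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// State is a hypothetical stand-in for grpc-go's connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
)

const defaultTimeout = time.Second

// fakeConn walks through a scripted list of states. Its WaitForStateChange
// mimics the documented contract of (*grpc.ClientConn).WaitForStateChange.
type fakeConn struct {
	states []State // states[0] is the current state
}

func (c *fakeConn) GetState() State { return c.states[0] }

func (c *fakeConn) WaitForStateChange(ctx context.Context, source State) bool {
	for c.states[0] == source {
		if len(c.states) == 1 {
			<-ctx.Done() // no more transitions scripted: block until timeout
			return false
		}
		c.states = c.states[1:]
	}
	return true
}

// buggyWait reproduces the first version: it returns true as soon as the
// state is anything other than Ready.
func buggyWait(c *fakeConn, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	return c.WaitForStateChange(ctx, Ready)
}

// waitUntilReady passes the current state each time and stops only at Ready
// or when the timeout expires.
func waitUntilReady(c *fakeConn, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	for s := c.GetState(); s != Ready; s = c.GetState() {
		if !c.WaitForStateChange(ctx, s) {
			return false // timed out
		}
	}
	return true
}

func main() {
	down := &fakeConn{states: []State{TransientFailure}}
	fmt.Println(buggyWait(down, defaultTimeout), down.GetState() == Ready) // true false

	recovering := &fakeConn{states: []State{TransientFailure, Connecting, Ready}}
	fmt.Println(waitUntilReady(recovering, defaultTimeout)) // true
}
```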

It is better to track the current state of the connection and use the Connect function to start connecting if the channel is idle, since an idle channel does not begin reconnecting just because you wait on it.

```go
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.reconnect:
			// Check every second, giving up after 60 seconds.
			if !grpcclient.isReconnected(1*time.Second, 60*time.Second) {
				return errors.New("failed to establish a connection within the defined timeout")
			}
			go grpcclient.process()
		case <-grpcclient.done:
			return nil
		}
	}
}

// isReconnected calls Connect (which only acts if the channel is idle) every
// check interval and reports whether the connection reached Ready before
// the timeout elapsed.
func (grpcclient *gRPCClient) isReconnected(check, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(check)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			grpcclient.conn.Connect()

			if grpcclient.conn.GetState() == connectivity.Ready {
				return true
			}
		case <-ctx.Done():
			return false
		}
	}
}
```
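As an alternative to polling on a ticker, the same idea can be made event-driven: call Connect whenever the channel is Idle, and otherwise block on WaitForStateChange with the current state. This is a standard-library-only sketch; State, fakeConn, waitUntilReady and defaultTimeout are hypothetical stand-ins, and fakeConn models only the behaviour relied on here, namely that an Idle channel stays Idle until Connect is called. With a real *grpc.ClientConn the loop would call conn.GetState, conn.Connect and conn.WaitForStateChange in the same order.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// State is a hypothetical stand-in for grpc-go's connectivity.State.
type State int

const (
	Idle State = iota
	Connecting
	Ready
	TransientFailure
)

const defaultTimeout = time.Second

// fakeConn is a hypothetical stand-in for *grpc.ClientConn. An Idle conn
// does not change state on its own; Connect starts the scripted dial.
type fakeConn struct {
	state   State
	dial    []State // transitions that Connect sets in motion
	pending []State // transitions still to happen
}

func (c *fakeConn) GetState() State { return c.state }

// Connect only has an effect while the conn is Idle.
func (c *fakeConn) Connect() {
	if c.state == Idle && len(c.pending) == 0 {
		c.pending, c.dial = c.dial, nil
	}
}

func (c *fakeConn) WaitForStateChange(ctx context.Context, source State) bool {
	for c.state == source {
		if len(c.pending) == 0 {
			<-ctx.Done() // nothing scheduled: stay in this state until timeout
			return false
		}
		c.state, c.pending = c.pending[0], c.pending[1:]
	}
	return true
}

// waitUntilReady kicks an Idle conn with Connect and otherwise blocks on
// WaitForStateChange instead of polling, until Ready or the timeout.
func waitUntilReady(c *fakeConn, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	for {
		s := c.GetState()
		if s == Ready {
			return true
		}
		if s == Idle {
			c.Connect()
		}
		if !c.WaitForStateChange(ctx, s) {
			return false // timed out
		}
	}
}

func main() {
	idle := &fakeConn{state: Idle, dial: []State{Connecting, Ready}}
	fmt.Println(waitUntilReady(idle, defaultTimeout)) // true
}
```

Compared with the ticker, this wakes up only when the state actually changes, so there is no fixed delay of up to one check interval between the connection becoming Ready and the new stream being opened.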