Question
I have a Go gRPC client connected to a gRPC server running in a different pod in my k8s cluster.
It's working well, receiving and processing requests.
I am now wondering how best to implement resiliency in the event that the gRPC server pod gets recycled.
As far as I can ascertain, the clientconn.go code should handle the reconnection automatically, but I just cannot get it to work and I fear my implementation is incorrect in the first instance.
Calling code from main:
go func() {
	if err := gRPCClient.ProcessRequests(); err != nil {
		log.Error("Error while processing Requests")
		//do something here??
	}
}()
My code in the gRPCClient wrapper module:
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()
	for {
		request, err := grpcclient.stream.Recv()
		log.Info("Request received")
		if err == io.EOF {
			break
		}
		if err != nil {
			//when the pod is recycled, this is what's hit with err:
			//rpc error: code = Unavailable desc = transport is closing
			//what is the correct pattern for recovery here so that we can await the connection
			//and continue processing requests once more?
			//should I return err here and somehow restart the ProcessRequests() goroutine in the
			//main function?
			break
		} else {
			//the happy path
			//code block to process any requests that are received
		}
	}
	return nil
}
func (reqclient *RequestClient) Close() {
	//this is called soon after the connection drops
	reqclient.conn.Close()
}
EDIT: Emin Laletovic answered my question elegantly below and got me most of the way there. I had to make a few changes to the waitUntilReady function:
func (grpcclient *gRPCClient) waitUntilReady() bool {
	//define how long you want to wait for the connection to be restored before giving up
	timeout := 300 * time.Second
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	currentState := grpcclient.conn.GetState()
	stillConnecting := true
	for currentState != connectivity.Ready && stillConnecting {
		//returns true when the state has changed from currentState, false if ctx times out
		stillConnecting = grpcclient.conn.WaitForStateChange(ctx, currentState)
		currentState = grpcclient.conn.GetState()
		log.WithFields(log.Fields{"state": currentState, "timeout": timeout}).Info("Attempting reconnection. State has changed to:")
	}

	if !stillConnecting {
		log.Error("Connection attempt has timed out.")
		return false
	}
	return true
}
Answer
The RPC connection is handled automatically by clientconn.go, but that doesn't mean the stream is also handled automatically.
Once a stream is broken, whether because the RPC connection went down or for some other reason, it cannot reconnect by itself; you need to get a new stream from the server once the RPC connection is back up.
The pseudo-code for waiting for the RPC connection to reach the READY state and then establishing a new stream might look something like this:
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.reconnect:
			if !grpcclient.waitUntilReady() {
				return errors.New("failed to establish a connection within the defined timeout")
			}
			go grpcclient.process()
		case <-grpcclient.done:
			return nil
		}
	}
}

func (grpcclient *gRPCClient) process() {
	reqclient := GetStream() //always get a new stream
	for {
		request, err := reqclient.stream.Recv()
		if err == io.EOF {
			grpcclient.done <- true
			return
		}
		if err != nil {
			grpcclient.reconnect <- true
			return
		}
		//the happy path
		log.Info("Request received")
		//code block to process the received request
	}
}

func (grpcclient *gRPCClient) waitUntilReady() bool {
	ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) //define how long you want to wait for connection to be restored before giving up
	defer cancel()
	return grpcclient.conn.WaitForStateChange(ctx, connectivity.Ready)
}
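EDIT:
Revisiting the code above, there are a couple of mistakes to correct. The WaitForStateChange function waits for the connection state to change from the passed state; it doesn't wait for the connection to change into the passed state. Since the connection is not Ready when waitUntilReady is called, WaitForStateChange(ctx, connectivity.Ready) returns true immediately, so that version doesn't actually wait.
It is better to track the current state of the connection and to call the Connect function to start connecting if the channel is idle.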
func (grpcclient *gRPCClient) ProcessRequests() error {
	defer grpcclient.Close()

	go grpcclient.process()
	for {
		select {
		case <-grpcclient.reconnect:
			if !grpcclient.isReconnected(1*time.Second, 60*time.Second) {
				return errors.New("failed to establish a connection within the defined timeout")
			}
			go grpcclient.process()
		case <-grpcclient.done:
			return nil
		}
	}
}
//isReconnected polls the connection every check interval, asking it to connect
//if it is idle, and gives up once timeout has elapsed.
func (grpcclient *gRPCClient) isReconnected(check, timeout time.Duration) bool {
	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()
	ticker := time.NewTicker(check)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			grpcclient.conn.Connect()
			if grpcclient.conn.GetState() == connectivity.Ready {
				return true
			}
		case <-ctx.Done():
			return false
		}
	}
}