From b3cd8a32edfc58ddd62c0cbd559fbb887e0d77b0 Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Tue, 30 Apr 2024 19:01:20 +0800 Subject: [PATCH] feat: trigger breaker on underlying service timeout (#4112) --- .../serverinterceptors/breakerinterceptor.go | 35 ++++++++++++---- .../breakerinterceptor_test.go | 42 +++++++++++++++++++ 2 files changed, 70 insertions(+), 7 deletions(-) diff --git a/zrpc/internal/serverinterceptors/breakerinterceptor.go b/zrpc/internal/serverinterceptors/breakerinterceptor.go index 7d64561b5..3be0aba14 100644 --- a/zrpc/internal/serverinterceptors/breakerinterceptor.go +++ b/zrpc/internal/serverinterceptors/breakerinterceptor.go @@ -5,6 +5,7 @@ import ( "errors" "github.com/zeromicro/go-zero/core/breaker" + "github.com/zeromicro/go-zero/core/errorx" "github.com/zeromicro/go-zero/zrpc/internal/codes" "google.golang.org/grpc" gcodes "google.golang.org/grpc/codes" @@ -13,11 +14,13 @@ import ( // StreamBreakerInterceptor is an interceptor that acts as a circuit breaker. func StreamBreakerInterceptor(svr any, stream grpc.ServerStream, info *grpc.StreamServerInfo, - handler grpc.StreamHandler) (err error) { + handler grpc.StreamHandler) error { breakerName := info.FullMethod - return breaker.DoWithAcceptable(breakerName, func() error { + err := breaker.DoWithAcceptable(breakerName, func() error { return handler(svr, stream) - }, codes.Acceptable) + }, serverSideAcceptable) + + return convertError(err) } // UnaryBreakerInterceptor is an interceptor that acts as a circuit breaker. @@ -28,10 +31,28 @@ func UnaryBreakerInterceptor(ctx context.Context, req any, info *grpc.UnaryServe var err error resp, err = handler(ctx, req) return err - }, codes.Acceptable) - if errors.Is(err, breaker.ErrServiceUnavailable) { - err = status.Error(gcodes.Unavailable, err.Error()) + }, serverSideAcceptable) + + return resp, convertError(err) +} + +func convertError(err error) error { + if err == nil { + return nil } - return resp, err + // we don't convert context.DeadlineExceeded to status error, + // because grpc will convert it and return to the client. + if errors.Is(err, breaker.ErrServiceUnavailable) { + return status.Error(gcodes.Unavailable, err.Error()) + } + + return err +} + +func serverSideAcceptable(err error) bool { + if errorx.In(err, context.DeadlineExceeded, breaker.ErrServiceUnavailable) { + return false + } + return codes.Acceptable(err) } diff --git a/zrpc/internal/serverinterceptors/breakerinterceptor_test.go b/zrpc/internal/serverinterceptors/breakerinterceptor_test.go index 58662de26..344709bc4 100644 --- a/zrpc/internal/serverinterceptors/breakerinterceptor_test.go +++ b/zrpc/internal/serverinterceptors/breakerinterceptor_test.go @@ -29,6 +29,15 @@ func TestUnaryBreakerInterceptor(t *testing.T) { assert.NotNil(t, err) } +func TestUnaryBreakerInterceptorOK(t *testing.T) { + _, err := UnaryBreakerInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + FullMethod: "any", + }, func(_ context.Context, _ any) (any, error) { + return nil, nil + }) + assert.NoError(t, err) +} + func TestUnaryBreakerInterceptor_Unavailable(t *testing.T) { _, err := UnaryBreakerInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ FullMethod: "any", @@ -36,4 +45,37 @@ func TestUnaryBreakerInterceptor_Unavailable(t *testing.T) { return nil, breaker.ErrServiceUnavailable }) assert.NotNil(t, err) + code := status.Code(err) + assert.Equal(t, codes.Unavailable, code) +} + +func TestUnaryBreakerInterceptor_Deadline(t *testing.T) { + for i := 0; i < 1000; i++ { + _, err := UnaryBreakerInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + FullMethod: "any", + }, func(_ context.Context, _ any) (any, error) { + return nil, context.DeadlineExceeded + }) + switch status.Code(err) { + case codes.Unavailable: + default: + assert.Equal(t, context.DeadlineExceeded, err) + } + } + + var dropped bool + for i := 0; i < 100; i++ { + _, err := UnaryBreakerInterceptor(context.Background(), nil, &grpc.UnaryServerInfo{ + FullMethod: "any", + }, func(_ context.Context, _ any) (any, error) { + return nil, context.DeadlineExceeded + }) + switch status.Code(err) { + case codes.Unavailable: + dropped = true + default: + assert.Equal(t, context.DeadlineExceeded, err) + } + } + assert.True(t, dropped) }