diff --git a/core/breaker/googlebreaker.go b/core/breaker/googlebreaker.go index f48464267..11b7c4563 100644 --- a/core/breaker/googlebreaker.go +++ b/core/breaker/googlebreaker.go @@ -5,6 +5,8 @@ import ( "github.com/zeromicro/go-zero/core/collection" "github.com/zeromicro/go-zero/core/mathx" + "github.com/zeromicro/go-zero/core/syncx" + "github.com/zeromicro/go-zero/core/timex" ) const ( @@ -13,9 +15,10 @@ const ( buckets = 40 maxFailBucketsToDecreaseK = 30 minBucketsToSpeedUp = 3 + forcePassDuration = time.Second k = 1.5 minK = 1.1 - recoveryK = 3 - k + recoveryK = 4 - k protection = 5 ) @@ -23,9 +26,10 @@ const ( // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/ type ( googleBreaker struct { - k float64 - stat *collection.RollingWindow[int64, *bucket] - proba *mathx.Proba + k float64 + stat *collection.RollingWindow[int64, *bucket] + proba *mathx.Proba + lastPass *syncx.AtomicDuration } windowResult struct { @@ -42,9 +46,10 @@ func newGoogleBreaker() *googleBreaker { return new(bucket) }, buckets, bucketDuration) return &googleBreaker{ - stat: st, - k: k, - proba: mathx.NewProba(), + stat: st, + k: k, + proba: mathx.NewProba(), + lastPass: syncx.NewAtomicDuration(), } } @@ -65,6 +70,12 @@ func (b *googleBreaker) accept() error { return nil } + lastPass := b.lastPass.Load() + if lastPass > 0 && timex.Since(lastPass) > forcePassDuration { + b.lastPass.Set(timex.Now()) + return nil + } + // If we have more than 2 working buckets, we are in recovery mode, // the latest bucket is the current one, so we ignore it. if history.workingBuckets >= minBucketsToSpeedUp { @@ -75,6 +86,8 @@ func (b *googleBreaker) accept() error { return ErrServiceUnavailable } + b.lastPass.Set(timex.Now()) + return nil } @@ -140,10 +153,10 @@ func (b *googleBreaker) history() windowResult { } else if b.Success > 0 { result.workingBuckets++ } - if b.Drop > 0 && b.Failure > 0 { - result.failingBuckets++ - } else { + if b.Success > 0 { result.failingBuckets = 0 + } else if b.Failure > 0 { + result.failingBuckets++ } }) diff --git a/core/breaker/googlebreaker_test.go b/core/breaker/googlebreaker_test.go index cbd7e6d9e..a7df29345 100644 --- a/core/breaker/googlebreaker_test.go +++ b/core/breaker/googlebreaker_test.go @@ -10,6 +10,7 @@ import ( "github.com/zeromicro/go-zero/core/collection" "github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/stat" + "github.com/zeromicro/go-zero/core/syncx" ) const ( @@ -26,9 +27,10 @@ func getGoogleBreaker() *googleBreaker { return new(bucket) }, testBuckets, testInterval) return &googleBreaker{ - stat: st, - k: 5, - proba: mathx.NewProba(), + stat: st, + k: 5, + proba: mathx.NewProba(), + lastPass: syncx.NewAtomicDuration(), } } @@ -70,9 +72,10 @@ func TestGoogleBreakerRecover(t *testing.T) { return new(bucket) }, testBuckets*2, testInterval) b := &googleBreaker{ - stat: st, - k: k, - proba: mathx.NewProba(), + stat: st, + k: k, + proba: mathx.NewProba(), + lastPass: syncx.NewAtomicDuration(), } for i := 0; i < testBuckets; i++ { for j := 0; j < 100; j++ { @@ -117,6 +120,43 @@ func TestGoogleBreakerReject(t *testing.T) { }, nil, defaultAcceptable)) } +func TestGoogleBreakerMoreFallingBuckets(t *testing.T) { + t.Parallel() + + t.Run("more falling buckets", func(t *testing.T) { + b := getGoogleBreaker() + + func() { + stopChan := time.After(testInterval * minBucketsToSpeedUp * 2) + for { + time.Sleep(time.Millisecond) + select { + case <-stopChan: + return + default: + assert.Error(t, b.doReq(func() error { + return errors.New("foo") + }, func(err error) error { + return err + }, func(err error) bool { + return err == nil + })) + } + } + }() + + var count int + for i := 0; i < 100; i++ { + if errors.Is(b.doReq(func() error { + return ErrServiceUnavailable + }, nil, defaultAcceptable), ErrServiceUnavailable) { + count++ + } + } + assert.True(t, count > 90) + }) +} + func TestGoogleBreakerAcceptable(t *testing.T) { b := getGoogleBreaker() errAcceptable := errors.New("any")