optimize: improve breaker algorithm on recovery time (#4141)

This commit is contained in:
Kevin Wan
2024-05-11 21:44:26 +08:00
committed by GitHub
parent 7c730b97d8
commit 42b3bae65a
2 changed files with 69 additions and 16 deletions

View File

@@ -5,6 +5,8 @@ import (
"github.com/zeromicro/go-zero/core/collection" "github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/syncx"
"github.com/zeromicro/go-zero/core/timex"
) )
const ( const (
@@ -13,9 +15,10 @@ const (
buckets = 40 buckets = 40
maxFailBucketsToDecreaseK = 30 maxFailBucketsToDecreaseK = 30
minBucketsToSpeedUp = 3 minBucketsToSpeedUp = 3
forcePassDuration = time.Second
k = 1.5 k = 1.5
minK = 1.1 minK = 1.1
recoveryK = 3 - k recoveryK = 4 - k
protection = 5 protection = 5
) )
@@ -23,9 +26,10 @@ const (
// see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/ // see Client-Side Throttling section in https://landing.google.com/sre/sre-book/chapters/handling-overload/
type ( type (
googleBreaker struct { googleBreaker struct {
k float64 k float64
stat *collection.RollingWindow[int64, *bucket] stat *collection.RollingWindow[int64, *bucket]
proba *mathx.Proba proba *mathx.Proba
lastPass *syncx.AtomicDuration
} }
windowResult struct { windowResult struct {
@@ -42,9 +46,10 @@ func newGoogleBreaker() *googleBreaker {
return new(bucket) return new(bucket)
}, buckets, bucketDuration) }, buckets, bucketDuration)
return &googleBreaker{ return &googleBreaker{
stat: st, stat: st,
k: k, k: k,
proba: mathx.NewProba(), proba: mathx.NewProba(),
lastPass: syncx.NewAtomicDuration(),
} }
} }
@@ -65,6 +70,12 @@ func (b *googleBreaker) accept() error {
return nil return nil
} }
lastPass := b.lastPass.Load()
if lastPass > 0 && timex.Since(lastPass) > forcePassDuration {
b.lastPass.Set(timex.Now())
return nil
}
// If we have more than 2 working buckets, we are in recovery mode, // If we have more than 2 working buckets, we are in recovery mode,
// the latest bucket is the current one, so we ignore it. // the latest bucket is the current one, so we ignore it.
if history.workingBuckets >= minBucketsToSpeedUp { if history.workingBuckets >= minBucketsToSpeedUp {
@@ -75,6 +86,8 @@ func (b *googleBreaker) accept() error {
return ErrServiceUnavailable return ErrServiceUnavailable
} }
b.lastPass.Set(timex.Now())
return nil return nil
} }
@@ -140,10 +153,10 @@ func (b *googleBreaker) history() windowResult {
} else if b.Success > 0 { } else if b.Success > 0 {
result.workingBuckets++ result.workingBuckets++
} }
if b.Drop > 0 && b.Failure > 0 { if b.Success > 0 {
result.failingBuckets++
} else {
result.failingBuckets = 0 result.failingBuckets = 0
} else if b.Failure > 0 {
result.failingBuckets++
} }
}) })

View File

@@ -10,6 +10,7 @@ import (
"github.com/zeromicro/go-zero/core/collection" "github.com/zeromicro/go-zero/core/collection"
"github.com/zeromicro/go-zero/core/mathx" "github.com/zeromicro/go-zero/core/mathx"
"github.com/zeromicro/go-zero/core/stat" "github.com/zeromicro/go-zero/core/stat"
"github.com/zeromicro/go-zero/core/syncx"
) )
const ( const (
@@ -26,9 +27,10 @@ func getGoogleBreaker() *googleBreaker {
return new(bucket) return new(bucket)
}, testBuckets, testInterval) }, testBuckets, testInterval)
return &googleBreaker{ return &googleBreaker{
stat: st, stat: st,
k: 5, k: 5,
proba: mathx.NewProba(), proba: mathx.NewProba(),
lastPass: syncx.NewAtomicDuration(),
} }
} }
@@ -70,9 +72,10 @@ func TestGoogleBreakerRecover(t *testing.T) {
return new(bucket) return new(bucket)
}, testBuckets*2, testInterval) }, testBuckets*2, testInterval)
b := &googleBreaker{ b := &googleBreaker{
stat: st, stat: st,
k: k, k: k,
proba: mathx.NewProba(), proba: mathx.NewProba(),
lastPass: syncx.NewAtomicDuration(),
} }
for i := 0; i < testBuckets; i++ { for i := 0; i < testBuckets; i++ {
for j := 0; j < 100; j++ { for j := 0; j < 100; j++ {
@@ -117,6 +120,43 @@ func TestGoogleBreakerReject(t *testing.T) {
}, nil, defaultAcceptable)) }, nil, defaultAcceptable))
} }
func TestGoogleBreakerMoreFallingBuckets(t *testing.T) {
t.Parallel()
t.Run("more falling buckets", func(t *testing.T) {
b := getGoogleBreaker()
func() {
stopChan := time.After(testInterval * minBucketsToSpeedUp * 2)
for {
time.Sleep(time.Millisecond)
select {
case <-stopChan:
return
default:
assert.Error(t, b.doReq(func() error {
return errors.New("foo")
}, func(err error) error {
return err
}, func(err error) bool {
return err == nil
}))
}
}
}()
var count int
for i := 0; i < 100; i++ {
if errors.Is(b.doReq(func() error {
return ErrServiceUnavailable
}, nil, defaultAcceptable), ErrServiceUnavailable) {
count++
}
}
assert.True(t, count > 90)
})
}
func TestGoogleBreakerAcceptable(t *testing.T) { func TestGoogleBreakerAcceptable(t *testing.T) {
b := getGoogleBreaker() b := getGoogleBreaker()
errAcceptable := errors.New("any") errAcceptable := errors.New("any")