Fix the issue of incorrect values notified in the configuration center (#5348)

This commit is contained in:
kesonan
2025-12-23 22:06:04 +08:00
committed by GitHub
parent 39729f3756
commit 52df1c532a
4 changed files with 275 additions and 22 deletions

View File

@@ -3,6 +3,8 @@ package subscriber
import ( import (
"github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/core/discov"
"github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/logx"
"sync"
"sync/atomic"
) )
type ( type (
@@ -37,6 +39,7 @@ func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) {
func buildSubOptions(conf EtcdConf) []discov.SubOption { func buildSubOptions(conf EtcdConf) []discov.SubOption {
opts := []discov.SubOption{ opts := []discov.SubOption{
discov.WithExactMatch(), discov.WithExactMatch(),
discov.WithContainer(newConfigCenterContainer()),
} }
if len(conf.User) > 0 { if len(conf.User) > 0 {
@@ -65,3 +68,47 @@ func (s *etcdSubscriber) Value() (string, error) {
return "", nil return "", nil
} }
type configCenterContainer struct {
value atomic.Value
lock sync.Mutex
listeners []func()
}
func newConfigCenterContainer() *configCenterContainer {
return &configCenterContainer{}
}
func (c *configCenterContainer) OnAdd(kv discov.KV) {
c.value.Store([]string{kv.Val})
c.notifyChange()
}
func (c *configCenterContainer) OnDelete(_ discov.KV) {
c.value.Store([]string(nil))
c.notifyChange()
}
func (c *configCenterContainer) AddListener(listener func()) {
c.lock.Lock()
c.listeners = append(c.listeners, listener)
c.lock.Unlock()
}
func (c *configCenterContainer) GetValues() []string {
vals, ok := c.value.Load().([]string)
if !ok {
return []string(nil)
}
return vals
}
func (c *configCenterContainer) notifyChange() {
c.lock.Lock()
listeners := append(([]func())(nil), c.listeners...)
c.lock.Unlock()
for _, listener := range listeners {
listener()
}
}

View File

@@ -0,0 +1,186 @@
package subscriber
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/zeromicro/go-zero/core/discov"
)
const (
actionAdd = iota
actionDel
)
func TestConfigCenterContainer(t *testing.T) {
type action struct {
act int
key string
val string
}
tests := []struct {
name string
do []action
expect []string
}{
{
name: "add one",
do: []action{
{
act: actionAdd,
key: "first",
val: "a",
},
},
expect: []string{
"a",
},
},
{
name: "add two",
do: []action{
{
act: actionAdd,
key: "first",
val: "a",
},
{
act: actionAdd,
key: "second",
val: "b",
},
},
expect: []string{
"b",
},
},
{
name: "add two, delete one",
do: []action{
{
act: actionAdd,
key: "first",
val: "a",
},
{
act: actionAdd,
key: "second",
val: "b",
},
{
act: actionDel,
key: "first",
},
},
expect: []string(nil),
},
{
name: "add two, delete two",
do: []action{
{
act: actionAdd,
key: "first",
val: "a",
},
{
act: actionAdd,
key: "second",
val: "b",
},
{
act: actionDel,
key: "first",
},
{
act: actionDel,
key: "second",
},
},
expect: []string(nil),
},
{
name: "add two, dup values",
do: []action{
{
act: actionAdd,
key: "first",
val: "a",
},
{
act: actionAdd,
key: "second",
val: "b",
},
{
act: actionAdd,
key: "third",
val: "a",
},
},
expect: []string{"a"},
},
{
name: "add three, dup values, delete two, add one",
do: []action{
{
act: actionAdd,
key: "first",
val: "a",
},
{
act: actionAdd,
key: "second",
val: "b",
},
{
act: actionAdd,
key: "third",
val: "a",
},
{
act: actionDel,
key: "first",
},
{
act: actionDel,
key: "second",
},
{
act: actionAdd,
key: "forth",
val: "c",
},
},
expect: []string{"c"},
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var changed bool
c := newConfigCenterContainer()
c.AddListener(func() {
changed = true
})
assert.Nil(t, c.GetValues())
assert.False(t, changed)
for _, order := range test.do {
if order.act == actionAdd {
c.OnAdd(discov.KV{
Key: order.key,
Val: order.val,
})
} else {
c.OnDelete(discov.KV{
Key: order.key,
Val: order.val,
})
}
}
assert.True(t, changed)
assert.ElementsMatch(t, test.expect, c.GetValues())
})
}
}

View File

@@ -19,8 +19,9 @@ type (
exclusive bool exclusive bool
key string key string
exactMatch bool exactMatch bool
items *container items Container
} }
KV = internal.KV
) )
// NewSubscriber returns a Subscriber. // NewSubscriber returns a Subscriber.
@@ -35,7 +36,9 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib
for _, opt := range opts { for _, opt := range opts {
opt(sub) opt(sub)
} }
sub.items = newContainer(sub.exclusive) if sub.items == nil {
sub.items = newContainer(sub.exclusive)
}
if err := internal.GetRegistry().Monitor(endpoints, key, sub.exactMatch, sub.items); err != nil { if err := internal.GetRegistry().Monitor(endpoints, key, sub.exactMatch, sub.items); err != nil {
return nil, err return nil, err
@@ -46,7 +49,7 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib
// AddListener adds listener to s. // AddListener adds listener to s.
func (s *Subscriber) AddListener(listener func()) { func (s *Subscriber) AddListener(listener func()) {
s.items.addListener(listener) s.items.AddListener(listener)
} }
// Close closes the subscriber. // Close closes the subscriber.
@@ -56,7 +59,7 @@ func (s *Subscriber) Close() {
// Values returns all the subscription values. // Values returns all the subscription values.
func (s *Subscriber) Values() []string { func (s *Subscriber) Values() []string {
return s.items.getValues() return s.items.GetValues()
} }
// Exclusive means that key value can only be 1:1, // Exclusive means that key value can only be 1:1,
@@ -88,16 +91,31 @@ func WithSubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify boo
} }
} }
type container struct { // WithContainer provides a custom container to the subscriber.
exclusive bool func WithContainer(container Container) SubOption {
values map[string][]string return func(sub *Subscriber) {
mapping map[string]string sub.items = container
snapshot atomic.Value }
dirty *syncx.AtomicBool
listeners []func()
lock sync.Mutex
} }
type (
Container interface {
OnAdd(kv internal.KV)
OnDelete(kv internal.KV)
AddListener(listener func())
GetValues() []string
}
container struct {
exclusive bool
values map[string][]string
mapping map[string]string
snapshot atomic.Value
dirty *syncx.AtomicBool
listeners []func()
lock sync.Mutex
}
)
func newContainer(exclusive bool) *container { func newContainer(exclusive bool) *container {
return &container{ return &container{
exclusive: exclusive, exclusive: exclusive,
@@ -141,7 +159,7 @@ func (c *container) addKv(key, value string) ([]string, bool) {
return nil, false return nil, false
} }
func (c *container) addListener(listener func()) { func (c *container) AddListener(listener func()) {
c.lock.Lock() c.lock.Lock()
c.listeners = append(c.listeners, listener) c.listeners = append(c.listeners, listener)
c.lock.Unlock() c.lock.Unlock()
@@ -170,7 +188,7 @@ func (c *container) doRemoveKey(key string) {
} }
} }
func (c *container) getValues() []string { func (c *container) GetValues() []string {
if !c.dirty.True() { if !c.dirty.True() {
return c.snapshot.Load().([]string) return c.snapshot.Load().([]string)
} }

View File

@@ -171,10 +171,10 @@ func TestContainer(t *testing.T) {
t.Run(test.name, func(t *testing.T) { t.Run(test.name, func(t *testing.T) {
var changed bool var changed bool
c := newContainer(exclusive) c := newContainer(exclusive)
c.addListener(func() { c.AddListener(func() {
changed = true changed = true
}) })
assert.Nil(t, c.getValues()) assert.Nil(t, c.GetValues())
assert.False(t, changed) assert.False(t, changed)
for _, order := range test.do { for _, order := range test.do {
@@ -193,9 +193,9 @@ func TestContainer(t *testing.T) {
assert.True(t, changed) assert.True(t, changed)
assert.True(t, c.dirty.True()) assert.True(t, c.dirty.True())
assert.ElementsMatch(t, test.expect, c.getValues()) assert.ElementsMatch(t, test.expect, c.GetValues())
assert.False(t, c.dirty.True()) assert.False(t, c.dirty.True())
assert.ElementsMatch(t, test.expect, c.getValues()) assert.ElementsMatch(t, test.expect, c.GetValues())
}) })
} }
} }
@@ -204,12 +204,13 @@ func TestContainer(t *testing.T) {
func TestSubscriber(t *testing.T) { func TestSubscriber(t *testing.T) {
sub := new(Subscriber) sub := new(Subscriber)
Exclusive()(sub) Exclusive()(sub)
sub.items = newContainer(sub.exclusive) c := newContainer(sub.exclusive)
sub.items = c
var count int32 var count int32
sub.AddListener(func() { sub.AddListener(func() {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
}) })
sub.items.notifyChange() c.notifyChange()
assert.Empty(t, sub.Values()) assert.Empty(t, sub.Values())
assert.Equal(t, int32(1), atomic.LoadInt32(&count)) assert.Equal(t, int32(1), atomic.LoadInt32(&count))
} }
@@ -229,12 +230,13 @@ func TestWithSubEtcdAccount(t *testing.T) {
func TestWithExactMatch(t *testing.T) { func TestWithExactMatch(t *testing.T) {
sub := new(Subscriber) sub := new(Subscriber)
WithExactMatch()(sub) WithExactMatch()(sub)
sub.items = newContainer(sub.exclusive) c := newContainer(sub.exclusive)
sub.items = c
var count int32 var count int32
sub.AddListener(func() { sub.AddListener(func() {
atomic.AddInt32(&count, 1) atomic.AddInt32(&count, 1)
}) })
sub.items.notifyChange() c.notifyChange()
assert.Empty(t, sub.Values()) assert.Empty(t, sub.Values())
assert.Equal(t, int32(1), atomic.LoadInt32(&count)) assert.Equal(t, int32(1), atomic.LoadInt32(&count))
} }