From 52df1c532af0bdd51a849e1bce622fe57b386d51 Mon Sep 17 00:00:00 2001 From: kesonan Date: Tue, 23 Dec 2025 22:06:04 +0800 Subject: [PATCH] Fix the issue of incorrect values notified in the configuration center (#5348) --- core/configcenter/subscriber/etcd.go | 47 ++++++ core/configcenter/subscriber/etcd_test.go | 186 ++++++++++++++++++++++ core/discov/subscriber.go | 46 ++++-- core/discov/subscriber_test.go | 18 ++- 4 files changed, 275 insertions(+), 22 deletions(-) create mode 100644 core/configcenter/subscriber/etcd_test.go diff --git a/core/configcenter/subscriber/etcd.go b/core/configcenter/subscriber/etcd.go index f9a11efcc..c5a8f5334 100644 --- a/core/configcenter/subscriber/etcd.go +++ b/core/configcenter/subscriber/etcd.go @@ -3,6 +3,8 @@ package subscriber import ( "github.com/zeromicro/go-zero/core/discov" "github.com/zeromicro/go-zero/core/logx" + "sync" + "sync/atomic" ) type ( @@ -37,6 +39,7 @@ func NewEtcdSubscriber(conf EtcdConf) (Subscriber, error) { func buildSubOptions(conf EtcdConf) []discov.SubOption { opts := []discov.SubOption{ discov.WithExactMatch(), + discov.WithContainer(newConfigCenterContainer()), } if len(conf.User) > 0 { @@ -65,3 +68,47 @@ func (s *etcdSubscriber) Value() (string, error) { return "", nil } + +type configCenterContainer struct { + value atomic.Value + lock sync.Mutex + listeners []func() +} + +func newConfigCenterContainer() *configCenterContainer { + return &configCenterContainer{} +} + +func (c *configCenterContainer) OnAdd(kv discov.KV) { + c.value.Store([]string{kv.Val}) + c.notifyChange() +} + +func (c *configCenterContainer) OnDelete(_ discov.KV) { + c.value.Store([]string(nil)) + c.notifyChange() +} + +func (c *configCenterContainer) AddListener(listener func()) { + c.lock.Lock() + c.listeners = append(c.listeners, listener) + c.lock.Unlock() +} + +func (c *configCenterContainer) GetValues() []string { + vals, ok := c.value.Load().([]string) + if !ok { + return []string(nil) + } + return vals +} + +func (c *configCenterContainer) notifyChange() { + c.lock.Lock() + listeners := append(([]func())(nil), c.listeners...) + c.lock.Unlock() + + for _, listener := range listeners { + listener() + } +} diff --git a/core/configcenter/subscriber/etcd_test.go b/core/configcenter/subscriber/etcd_test.go new file mode 100644 index 000000000..db6482f44 --- /dev/null +++ b/core/configcenter/subscriber/etcd_test.go @@ -0,0 +1,186 @@ +package subscriber + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/zeromicro/go-zero/core/discov" +) + +const ( + actionAdd = iota + actionDel +) + +func TestConfigCenterContainer(t *testing.T) { + type action struct { + act int + key string + val string + } + tests := []struct { + name string + do []action + expect []string + }{ + { + name: "add one", + do: []action{ + { + act: actionAdd, + key: "first", + val: "a", + }, + }, + expect: []string{ + "a", + }, + }, + { + name: "add two", + do: []action{ + { + act: actionAdd, + key: "first", + val: "a", + }, + { + act: actionAdd, + key: "second", + val: "b", + }, + }, + expect: []string{ + "b", + }, + }, + { + name: "add two, delete one", + do: []action{ + { + act: actionAdd, + key: "first", + val: "a", + }, + { + act: actionAdd, + key: "second", + val: "b", + }, + { + act: actionDel, + key: "first", + }, + }, + expect: []string(nil), + }, + { + name: "add two, delete two", + do: []action{ + { + act: actionAdd, + key: "first", + val: "a", + }, + { + act: actionAdd, + key: "second", + val: "b", + }, + { + act: actionDel, + key: "first", + }, + { + act: actionDel, + key: "second", + }, + }, + expect: []string(nil), + }, + { + name: "add two, dup values", + do: []action{ + { + act: actionAdd, + key: "first", + val: "a", + }, + { + act: actionAdd, + key: "second", + val: "b", + }, + { + act: actionAdd, + key: "third", + val: "a", + }, + }, + expect: []string{"a"}, + }, + { + name: "add three, dup values, delete two, add one", + do: []action{ + { + act: actionAdd, + key: "first", + val: "a", + }, + { + act: actionAdd, + key: "second", + val: "b", + }, + { + act: actionAdd, + key: "third", + val: "a", + }, + { + act: actionDel, + key: "first", + }, + { + act: actionDel, + key: "second", + }, + { + act: actionAdd, + key: "forth", + val: "c", + }, + }, + expect: []string{"c"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + var changed bool + c := newConfigCenterContainer() + c.AddListener(func() { + changed = true + }) + assert.Nil(t, c.GetValues()) + assert.False(t, changed) + + for _, order := range test.do { + if order.act == actionAdd { + c.OnAdd(discov.KV{ + Key: order.key, + Val: order.val, + }) + } else { + c.OnDelete(discov.KV{ + Key: order.key, + Val: order.val, + }) + } + } + + assert.True(t, changed) + assert.ElementsMatch(t, test.expect, c.GetValues()) + }) + } +} diff --git a/core/discov/subscriber.go b/core/discov/subscriber.go index 7cb7fd511..12fc5d2ea 100644 --- a/core/discov/subscriber.go +++ b/core/discov/subscriber.go @@ -19,8 +19,9 @@ type ( exclusive bool key string exactMatch bool - items *container + items Container } + KV = internal.KV ) // NewSubscriber returns a Subscriber. @@ -35,7 +36,9 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib for _, opt := range opts { opt(sub) } - sub.items = newContainer(sub.exclusive) + if sub.items == nil { + sub.items = newContainer(sub.exclusive) + } if err := internal.GetRegistry().Monitor(endpoints, key, sub.exactMatch, sub.items); err != nil { return nil, err @@ -46,7 +49,7 @@ func NewSubscriber(endpoints []string, key string, opts ...SubOption) (*Subscrib // AddListener adds listener to s. func (s *Subscriber) AddListener(listener func()) { - s.items.addListener(listener) + s.items.AddListener(listener) } // Close closes the subscriber. @@ -56,7 +59,7 @@ func (s *Subscriber) Close() { // Values returns all the subscription values. func (s *Subscriber) Values() []string { - return s.items.getValues() + return s.items.GetValues() } // Exclusive means that key value can only be 1:1, @@ -88,16 +91,31 @@ func WithSubEtcdTLS(certFile, certKeyFile, caFile string, insecureSkipVerify boo } } -type container struct { - exclusive bool - values map[string][]string - mapping map[string]string - snapshot atomic.Value - dirty *syncx.AtomicBool - listeners []func() - lock sync.Mutex +// WithContainer provides a custom container to the subscriber. +func WithContainer(container Container) SubOption { + return func(sub *Subscriber) { + sub.items = container + } } +type ( + Container interface { + OnAdd(kv internal.KV) + OnDelete(kv internal.KV) + AddListener(listener func()) + GetValues() []string + } + container struct { + exclusive bool + values map[string][]string + mapping map[string]string + snapshot atomic.Value + dirty *syncx.AtomicBool + listeners []func() + lock sync.Mutex + } +) + func newContainer(exclusive bool) *container { return &container{ exclusive: exclusive, @@ -141,7 +159,7 @@ func (c *container) addKv(key, value string) ([]string, bool) { return nil, false } -func (c *container) addListener(listener func()) { +func (c *container) AddListener(listener func()) { c.lock.Lock() c.listeners = append(c.listeners, listener) c.lock.Unlock() @@ -170,7 +188,7 @@ func (c *container) doRemoveKey(key string) { } } -func (c *container) getValues() []string { +func (c *container) GetValues() []string { if !c.dirty.True() { return c.snapshot.Load().([]string) } diff --git a/core/discov/subscriber_test.go b/core/discov/subscriber_test.go index b8762afa7..d91d72ae4 100644 --- a/core/discov/subscriber_test.go +++ b/core/discov/subscriber_test.go @@ -171,10 +171,10 @@ func TestContainer(t *testing.T) { t.Run(test.name, func(t *testing.T) { var changed bool c := newContainer(exclusive) - c.addListener(func() { + c.AddListener(func() { changed = true }) - assert.Nil(t, c.getValues()) + assert.Nil(t, c.GetValues()) assert.False(t, changed) for _, order := range test.do { @@ -193,9 +193,9 @@ func TestContainer(t *testing.T) { assert.True(t, changed) assert.True(t, c.dirty.True()) - assert.ElementsMatch(t, test.expect, c.getValues()) + assert.ElementsMatch(t, test.expect, c.GetValues()) assert.False(t, c.dirty.True()) - assert.ElementsMatch(t, test.expect, c.getValues()) + assert.ElementsMatch(t, test.expect, c.GetValues()) }) } } @@ -204,12 +204,13 @@ func TestContainer(t *testing.T) { func TestSubscriber(t *testing.T) { sub := new(Subscriber) Exclusive()(sub) - sub.items = newContainer(sub.exclusive) + c := newContainer(sub.exclusive) + sub.items = c var count int32 sub.AddListener(func() { atomic.AddInt32(&count, 1) }) - sub.items.notifyChange() + c.notifyChange() assert.Empty(t, sub.Values()) assert.Equal(t, int32(1), atomic.LoadInt32(&count)) } @@ -229,12 +230,13 @@ func TestWithSubEtcdAccount(t *testing.T) { func TestWithExactMatch(t *testing.T) { sub := new(Subscriber) WithExactMatch()(sub) - sub.items = newContainer(sub.exclusive) + c := newContainer(sub.exclusive) + sub.items = c var count int32 sub.AddListener(func() { atomic.AddInt32(&count, 1) }) - sub.items.notifyChange() + c.notifyChange() assert.Empty(t, sub.Values()) assert.Equal(t, int32(1), atomic.LoadInt32(&count)) }