fix: resolve data race in service discovery map access (#5408)

This commit is contained in:
Kevin Wan
2026-02-06 23:16:05 +08:00
committed by GitHub
parent bdddf1f30c
commit b139a82c2e
2 changed files with 69 additions and 3 deletions

View File

@@ -433,16 +433,16 @@ func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.C
}
ctx, cancel := context.WithCancel(cli.Ctx())
c.lock.Lock()
if watcher, ok := c.watchers[key]; ok {
watcher.cancel = cancel
} else {
val := newWatchValue()
val.cancel = cancel
c.lock.Lock()
c.watchers[key] = val
c.lock.Unlock()
}
c.lock.Unlock()
rch = cli.Watch(clientv3.WithRequireLeader(ctx), wkey, ops...)

View File

@@ -477,6 +477,72 @@ func TestRegistry_Unmonitor(t *testing.T) {
assert.Nil(t, watchVals)
}
// TestCluster_ConcurrentMonitor is the regression test for issue #5394.
// It drives addListener and setupWatch against one shared cluster from many
// goroutines at the same time. setupWatch both looks up and inserts entries in
// the watchers map, so any access made without holding the cluster lock shows
// up as a report when the test is run with the -race flag.
func TestCluster_ConcurrentMonitor(t *testing.T) {
	ctrl := gomock.NewController(t)
	defer ctrl.Finish()

	// The mocked etcd client tolerates any number of Ctx and Watch calls.
	watchCh := make(chan clientv3.WatchResponse)
	cli := NewMockEtcdClient(ctrl)
	cli.EXPECT().Ctx().Return(context.Background()).AnyTimes()
	cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(watchCh).AnyTimes()

	c := &cluster{
		endpoints:  []string{"localhost:2379"},
		key:        "test-cluster",
		watchers:   make(map[watchKey]*watchValue),
		watchGroup: threading.NewRoutineGroup(),
		done:       make(chan lang.PlaceholderType),
		lock:       sync.RWMutex{},
	}

	// A small key set guarantees that several goroutines collide on each key.
	watchKeys := []watchKey{
		{key: "key-0", exactMatch: false},
		{key: "key-1", exactMatch: false},
		{key: "key-2", exactMatch: false},
	}

	const workers = 20
	var pending sync.WaitGroup
	for i := 0; i < workers; i++ {
		pending.Add(1)
		go func(n int) {
			defer pending.Done()

			target := watchKeys[n%len(watchKeys)]
			switch n % 2 {
			case 0:
				// Even workers register a listener for the key.
				c.addListener(target, &mockListener{})
			default:
				// Odd workers set up a watch for the key; the results are not needed here.
				_, _ = c.setupWatch(cli, target, 0)
			}
		}(i)
	}

	// Block until every worker has returned.
	pending.Wait()

	// Inspect the resulting map under the read lock.
	c.lock.RLock()
	assert.True(t, len(c.watchers) > 0, "watchers should be added")
	for k := range c.watchers {
		assert.NotNil(t, c.watchers[k], "watcher should not be nil")
	}
	c.lock.RUnlock()

	// Clean up by closing the cluster's done channel.
	close(c.done)
}
// mockListener is a field-less placeholder type. Tests in this file pass
// &mockListener{} to addListener wherever a listener value is required.
type mockListener struct{}