diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index 7975db99a..c4ab41170 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -433,16 +433,16 @@ func (c *cluster) setupWatch(cli EtcdClient, key watchKey, rev int64) (context.C } ctx, cancel := context.WithCancel(cli.Ctx()) + + c.lock.Lock() if watcher, ok := c.watchers[key]; ok { watcher.cancel = cancel } else { val := newWatchValue() val.cancel = cancel - - c.lock.Lock() c.watchers[key] = val - c.lock.Unlock() } + c.lock.Unlock() rch = cli.Watch(clientv3.WithRequireLeader(ctx), wkey, ops...) diff --git a/core/discov/internal/registry_test.go b/core/discov/internal/registry_test.go index eaee49a99..21f7e079e 100644 --- a/core/discov/internal/registry_test.go +++ b/core/discov/internal/registry_test.go @@ -477,6 +477,72 @@ func TestRegistry_Unmonitor(t *testing.T) { assert.Nil(t, watchVals) } +// TestCluster_ConcurrentMonitor tests the race condition fix in setupWatch +// This test specifically covers the scenario from issue #5394 where: +// - addListener() writes to the watchers map (with lock) +// - setupWatch() reads from the watchers map (now with lock after fix) +// Running with -race flag will detect any race conditions +func TestCluster_ConcurrentMonitor(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + cli := NewMockEtcdClient(ctrl) + cli.EXPECT().Ctx().Return(context.Background()).AnyTimes() + cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(make(chan clientv3.WatchResponse)).AnyTimes() + + c := &cluster{ + endpoints: []string{"localhost:2379"}, + key: "test-cluster", + watchers: make(map[watchKey]*watchValue), + watchGroup: threading.NewRoutineGroup(), + done: make(chan lang.PlaceholderType), + lock: sync.RWMutex{}, + } + + // Spawn multiple concurrent operations that simulate the race condition: + // - Some goroutines call addListener (write to map) + // - Some goroutines call setupWatch (read from map) + var wg sync.WaitGroup + numGoroutines := 20 + wg.Add(numGoroutines) + + keys := []watchKey{ + {key: "key-0", exactMatch: false}, + {key: "key-1", exactMatch: false}, + {key: "key-2", exactMatch: false}, + } + + for i := 0; i < numGoroutines; i++ { + idx := i + go func() { + defer wg.Done() + key := keys[idx%len(keys)] + + if idx%2 == 0 { + // Half the goroutines add listeners (write operation) + c.addListener(key, &mockListener{}) + } else { + // Half the goroutines setup watches (read operation) + _, _ = c.setupWatch(cli, key, 0) + } + }() + } + + // Wait for all goroutines to complete + wg.Wait() + + // Verify that watchers were correctly added + c.lock.RLock() + assert.True(t, len(c.watchers) > 0, "watchers should be added") + for _, watcher := range c.watchers { + assert.NotNil(t, watcher, "watcher should not be nil") + } + c.lock.RUnlock() + + // Clean up + close(c.done) +} + type mockListener struct { }