diff --git a/core/discov/publisher.go b/core/discov/publisher.go index 03f23a52d..36c0d1762 100644 --- a/core/discov/publisher.go +++ b/core/discov/publisher.go @@ -125,6 +125,7 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error { } threading.GoSafe(func() { + watchChan := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut()) for { select { case _, ok := <-ch: @@ -135,6 +136,24 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error { } return } + + case c := <-watchChan: + if c.Err() != nil { + logc.Errorf(cli.Ctx(), "etcd publisher watch: %s", c.Err().Error()) + if err := p.doKeepAlive(); err != nil { + logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error()) + } + return + } + if c.Events[0].Type == clientv3.EventTypeDelete { + logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v", c.Events[0].Kv.Key, c.Events[0].Type) + _, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease)) + if err != nil { + logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %s", err.Error()) + } else { + logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s", p.fullKey, p.value) + } + } case <-p.pauseChan: logc.Infof(cli.Ctx(), "paused etcd renew, key: %s, value: %s", p.key, p.value) p.revoke(cli) diff --git a/core/discov/publisher_test.go b/core/discov/publisher_test.go index 34a34f84f..2021d6023 100644 --- a/core/discov/publisher_test.go +++ b/core/discov/publisher_test.go @@ -15,6 +15,7 @@ import ( "github.com/zeromicro/go-zero/core/lang" "github.com/zeromicro/go-zero/core/logx" "github.com/zeromicro/go-zero/core/stringx" + "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" "golang.org/x/net/http2" "google.golang.org/grpc" @@ -211,6 +212,9 @@ func TestPublisher_keepAliveAsyncQuit(t *testing.T) { defer restore() cli.EXPECT().Ctx().AnyTimes() cli.EXPECT().KeepAlive(gomock.Any(), id) + // Add Watch expectation for the new watch mechanism + watchChan := make(<-chan clientv3.WatchResponse) + cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(watchChan) var wg sync.WaitGroup wg.Add(1) cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) { @@ -232,6 +236,9 @@ func TestPublisher_keepAliveAsyncPause(t *testing.T) { defer restore() cli.EXPECT().Ctx().AnyTimes() cli.EXPECT().KeepAlive(gomock.Any(), id) + // Add Watch expectation for the new watch mechanism + watchChan := make(<-chan clientv3.WatchResponse) + cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(watchChan) pub := NewPublisher(nil, "thekey", "thevalue") var wg sync.WaitGroup wg.Add(1) @@ -245,6 +252,112 @@ func TestPublisher_keepAliveAsyncPause(t *testing.T) { wg.Wait() } +// Test case for key deletion and re-registration (covers lines 148-155) +func TestPublisher_keepAliveAsyncKeyDeletion(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + const id clientv3.LeaseID = 1 + cli := internal.NewMockEtcdClient(ctrl) + restore := setMockClient(cli) + defer restore() + cli.EXPECT().Ctx().AnyTimes() + cli.EXPECT().KeepAlive(gomock.Any(), id) + + // Create a watch channel that will send a delete event + watchChan := make(chan clientv3.WatchResponse, 1) + watchResp := clientv3.WatchResponse{ + Events: []*clientv3.Event{{ + Type: clientv3.EventTypeDelete, + Kv: &mvccpb.KeyValue{ + Key: []byte("thekey"), + }, + }}, + } + watchChan <- watchResp + + cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return((<-chan clientv3.WatchResponse)(watchChan)) + + var wg sync.WaitGroup + wg.Add(1) // Only wait for Revoke call + + // Use a channel to signal when Put has been called + putCalled := make(chan struct{}) + + // Expect the re-put operation when key is deleted + cli.EXPECT().Put(gomock.Any(), "thekey", "thevalue", gomock.Any()).Do(func(_, _, _, _ any) { + close(putCalled) // Signal that Put has been called + }).Return(nil, nil) + + // Expect revoke when Stop is called + cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) { + wg.Done() + }) + + pub := NewPublisher(nil, "thekey", "thevalue") + pub.lease = id + pub.fullKey = "thekey" + + assert.Nil(t, pub.keepAliveAsync(cli)) + + // Wait for Put to be called, then stop + <-putCalled + pub.Stop() + wg.Wait() +} + +// Test case for key deletion with re-put error (covers error branch in lines 151-152) +func TestPublisher_keepAliveAsyncKeyDeletionPutError(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + const id clientv3.LeaseID = 1 + cli := internal.NewMockEtcdClient(ctrl) + restore := setMockClient(cli) + defer restore() + cli.EXPECT().Ctx().AnyTimes() + cli.EXPECT().KeepAlive(gomock.Any(), id) + + // Create a watch channel that will send a delete event + watchChan := make(chan clientv3.WatchResponse, 1) + watchResp := clientv3.WatchResponse{ + Events: []*clientv3.Event{{ + Type: clientv3.EventTypeDelete, + Kv: &mvccpb.KeyValue{ + Key: []byte("thekey"), + }, + }}, + } + watchChan <- watchResp + + cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return((<-chan clientv3.WatchResponse)(watchChan)) + + var wg sync.WaitGroup + wg.Add(1) // Only wait for Revoke call + + // Use a channel to signal when Put has been called + putCalled := make(chan struct{}) + + // Expect the re-put operation to fail + cli.EXPECT().Put(gomock.Any(), "thekey", "thevalue", gomock.Any()).Do(func(_, _, _, _ any) { + close(putCalled) // Signal that Put has been called + }).Return(nil, errors.New("put error")) + + // Expect revoke when Stop is called + cli.EXPECT().Revoke(gomock.Any(), id).Do(func(_, _ any) { + wg.Done() + }) + + pub := NewPublisher(nil, "thekey", "thevalue") + pub.lease = id + pub.fullKey = "thekey" + + assert.Nil(t, pub.keepAliveAsync(cli)) + + // Wait for Put to be called, then stop + <-putCalled + pub.Stop() + wg.Wait() +} + func TestPublisher_Resume(t *testing.T) { publisher := new(Publisher) publisher.resumeChan = make(chan lang.PlaceholderType) @@ -273,6 +386,9 @@ func TestPublisher_keepAliveAsync(t *testing.T) { defer restore() cli.EXPECT().Ctx().AnyTimes() cli.EXPECT().KeepAlive(gomock.Any(), id) + // Add Watch expectation for the new watch mechanism + watchChan := make(<-chan clientv3.WatchResponse) + cli.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(watchChan) cli.EXPECT().Grant(gomock.Any(), timeToLive).Return(&clientv3.LeaseGrantResponse{ ID: 1, }, nil)