From 74ee16376109911ee641cf6138c11bc10f6457fa Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Thu, 17 Jun 2021 18:46:16 +0800 Subject: [PATCH] fix bug that etcd stream cancelled without re-watch (#770) --- core/discov/internal/registry.go | 18 +++++++++++++----- core/discov/internal/registry_test.go | 9 +++++++-- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/core/discov/internal/registry.go b/core/discov/internal/registry.go index 38e5765e6..799a72c1c 100644 --- a/core/discov/internal/registry.go +++ b/core/discov/internal/registry.go @@ -260,26 +260,34 @@ func (c *cluster) reload(cli EtcdClient) { } func (c *cluster) watch(cli EtcdClient, key string) { + for { + if c.watchStream(cli, key) { + return + } + } +} + +func (c *cluster) watchStream(cli EtcdClient, key string) bool { rch := cli.Watch(clientv3.WithRequireLeader(c.context(cli)), makeKeyPrefix(key), clientv3.WithPrefix()) for { select { case wresp, ok := <-rch: if !ok { logx.Error("etcd monitor chan has been closed") - return + return false } if wresp.Canceled { - logx.Error("etcd monitor chan has been canceled") - return + logx.Errorf("etcd monitor chan has been canceled, error: %v", wresp.Err()) + return false } if wresp.Err() != nil { logx.Error(fmt.Sprintf("etcd monitor chan error: %v", wresp.Err())) - return + return false } c.handleWatchEvents(key, wresp.Events) case <-c.done: - return + return true } } } diff --git a/core/discov/internal/registry_test.go b/core/discov/internal/registry_test.go index af9b4ad1a..7c142564a 100644 --- a/core/discov/internal/registry_test.go +++ b/core/discov/internal/registry_test.go @@ -8,6 +8,7 @@ import ( "github.com/golang/mock/gomock" "github.com/stretchr/testify/assert" "github.com/tal-tech/go-zero/core/contextx" + "github.com/tal-tech/go-zero/core/lang" "github.com/tal-tech/go-zero/core/logx" "github.com/tal-tech/go-zero/core/stringx" "go.etcd.io/etcd/clientv3" @@ -202,11 +203,13 @@ func TestClusterWatch_RespFailures(t *testing.T) { restore := setMockClient(cli) defer restore() ch := make(chan clientv3.WatchResponse) - cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch) + cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes() cli.EXPECT().Ctx().Return(context.Background()).AnyTimes() c := new(cluster) + c.done = make(chan lang.PlaceholderType) go func() { ch <- resp + close(c.done) }() c.watch(cli, "any") }) @@ -220,11 +223,13 @@ func TestClusterWatch_CloseChan(t *testing.T) { restore := setMockClient(cli) defer restore() ch := make(chan clientv3.WatchResponse) - cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch) + cli.EXPECT().Watch(gomock.Any(), "any/", gomock.Any()).Return(ch).AnyTimes() cli.EXPECT().Ctx().Return(context.Background()).AnyTimes() c := new(cluster) + c.done = make(chan lang.PlaceholderType) go func() { close(ch) + close(c.done) }() c.watch(cli, "any") }