From b69db5e09d5d42daa55f93396409c28e9bec44ef Mon Sep 17 00:00:00 2001 From: Kevin Wan Date: Fri, 1 Aug 2025 23:12:49 +0800 Subject: [PATCH] chore: refactor etcd discov (#5046) --- core/discov/publisher.go | 38 ++++++++++++++++++++++---------------- 1 file changed, 22 insertions(+), 16 deletions(-) diff --git a/core/discov/publisher.go b/core/discov/publisher.go index 36c0d1762..549a81e7e 100644 --- a/core/discov/publisher.go +++ b/core/discov/publisher.go @@ -92,12 +92,12 @@ func (p *Publisher) doKeepAlive() error { default: cli, err := p.doRegister() if err != nil { - logc.Errorf(cli.Ctx(), "etcd publisher doRegister: %s", err.Error()) + logc.Errorf(cli.Ctx(), "etcd publisher doRegister: %v", err) break } if err := p.keepAliveAsync(cli); err != nil { - logc.Errorf(cli.Ctx(), "etcd publisher keepAliveAsync: %s", err.Error()) + logc.Errorf(cli.Ctx(), "etcd publisher keepAliveAsync: %v", err) break } @@ -125,33 +125,39 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error { } threading.GoSafe(func() { - watchChan := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut()) + wch := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut()) + for { select { case _, ok := <-ch: if !ok { p.revoke(cli) if err := p.doKeepAlive(); err != nil { - logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error()) + logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err) } return } - case c := <-watchChan: + case c := <-wch: if c.Err() != nil { - logc.Errorf(cli.Ctx(), "etcd publisher watch: %s", c.Err().Error()) + logc.Errorf(cli.Ctx(), "etcd publisher watch: %v", c.Err()) if err := p.doKeepAlive(); err != nil { - logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error()) + logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err) } return } - if c.Events[0].Type == clientv3.EventTypeDelete { - logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v", c.Events[0].Kv.Key, c.Events[0].Type) - _, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease)) - if err != nil { - logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %s", err.Error()) - } else { - logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s", p.fullKey, p.value) + + for _, evt := range c.Events { + if evt.Type == clientv3.EventTypeDelete { + logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v", + evt.Kv.Key, evt.Type) + _, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease)) + if err != nil { + logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %v", err) + } else { + logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s", + p.fullKey, p.value) + } } } case <-p.pauseChan: @@ -160,7 +166,7 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error { select { case <-p.resumeChan: if err := p.doKeepAlive(); err != nil { - logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error()) + logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err) } return case <-p.quit.Done(): @@ -195,7 +201,7 @@ func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, erro func (p *Publisher) revoke(cli internal.EtcdClient) { if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil { - logc.Errorf(cli.Ctx(), "etcd publisher revoke: %s", err.Error()) + logc.Errorf(cli.Ctx(), "etcd publisher revoke: %v", err) } }