chore: refactor etcd discov (#5046)

This commit is contained in:
Kevin Wan
2025-08-01 23:12:49 +08:00
committed by GitHub
parent ee6b7cee79
commit b69db5e09d

View File

@@ -92,12 +92,12 @@ func (p *Publisher) doKeepAlive() error {
default: default:
cli, err := p.doRegister() cli, err := p.doRegister()
if err != nil { if err != nil {
logc.Errorf(cli.Ctx(), "etcd publisher doRegister: %s", err.Error()) logc.Errorf(cli.Ctx(), "etcd publisher doRegister: %v", err)
break break
} }
if err := p.keepAliveAsync(cli); err != nil { if err := p.keepAliveAsync(cli); err != nil {
logc.Errorf(cli.Ctx(), "etcd publisher keepAliveAsync: %s", err.Error()) logc.Errorf(cli.Ctx(), "etcd publisher keepAliveAsync: %v", err)
break break
} }
@@ -125,33 +125,39 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
} }
threading.GoSafe(func() { threading.GoSafe(func() {
watchChan := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut()) wch := cli.Watch(cli.Ctx(), p.fullKey, clientv3.WithFilterPut())
for { for {
select { select {
case _, ok := <-ch: case _, ok := <-ch:
if !ok { if !ok {
p.revoke(cli) p.revoke(cli)
if err := p.doKeepAlive(); err != nil { if err := p.doKeepAlive(); err != nil {
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error()) logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
} }
return return
} }
case c := <-watchChan: case c := <-wch:
if c.Err() != nil { if c.Err() != nil {
logc.Errorf(cli.Ctx(), "etcd publisher watch: %s", c.Err().Error()) logc.Errorf(cli.Ctx(), "etcd publisher watch: %v", c.Err())
if err := p.doKeepAlive(); err != nil { if err := p.doKeepAlive(); err != nil {
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error()) logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
} }
return return
} }
if c.Events[0].Type == clientv3.EventTypeDelete {
logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v", c.Events[0].Kv.Key, c.Events[0].Type) for _, evt := range c.Events {
_, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease)) if evt.Type == clientv3.EventTypeDelete {
if err != nil { logc.Infof(cli.Ctx(), "etcd publisher watch: %s, event: %v",
logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %s", err.Error()) evt.Kv.Key, evt.Type)
} else { _, err := cli.Put(cli.Ctx(), p.fullKey, p.value, clientv3.WithLease(p.lease))
logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s", p.fullKey, p.value) if err != nil {
logc.Errorf(cli.Ctx(), "etcd publisher re-put key: %v", err)
} else {
logc.Infof(cli.Ctx(), "etcd publisher re-put key: %s, value: %s",
p.fullKey, p.value)
}
} }
} }
case <-p.pauseChan: case <-p.pauseChan:
@@ -160,7 +166,7 @@ func (p *Publisher) keepAliveAsync(cli internal.EtcdClient) error {
select { select {
case <-p.resumeChan: case <-p.resumeChan:
if err := p.doKeepAlive(); err != nil { if err := p.doKeepAlive(); err != nil {
logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %s", err.Error()) logc.Errorf(cli.Ctx(), "etcd publisher KeepAlive: %v", err)
} }
return return
case <-p.quit.Done(): case <-p.quit.Done():
@@ -195,7 +201,7 @@ func (p *Publisher) register(client internal.EtcdClient) (clientv3.LeaseID, erro
func (p *Publisher) revoke(cli internal.EtcdClient) { func (p *Publisher) revoke(cli internal.EtcdClient) {
if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil { if _, err := cli.Revoke(cli.Ctx(), p.lease); err != nil {
logc.Errorf(cli.Ctx(), "etcd publisher revoke: %s", err.Error()) logc.Errorf(cli.Ctx(), "etcd publisher revoke: %v", err)
} }
} }