fix cpu leak

This commit is contained in:
barnettZQG 2020-08-12 13:26:20 +08:00
parent d033b8bd2c
commit 039387f149

View File

@ -173,30 +173,34 @@ func (wc *watchChan) startWatching(watchClosedCh chan struct{}) {
ctx, cancel := context.WithCancel(wc.ctx)
defer cancel()
wch := wc.watcher.client.Watch(ctx, wc.key, opts...)
timer := time.NewTimer(time.Second * 20)
defer timer.Stop()
lool:
for {
select {
case wres := <-wch:
if wres.Err() != nil {
err := wres.Err()
// If there is an error on server (e.g. compaction), the channel will return it before closed.
logrus.Errorf("watch chan error: %v", err)
wc.sendError(err)
return
err := func() error {
timer := time.NewTimer(time.Second * 20)
defer timer.Stop()
for {
select {
case wres := <-wch:
if wres.Err() != nil {
err := wres.Err()
// If there is an error on server (e.g. compaction), the channel will return it before closed.
logrus.Errorf("watch chan error: %v", err)
wc.sendError(err)
close(watchClosedCh)
return err
}
for _, e := range wres.Events {
wc.sendEvent(parseEvent(e))
}
timer.Reset(time.Second * 20)
case <-timer.C:
return nil
}
for _, e := range wres.Events {
wc.sendEvent(parseEvent(e))
}
timer.Reset(time.Second * 20)
case <-timer.C:
break lool
}
}()
if err == nil {
wc.initialRev = 0
logrus.Debugf("watcher sync, because of not updated for a long time")
go wc.startWatching(watchClosedCh)
}
wc.initialRev = 0
logrus.Debugf("watcher sync, because of not updated for a long time")
go wc.startWatching(watchClosedCh)
}
// processEvent processes events from etcd watcher and sends results to resultChan.