mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 10:59:32 +08:00
Refine time sync logic log (#15251)
Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
This commit is contained in:
parent
273890a91e
commit
885ecaf222
@ -59,8 +59,8 @@ var (
|
||||
// TODO: sunby put to config
|
||||
enableTtChecker = true
|
||||
ttCheckerName = "dataTtChecker"
|
||||
ttMaxInterval = 3 * time.Minute
|
||||
ttCheckerWarnMsg = fmt.Sprintf("we haven't received tt for %f minutes", ttMaxInterval.Minutes())
|
||||
ttMaxInterval = 2 * time.Minute
|
||||
ttCheckerWarnMsg = fmt.Sprintf("Datacoord haven't received tt for %f minutes", ttMaxInterval.Minutes())
|
||||
segmentTimedFlushDuration = 10.0
|
||||
)
|
||||
|
||||
|
@ -34,13 +34,13 @@ import (
|
||||
|
||||
// proxyManager manages proxy connected to the rootcoord
|
||||
type proxyManager struct {
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
lock sync.Mutex
|
||||
etcdCli *clientv3.Client
|
||||
getSessions []func([]*sessionutil.Session)
|
||||
addSessions []func(*sessionutil.Session)
|
||||
delSessions []func(*sessionutil.Session)
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
lock sync.Mutex
|
||||
etcdCli *clientv3.Client
|
||||
initSessionsFunc []func([]*sessionutil.Session)
|
||||
addSessionsFunc []func(*sessionutil.Session)
|
||||
delSessionsFunc []func(*sessionutil.Session)
|
||||
}
|
||||
|
||||
// newProxyManager helper function to create a proxyManager
|
||||
@ -54,22 +54,22 @@ func newProxyManager(ctx context.Context, client *clientv3.Client, fns ...func([
|
||||
lock: sync.Mutex{},
|
||||
etcdCli: client,
|
||||
}
|
||||
p.getSessions = append(p.getSessions, fns...)
|
||||
p.initSessionsFunc = append(p.initSessionsFunc, fns...)
|
||||
return p
|
||||
}
|
||||
|
||||
// AddSession adds functions to addSessions function list
|
||||
func (p *proxyManager) AddSession(fns ...func(*sessionutil.Session)) {
|
||||
// AddSessionFunc adds functions to addSessions function list
|
||||
func (p *proxyManager) AddSessionFunc(fns ...func(*sessionutil.Session)) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.addSessions = append(p.addSessions, fns...)
|
||||
p.addSessionsFunc = append(p.addSessionsFunc, fns...)
|
||||
}
|
||||
|
||||
// DelSession add functions to delSessions function list
|
||||
func (p *proxyManager) DelSession(fns ...func(*sessionutil.Session)) {
|
||||
// DelSessionFunc add functions to delSessions function list
|
||||
func (p *proxyManager) DelSessionFunc(fns ...func(*sessionutil.Session)) {
|
||||
p.lock.Lock()
|
||||
defer p.lock.Unlock()
|
||||
p.delSessions = append(p.delSessions, fns...)
|
||||
p.delSessionsFunc = append(p.delSessionsFunc, fns...)
|
||||
}
|
||||
|
||||
// WatchProxy starts a goroutine to watch proxy session changes on etcd
|
||||
@ -81,8 +81,8 @@ func (p *proxyManager) WatchProxy() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("succeed to get sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev))
|
||||
for _, f := range p.getSessions {
|
||||
log.Debug("succeed to init sessions on etcd", zap.Any("sessions", sessions), zap.Int64("revision", rev))
|
||||
for _, f := range p.initSessionsFunc {
|
||||
f(sessions)
|
||||
}
|
||||
|
||||
@ -136,7 +136,7 @@ func (p *proxyManager) handlePutEvent(e *clientv3.Event) error {
|
||||
return err
|
||||
}
|
||||
log.Debug("received proxy put event with session", zap.Any("session", session))
|
||||
for _, f := range p.addSessions {
|
||||
for _, f := range p.addSessionsFunc {
|
||||
f(session)
|
||||
}
|
||||
metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Set(1)
|
||||
@ -149,7 +149,7 @@ func (p *proxyManager) handleDeleteEvent(e *clientv3.Event) error {
|
||||
return err
|
||||
}
|
||||
log.Debug("received proxy delete event with session", zap.Any("session", session))
|
||||
for _, f := range p.delSessions {
|
||||
for _, f := range p.delSessionsFunc {
|
||||
f(session)
|
||||
}
|
||||
metrics.RootCoordProxyLister.WithLabelValues(metricProxy(session.ServerID)).Set(0)
|
||||
|
@ -76,8 +76,8 @@ func TestProxyManager(t *testing.T) {
|
||||
assert.Equal(t, int64(100), sess.ServerID)
|
||||
t.Log("del session", sess)
|
||||
}
|
||||
pm.AddSession(fa)
|
||||
pm.DelSession(fd)
|
||||
pm.AddSessionFunc(fa)
|
||||
pm.DelSessionFunc(fd)
|
||||
|
||||
err = pm.WatchProxy()
|
||||
assert.Nil(t, err)
|
||||
|
@ -22,6 +22,7 @@ import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"syscall"
|
||||
@ -430,21 +431,6 @@ func (c *Core) getSegments(ctx context.Context, collID typeutil.UniqueID) (map[t
|
||||
return segID2PartID, nil
|
||||
}
|
||||
|
||||
func (c *Core) setDdMsgSendFlag(b bool) error {
|
||||
flag, err := c.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if (b && flag == "true") || (!b && flag == "false") {
|
||||
log.Debug("DdMsg send flag need not change", zap.String("flag", flag))
|
||||
return nil
|
||||
}
|
||||
|
||||
err = c.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(b))
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *Core) setMsgStreams() error {
|
||||
if Params.PulsarCfg.Address == "" {
|
||||
return fmt.Errorf("pulsar address is empty")
|
||||
@ -1043,11 +1029,11 @@ func (c *Core) Init() error {
|
||||
c.proxyManager = newProxyManager(
|
||||
c.ctx,
|
||||
c.etcdCli,
|
||||
c.chanTimeTick.clearSessions,
|
||||
c.chanTimeTick.initSessions,
|
||||
c.proxyClientManager.GetProxyClients,
|
||||
)
|
||||
c.proxyManager.AddSession(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
|
||||
c.proxyManager.DelSession(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
|
||||
c.proxyManager.AddSessionFunc(c.chanTimeTick.addSession, c.proxyClientManager.AddProxyClient)
|
||||
c.proxyManager.DelSessionFunc(c.chanTimeTick.delSession, c.proxyClientManager.DelProxyClient)
|
||||
|
||||
c.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
|
||||
|
||||
@ -1066,8 +1052,21 @@ func (c *Core) Init() error {
|
||||
func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
if !force {
|
||||
flag, err := c.MetaTable.txn.Load(DDMsgSendPrefix)
|
||||
if err != nil || flag == "true" {
|
||||
log.Debug("No un-successful DdMsg")
|
||||
if err != nil {
|
||||
// TODO, this is super ugly hack but our kv interface does not support loadWithExist
|
||||
// leave it for later
|
||||
if strings.Contains(err.Error(), "there is no value on key") {
|
||||
log.Debug("skip reSendDdMsg with no dd-msg-send key")
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
}
|
||||
value, err := strconv.ParseBool(flag)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if value {
|
||||
log.Debug("skip reSendDdMsg with dd-msg-send set to true")
|
||||
return nil
|
||||
}
|
||||
}
|
||||
@ -1167,7 +1166,7 @@ func (c *Core) reSendDdMsg(ctx context.Context, force bool) error {
|
||||
}
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return c.setDdMsgSendFlag(true)
|
||||
return c.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
|
||||
}
|
||||
|
||||
// Start start rootcoord
|
||||
|
@ -19,6 +19,7 @@ package rootcoord
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strconv"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/milvus-io/milvus/internal/common"
|
||||
@ -254,7 +255,7 @@ func (t *CreateCollectionReqTask) Execute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return t.core.setDdMsgSendFlag(true)
|
||||
return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
|
||||
}
|
||||
|
||||
// DropCollectionReqTask drop collection request task
|
||||
@ -370,7 +371,7 @@ func (t *DropCollectionReqTask) Execute(ctx context.Context) error {
|
||||
t.core.ExpireMetaCache(ctx, aliases, ts)
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return t.core.setDdMsgSendFlag(true)
|
||||
return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
|
||||
}
|
||||
|
||||
// HasCollectionReqTask has collection request task
|
||||
@ -564,7 +565,7 @@ func (t *CreatePartitionReqTask) Execute(ctx context.Context) error {
|
||||
t.core.ExpireMetaCache(ctx, []string{t.Req.CollectionName}, ts)
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return t.core.setDdMsgSendFlag(true)
|
||||
return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
|
||||
}
|
||||
|
||||
// DropPartitionReqTask drop partition request task
|
||||
@ -658,7 +659,7 @@ func (t *DropPartitionReqTask) Execute(ctx context.Context) error {
|
||||
//}
|
||||
|
||||
// Update DDOperation in etcd
|
||||
return t.core.setDdMsgSendFlag(true)
|
||||
return t.core.MetaTable.txn.Save(DDMsgSendPrefix, strconv.FormatBool(true))
|
||||
}
|
||||
|
||||
// HasPartitionReqTask has partition request task
|
||||
|
@ -149,7 +149,7 @@ func (t *timetickSync) sendToChannel() {
|
||||
if len(idleSessionList) > 0 {
|
||||
// give warning every 2 second if not get ttMsg from source sessions
|
||||
if maxCnt%10 == 0 {
|
||||
log.Warn("session idle for long time", zap.Any("idle session list", idleSessionList),
|
||||
log.Warn("session idle for long time", zap.Any("idle list", idleSessionList),
|
||||
zap.Any("idle time", Params.ProxyCfg.TimeTickInterval.Milliseconds()*maxCnt))
|
||||
}
|
||||
return
|
||||
@ -273,11 +273,12 @@ func (t *timetickSync) delSession(sess *sessionutil.Session) {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *timetickSync) clearSessions(sess []*sessionutil.Session) {
|
||||
func (t *timetickSync) initSessions(sess []*sessionutil.Session) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
for _, s := range sess {
|
||||
t.sess2ChanTsMap[s.ServerID] = nil
|
||||
log.Debug("Init proxy sessions for timeticksync", zap.Int64("serverID", s.ServerID))
|
||||
}
|
||||
}
|
||||
|
||||
@ -302,17 +303,14 @@ func (t *timetickSync) startWatch(wg *sync.WaitGroup) {
|
||||
log.Debug("timetickSync sendChan closed")
|
||||
return
|
||||
}
|
||||
|
||||
if enableTtChecker {
|
||||
checker.Check()
|
||||
}
|
||||
// reduce each channel to get min timestamp
|
||||
local := sessTimetick[t.sourceID]
|
||||
if len(local.chanTsMap) == 0 {
|
||||
continue
|
||||
}
|
||||
|
||||
if enableTtChecker {
|
||||
checker.Check()
|
||||
}
|
||||
|
||||
hdr := fmt.Sprintf("send ts to %d channels", len(local.chanTsMap))
|
||||
tr := timerecord.NewTimeRecorder(hdr)
|
||||
wg := sync.WaitGroup{}
|
||||
|
Loading…
Reference in New Issue
Block a user