mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-11-30 19:08:30 +08:00
Add logs for add delta channels (#12347)
Signed-off-by: godchen <qingxiang.chen@zilliz.com>
This commit is contained in:
parent
0e7f24491f
commit
b45d7baf97
@ -151,6 +151,7 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory msgstream.Factory, idAll
|
||||
}
|
||||
|
||||
func (m *MetaReplica) reloadFromKV() error {
|
||||
log.Debug("start reload from kv")
|
||||
collectionKeys, collectionValues, err := m.client.LoadWithPrefix(collectionMetaPrefix)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -230,6 +231,7 @@ func (m *MetaReplica) reloadFromKV() error {
|
||||
m.globalSeekPosition = position
|
||||
}
|
||||
//TODO::update partition states
|
||||
log.Debug("reload from kv finished")
|
||||
|
||||
return nil
|
||||
}
|
||||
@ -1000,13 +1002,16 @@ func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.Vch
|
||||
defer m.deltaChannelMu.Unlock()
|
||||
_, ok := m.deltaChannelInfos[collectionID]
|
||||
if ok {
|
||||
log.Debug("delta channel already exist", zap.Any("collectionID", collectionID))
|
||||
return nil
|
||||
}
|
||||
|
||||
err := saveDeltaChannelInfo(collectionID, infos, m.client)
|
||||
if err != nil {
|
||||
log.Error("save delta channel info error", zap.Error(err))
|
||||
return err
|
||||
}
|
||||
log.Debug("save delta channel infos to meta", zap.Any("collectionID", collectionID), zap.Any("infos", infos))
|
||||
m.deltaChannelInfos[collectionID] = infos
|
||||
return nil
|
||||
}
|
||||
|
@ -464,7 +464,10 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
|
||||
Infos: mergedDeltaChannels,
|
||||
}
|
||||
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
|
||||
lct.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
err = lct.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
internalTasks, err := assignInternalTask(ctx, collectionID, lct, lct.meta, lct.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, false, nil, nil)
|
||||
if err != nil {
|
||||
@ -804,7 +807,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
|
||||
Infos: mergedDeltaChannels,
|
||||
}
|
||||
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
|
||||
lpt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
err := lpt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
internalTasks, err := assignInternalTask(ctx, collectionID, lpt, lpt.meta, lpt.cluster, loadSegmentReqs, watchDmReqs, watchDeltaChannelReq, false, nil, nil)
|
||||
if err != nil {
|
||||
log.Warn("loadPartitionTask: assign child task failed", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
|
||||
@ -1591,7 +1597,10 @@ func (ht *handoffTask) execute(ctx context.Context) error {
|
||||
Infos: mergedDeltaChannels,
|
||||
}
|
||||
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
|
||||
ht.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
err = ht.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
internalTasks, err := assignInternalTask(ctx, collectionID, ht, ht.meta, ht.cluster, []*querypb.LoadSegmentsRequest{loadSegmentReq}, nil, watchDeltaChannelReq, true, nil, nil)
|
||||
if err != nil {
|
||||
log.Error("handoffTask: assign child task failed", zap.Any("segmentInfo", segmentInfo))
|
||||
@ -1819,7 +1828,10 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
||||
Infos: mergedDeltaChannel,
|
||||
}
|
||||
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
|
||||
lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
err = lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, watchDeltaChannelReq, true, lbt.SourceNodeIDs, lbt.DstNodeIDs)
|
||||
if err != nil {
|
||||
@ -1977,7 +1989,10 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
|
||||
Infos: mergedDeltaChannels,
|
||||
}
|
||||
// If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule
|
||||
lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
err = lbt.meta.setDeltaChannel(watchDeltaChannelReq.CollectionID, watchDeltaChannelReq.Infos)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// TODO:: assignInternalTask with multi collection
|
||||
internalTasks, err := assignInternalTask(ctx, collectionID, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, nil, watchDeltaChannelReq, false, lbt.SourceNodeIDs, lbt.DstNodeIDs)
|
||||
@ -2227,5 +2242,9 @@ func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.Vchannel
|
||||
for _, index := range minPositions {
|
||||
result = append(result, infos[index])
|
||||
}
|
||||
log.Debug("merge delta channels finished",
|
||||
zap.Any("origin info length", len(infos)),
|
||||
zap.Any("merged info length", len(result)),
|
||||
)
|
||||
return result
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user