From 27eca2881aa08166794ea0f31fa38f0e29868ec6 Mon Sep 17 00:00:00 2001 From: yah01 Date: Fri, 1 Jul 2022 19:16:25 +0800 Subject: [PATCH] Clean segments as releasing collection (#17932) Signed-off-by: yah01 --- internal/querycoord/meta.go | 16 +++++++++++++ internal/querycoord/meta_test.go | 3 ++- internal/querycoord/task.go | 40 +++++++++++++++++++++----------- internal/querycoord/util.go | 11 ++++++++- 4 files changed, 54 insertions(+), 16 deletions(-) diff --git a/internal/querycoord/meta.go b/internal/querycoord/meta.go index 75f0f93281..40d7cc61fc 100644 --- a/internal/querycoord/meta.go +++ b/internal/querycoord/meta.go @@ -182,6 +182,11 @@ func (m *MetaReplica) reloadFromKV() error { if err := m.segmentsInfo.loadSegments(); err != nil { return err } + for id, segment := range m.segmentsInfo.segmentIDMap { + if _, ok := m.collectionInfos[segment.CollectionID]; !ok { + delete(m.segmentsInfo.segmentIDMap, id) + } + } deltaChannelKeys, deltaChannelValues, err := m.getKvClient().LoadWithPrefix(deltaChannelMetaPrefix) if err != nil { @@ -524,6 +529,14 @@ func (m *MetaReplica) releaseCollection(collectionID UniqueID) error { m.replicas.Remove(collection.ReplicaIds...) + m.segmentsInfo.mu.Lock() + for id, segment := range m.segmentsInfo.segmentIDMap { + if segment.CollectionID == collectionID { + delete(m.segmentsInfo.segmentIDMap, id) + } + } + m.segmentsInfo.mu.Unlock() + return nil } @@ -1182,6 +1195,9 @@ func removeCollectionMeta(collectionID UniqueID, replicas []UniqueID, kv kv.Meta prefixes = append(prefixes, replicaPrefix) } + prefixes = append(prefixes, + fmt.Sprintf("%s/%d", util.SegmentMetaPrefix, collectionID)) + return kv.MultiRemoveWithPrefix(prefixes) } diff --git a/internal/querycoord/meta_test.go b/internal/querycoord/meta_test.go index af260cb86c..3c9baadafd 100644 --- a/internal/querycoord/meta_test.go +++ b/internal/querycoord/meta_test.go @@ -319,7 +319,8 @@ func TestReloadMetaFromKV(t *testing.T) { kvs[collectionKey] = string(collectionBlobs) segmentInfo := &querypb.SegmentInfo{ - SegmentID: defaultSegmentID, + SegmentID: defaultSegmentID, + CollectionID: defaultCollectionID, } segmentBlobs, err := proto.Marshal(segmentInfo) assert.Nil(t, err) diff --git a/internal/querycoord/task.go b/internal/querycoord/task.go index 17ed042bfe..fef6555fc1 100644 --- a/internal/querycoord/task.go +++ b/internal/querycoord/task.go @@ -1913,7 +1913,14 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro recoveredCollectionIDs.Insert(watchInfo.CollectionID) } + log.Debug("loadBalanceTask: ready to process segments and channels on offline node", + zap.Int64("taskID", lbt.getTaskID()), + zap.Int("segmentNum", len(segments)), + zap.Int("dmChannelNum", len(dmChannels))) + if len(segments) == 0 && len(dmChannels) == 0 { + log.Debug("loadBalanceTask: no segment/channel on this offline node, skip it", + zap.Int64("taskID", lbt.getTaskID())) continue } @@ -1922,7 +1929,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro watchDmChannelReqs := make([]*querypb.WatchDmChannelsRequest, 0) collectionInfo, err := lbt.meta.getCollectionInfoByID(collectionID) if err != nil { - log.Error("loadBalanceTask: get collectionInfo from meta failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + log.Error("loadBalanceTask: get collectionInfo from meta failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Error(err)) lbt.setResultInfo(err) return err } @@ -1934,25 +1941,25 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro if collectionInfo.LoadType == querypb.LoadType_LoadCollection { toRecoverPartitionIDs, err = lbt.broker.showPartitionIDs(ctx, collectionID) if err != nil { - log.Error("loadBalanceTask: show collection's partitionIDs failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + log.Error("loadBalanceTask: show collection's partitionIDs failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Error(err)) lbt.setResultInfo(err) panic(err) } } else { toRecoverPartitionIDs = collectionInfo.PartitionIDs } - log.Info("loadBalanceTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs)) + log.Info("loadBalanceTask: get collection's all partitionIDs", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs)) replica, err := lbt.getReplica(nodeID, collectionID) if err != nil { // getReplica maybe failed, it will cause the balanceTask execute infinitely - log.Warn("loadBalanceTask: get replica failed", zap.Int64("collectionID", collectionID), zap.Int64("nodeId", nodeID)) + log.Warn("loadBalanceTask: get replica failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("nodeId", nodeID)) continue } for _, partitionID := range toRecoverPartitionIDs { vChannelInfos, binlogs, err := lbt.broker.getRecoveryInfo(lbt.ctx, collectionID, partitionID) if err != nil { - log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) + log.Error("loadBalanceTask: getRecoveryInfo failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err)) lbt.setResultInfo(err) panic(err) } @@ -1986,7 +1993,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro for _, info := range vChannelInfos { deltaChannel, err := generateWatchDeltaChannelInfo(info) if err != nil { - log.Error("loadBalanceTask: generateWatchDeltaChannelInfo failed", zap.Int64("collectionID", collectionID), zap.String("channelName", info.ChannelName), zap.Error(err)) + log.Error("loadBalanceTask: generateWatchDeltaChannelInfo failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.String("channelName", info.ChannelName), zap.Error(err)) lbt.setResultInfo(err) panic(err) } @@ -1999,7 +2006,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro // If meta is not updated here, deltaChannel meta will not be available when loadSegment reschedule err = lbt.meta.setDeltaChannel(collectionID, mergedDeltaChannel) if err != nil { - log.Error("loadBalanceTask: set delta channel info meta failed", zap.Int64("collectionID", collectionID), zap.Error(err)) + log.Error("loadBalanceTask: set delta channel info meta failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("collectionID", collectionID), zap.Error(err)) lbt.setResultInfo(err) panic(err) } @@ -2038,7 +2045,7 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro tasks, err := assignInternalTask(ctx, lbt, lbt.meta, lbt.cluster, loadSegmentReqs, watchDmChannelReqs, false, lbt.SourceNodeIDs, lbt.DstNodeIDs, replica.GetReplicaID(), lbt.broker) if err != nil { - log.Error("loadBalanceTask: assign child task failed", zap.Int64("sourceNodeID", nodeID)) + log.Error("loadBalanceTask: assign child task failed", zap.Int64("taskID", lbt.getTaskID()), zap.Int64("sourceNodeID", nodeID)) lbt.setResultInfo(err) return err } @@ -2047,9 +2054,9 @@ func (lbt *loadBalanceTask) processNodeDownLoadBalance(ctx context.Context) erro } for _, internalTask := range internalTasks { lbt.addChildTask(internalTask) - log.Info("loadBalanceTask: add a childTask", zap.String("task type", internalTask.msgType().String()), zap.Any("task", internalTask)) + log.Info("loadBalanceTask: add a childTask", zap.Int64("taskID", lbt.getTaskID()), zap.String("task type", internalTask.msgType().String()), zap.Any("task", internalTask)) } - log.Info("loadBalanceTask: assign child task done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs)) + log.Info("loadBalanceTask: assign child task done", zap.Int64("taskID", lbt.getTaskID()), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs)) return nil } @@ -2250,7 +2257,8 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error { zap.Int32("trigger type", int32(lbt.triggerCondition)), zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs), zap.Any("balanceReason", lbt.BalanceReason), - zap.Int64("taskID", lbt.getTaskID())) + zap.Int64("taskID", lbt.getTaskID()), + zap.Int("childTaskNum", len(lbt.childTasks))) return nil } @@ -2285,16 +2293,20 @@ func (lbt *loadBalanceTask) globalPostExecute(ctx context.Context) error { } log.Debug("removing offline nodes from replicas and segments...", - zap.Int("replicaNum", len(replicas)), - zap.Int("segmentNum", len(segments)), zap.Int64("triggerTaskID", lbt.getTaskID()), - ) + zap.Int("replicaNum", len(replicas)), + zap.Int("segmentNum", len(segments))) wg := errgroup.Group{} // Remove offline nodes from replica for replicaID := range replicas { replicaID := replicaID wg.Go(func() error { + log.Debug("remove offline nodes from replica", + zap.Int64("taskID", lbt.taskID), + zap.Int64("replicaID", replicaID), + zap.Int64s("offlineNodes", lbt.SourceNodeIDs)) + return lbt.meta.applyReplicaBalancePlan( NewRemoveBalancePlan(replicaID, lbt.SourceNodeIDs...)) }) diff --git a/internal/querycoord/util.go b/internal/querycoord/util.go index 9f2b0b1e73..c6ece959ce 100644 --- a/internal/querycoord/util.go +++ b/internal/querycoord/util.go @@ -136,6 +136,9 @@ func syncReplicaSegments(ctx context.Context, meta Meta, cluster Cluster, replic for _, shard := range replica.ShardReplicas { if len(shards) > 0 && !isInShards(shard.DmChannelName, shards) { + log.Debug("skip this shard", + zap.Int64("replicaID", replicaID), + zap.String("shard", shard.DmChannelName)) continue } @@ -155,6 +158,12 @@ func syncReplicaSegments(ctx context.Context, meta Meta, cluster Cluster, replic for i, j := 0, 0; i < len(segments); i = j { node := getNodeInReplica(replica, segments[i].NodeIds) + if node < 0 { + log.Warn("syncReplicaSegments: no segment node in replica", + zap.Int64("replicaID", replicaID), + zap.Any("segment", segments[i])) + } + partition := segments[i].PartitionID j++ @@ -207,7 +216,7 @@ func getNodeInReplica(replica *milvuspb.ReplicaInfo, nodes []UniqueID) UniqueID } } - return 0 + return -1 } func removeFromSlice(origin []UniqueID, del ...UniqueID) []UniqueID {