diff --git a/internal/core/src/segcore/SegmentSealedImpl.cpp b/internal/core/src/segcore/SegmentSealedImpl.cpp
index a148640d72..94789843ce 100644
--- a/internal/core/src/segcore/SegmentSealedImpl.cpp
+++ b/internal/core/src/segcore/SegmentSealedImpl.cpp
@@ -80,7 +80,6 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
     AssertInfo(row_count > 0, "Index count is 0");
     std::unique_lock lck(mutex_);
-    // Don't allow vector raw data and index exist at the same time
     AssertInfo(
         !get_bit(index_ready_bitset_, field_id),
         "vector index has been exist at " + std::to_string(field_id.get()));
@@ -115,10 +114,6 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
     AssertInfo(row_count > 0, "Index count is 0");
     std::unique_lock lck(mutex_);
-    // Don't allow scalar raw data and index exist at the same time
-    AssertInfo(!get_bit(field_data_ready_bitset_, field_id),
-               "scalar index can't be loaded when raw data exists at field " +
-                   std::to_string(field_id.get()));
     AssertInfo(
         !get_bit(index_ready_bitset_, field_id),
         "scalar index has been exist at " + std::to_string(field_id.get()));
@@ -166,6 +161,10 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
     set_bit(index_ready_bitset_, field_id, true);
     update_row_count(row_count);
 
+    // release field column
+    fields_.erase(field_id);
+    set_bit(field_data_ready_bitset_, field_id, false);
+
     lck.unlock();
 }
diff --git a/internal/datacoord/handler.go b/internal/datacoord/handler.go
index b9578b4373..ddb574fd67 100644
--- a/internal/datacoord/handler.go
+++ b/internal/datacoord/handler.go
@@ -199,24 +199,16 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
    for retrieveUnIndexed() {
    }
 
-   for segId := range unIndexedIDs {
-       segInfo := segmentInfos[segId]
-       if segInfo.GetState() == commonpb.SegmentState_Dropped {
-           unIndexedIDs.Remove(segId)
-           indexedIDs.Insert(segId)
-       }
-   }
-
-   unIndexedIDs.Insert(growingIDs.Collect()...)
+   // unIndexed segments are flushed segments as well, report them as flushed
+   indexedIDs.Insert(unIndexedIDs.Collect()...)
return &datapb.VchannelInfo{ CollectionID: channel.CollectionID, ChannelName: channel.Name, SeekPosition: h.GetChannelSeekPosition(channel, partitionIDs...), FlushedSegmentIds: indexedIDs.Collect(), - UnflushedSegmentIds: unIndexedIDs.Collect(), + UnflushedSegmentIds: growingIDs.Collect(), DroppedSegmentIds: droppedIDs.Collect(), - // IndexedSegmentIds: indexed.Collect(), } } diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 3956b7e966..39f0c2f585 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -38,6 +38,7 @@ var ( Segment_Checker = "segment_checker" Channel_Checker = "channel_checker" Balance_Checker = "balance_checker" + Index_Checker = "index_checker" ) type CheckerController struct { @@ -46,7 +47,7 @@ type CheckerController struct { meta *meta.Meta dist *meta.DistributionManager targetMgr *meta.TargetManager - broker *meta.CoordinatorBroker + broker meta.Broker nodeMgr *session.NodeManager balancer balance.Balance @@ -62,7 +63,9 @@ func NewCheckerController( targetMgr *meta.TargetManager, balancer balance.Balance, nodeMgr *session.NodeManager, - scheduler task.Scheduler) *CheckerController { + scheduler task.Scheduler, + broker meta.Broker, +) *CheckerController { // CheckerController runs checkers with the order, // the former checker has higher priority @@ -70,6 +73,7 @@ func NewCheckerController( Channel_Checker: NewChannelChecker(meta, dist, targetMgr, balancer), Segment_Checker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr), Balance_Checker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler), + Index_Checker: NewIndexChecker(meta, dist, broker), } id := 0 @@ -91,6 +95,7 @@ func NewCheckerController( targetMgr: targetMgr, scheduler: scheduler, checkers: checkers, + broker: broker, } } @@ -108,6 +113,8 @@ func getCheckerInterval(checkerType string) time.Duration { return Params.QueryCoordCfg.ChannelCheckInterval.GetAsDuration(time.Millisecond) case Balance_Checker: return Params.QueryCoordCfg.BalanceCheckInterval.GetAsDuration(time.Millisecond) + case Index_Checker: + return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond) default: return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond) } diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go index 87c4db4af6..a4bddbbe55 100644 --- a/internal/querycoordv2/checkers/controller_test.go +++ b/internal/querycoordv2/checkers/controller_test.go @@ -79,7 +79,7 @@ func (suite *CheckerControllerSuite) SetupTest() { suite.balancer = balance.NewMockBalancer(suite.T()) suite.scheduler = task.NewMockScheduler(suite.T()) - suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler) + suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler, suite.broker) } func (suite *CheckerControllerSuite) TestBasic() { diff --git a/internal/querycoordv2/checkers/index_checker.go b/internal/querycoordv2/checkers/index_checker.go new file mode 100644 index 0000000000..88d0576ea5 --- /dev/null +++ b/internal/querycoordv2/checkers/index_checker.go @@ -0,0 +1,161 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package checkers
+
+import (
+    "context"
+    "time"
+
+    "github.com/milvus-io/milvus/internal/proto/querypb"
+    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
+    "github.com/milvus-io/milvus/internal/querycoordv2/params"
+    "github.com/milvus-io/milvus/internal/querycoordv2/task"
+    "github.com/milvus-io/milvus/pkg/log"
+    "github.com/milvus-io/milvus/pkg/util/typeutil"
+    "github.com/samber/lo"
+    "go.uber.org/zap"
+)
+
+var _ Checker = (*IndexChecker)(nil)
+
+// IndexChecker performs segment index check.
+type IndexChecker struct {
+    baseChecker
+    meta *meta.Meta
+    dist *meta.DistributionManager
+    broker meta.Broker
+}
+
+func NewIndexChecker(
+    meta *meta.Meta,
+    dist *meta.DistributionManager,
+    broker meta.Broker,
+) *IndexChecker {
+    return &IndexChecker{
+        meta: meta,
+        dist: dist,
+        broker: broker,
+    }
+}
+
+func (c *IndexChecker) Description() string {
+    return "IndexChecker checks index state change of segments and generates load index task"
+}
+
+func (c *IndexChecker) Check(ctx context.Context) []task.Task {
+    collectionIDs := c.meta.CollectionManager.GetAll()
+    var tasks []task.Task
+
+    for _, collectionID := range collectionIDs {
+        collection := c.meta.CollectionManager.GetCollection(collectionID)
+        if collection == nil {
+            log.Warn("collection released during check index", zap.Int64("collection", collectionID))
+            continue
+        }
+        replicas := c.meta.ReplicaManager.GetByCollection(collectionID)
+        for _, replica := range replicas {
+            tasks = append(tasks, c.checkReplica(ctx, collection, replica)...)
+        }
+    }
+
+    return tasks
+}
+
+func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica) []task.Task {
+    log := log.Ctx(ctx).With(
+        zap.Int64("collectionID", collection.GetCollectionID()),
+    )
+    var tasks []task.Task
+
+    segments := c.getHistoricalSegmentsDist(replica)
+    idSegments := make(map[int64]*meta.Segment)
+
+    targets := make(map[int64][]int64) // segmentID => missing field IDs
+    for _, segment := range segments {
+        missing := c.checkSegment(ctx, segment, collection)
+        if len(missing) > 0 {
+            targets[segment.GetID()] = missing
+            idSegments[segment.GetID()] = segment
+        }
+    }
+
+    segmentsToUpdate := typeutil.NewSet[int64]()
+    for segment, fields := range targets {
+        missingFields := typeutil.NewSet(fields...)
+ infos, err := c.broker.GetIndexInfo(ctx, collection.GetCollectionID(), segment) + if err != nil { + log.Warn("failed to get indexInfo for segment", zap.Int64("segmentID", segment), zap.Error(err)) + continue + } + for _, info := range infos { + if missingFields.Contain(info.GetFieldID()) && info.GetEnableIndex() { + segmentsToUpdate.Insert(segment) + } + } + } + + tasks = lo.FilterMap(segmentsToUpdate.Collect(), func(segmentID int64, _ int) (task.Task, bool) { + return c.createSegmentUpdateTask(ctx, idSegments[segmentID], replica) + }) + + return tasks +} + +func (c *IndexChecker) checkSegment(ctx context.Context, segment *meta.Segment, collection *meta.Collection) (fieldIDs []int64) { + var result []int64 + for fieldID, indexID := range collection.GetFieldIndexID() { + info, ok := segment.IndexInfo[fieldID] + if !ok { + result = append(result, fieldID) + continue + } + if indexID != info.GetIndexID() || !info.GetEnableIndex() { + result = append(result, fieldID) + } + } + return result +} + +func (c *IndexChecker) getHistoricalSegmentsDist(replica *meta.Replica) []*meta.Segment { + var ret []*meta.Segment + for _, node := range replica.GetNodes() { + ret = append(ret, c.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, node)...) + } + return ret +} + +func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) { + action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical) + task, err := task.NewSegmentTask( + ctx, + params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond), + c.ID(), + segment.GetCollectionID(), + replica.GetID(), + action, + ) + if err != nil { + log.Warn("create segment update task failed", + zap.Int64("collection", segment.GetCollectionID()), + zap.String("channel", segment.GetInsertChannel()), + zap.Int64("node", segment.Node), + zap.Error(err), + ) + return nil, false + } + return task, true +} diff --git a/internal/querycoordv2/checkers/index_checker_test.go b/internal/querycoordv2/checkers/index_checker_test.go new file mode 100644 index 0000000000..c2119cffcc --- /dev/null +++ b/internal/querycoordv2/checkers/index_checker_test.go @@ -0,0 +1,190 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +package checkers + +import ( + "context" + "testing" + + "github.com/cockroachdb/errors" + "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/querypb" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + "github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" +) + +type IndexCheckerSuite struct { + suite.Suite + kv kv.MetaKv + checker *IndexChecker + meta *meta.Meta + broker *meta.MockBroker + nodeMgr *session.NodeManager +} + +func (suite *IndexCheckerSuite) SetupSuite() { + params.Params.Init() +} + +func (suite *IndexCheckerSuite) SetupTest() { + var err error + config := params.GenerateEtcdConfig() + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + suite.Require().NoError(err) + suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + + // meta + store := meta.NewMetaStore(suite.kv) + idAllocator := params.RandomIncrementIDAllocator() + suite.nodeMgr = session.NewNodeManager() + suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) + distManager := meta.NewDistributionManager() + suite.broker = meta.NewMockBroker(suite.T()) + + suite.checker = NewIndexChecker(suite.meta, distManager, suite.broker) +} + +func (suite *IndexCheckerSuite) TearDownTest() { + suite.kv.Close() +} + +func (suite *IndexCheckerSuite) TestLoadIndex() { + checker := suite.checker + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(coll) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // dist + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")) + + // broker + suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(2)). 
+ Return([]*querypb.FieldIndexInfo{ + { + FieldID: 101, + IndexID: 1000, + EnableIndex: true, + }, + }, nil) + + tasks := checker.Check(context.Background()) + suite.Require().Len(tasks, 1) + + t := tasks[0] + suite.Require().Len(t.Actions(), 1) + + action, ok := t.Actions()[0].(*task.SegmentAction) + suite.Require().True(ok) + suite.EqualValues(200, t.ReplicaID()) + suite.Equal(task.ActionTypeUpdate, action.Type()) + suite.EqualValues(2, action.SegmentID()) +} + +func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() { + checker := suite.checker + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(coll) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // dist + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")) + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 3, 1, 1, "test-insert-channel")) + + // broker + suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).Call. + Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo { + if segmentID == 2 { + return []*querypb.FieldIndexInfo{ + { + FieldID: 101, + IndexID: 1000, + EnableIndex: false, + }, + } + } + if segmentID == 3 { + return []*querypb.FieldIndexInfo{ + { + FieldID: 101, + IndexID: 1002, + EnableIndex: false, + }, + } + } + return nil + }, nil) + + tasks := checker.Check(context.Background()) + suite.Require().Len(tasks, 0) +} + +func (suite *IndexCheckerSuite) TestGetIndexInfoFailed() { + checker := suite.checker + + // meta + coll := utils.CreateTestCollection(1, 1) + coll.FieldIndexID = map[int64]int64{101: 1000} + checker.meta.CollectionManager.PutCollection(coll) + checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // dist + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel")) + checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 3, 1, 1, "test-insert-channel")) + + // broker + suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")). 
+ Return(nil, errors.New("mocked error")) + + tasks := checker.Check(context.Background()) + suite.Require().Len(tasks, 0) +} + +func TestIndexChecker(t *testing.T) { + suite.Run(t, new(IndexCheckerSuite)) +} diff --git a/internal/querycoordv2/server.go b/internal/querycoordv2/server.go index 3b162a2d47..fad88d7162 100644 --- a/internal/querycoordv2/server.go +++ b/internal/querycoordv2/server.go @@ -280,6 +280,7 @@ func (s *Server) initQueryCoord() error { s.balancer, s.nodeMgr, s.taskScheduler, + s.broker, ) // Init observers diff --git a/internal/querycoordv2/server_test.go b/internal/querycoordv2/server_test.go index 6bdeeb2f09..32f8d9c034 100644 --- a/internal/querycoordv2/server_test.go +++ b/internal/querycoordv2/server_test.go @@ -504,6 +504,7 @@ func (suite *ServerSuite) hackServer() { suite.server.balancer, suite.server.nodeMgr, suite.server.taskScheduler, + suite.server.broker, ) suite.server.targetObserver = observers.NewTargetObserver( suite.server.meta, diff --git a/internal/querycoordv2/task/action.go b/internal/querycoordv2/task/action.go index 092efd42cf..28cee84bdb 100644 --- a/internal/querycoordv2/task/action.go +++ b/internal/querycoordv2/task/action.go @@ -31,6 +31,7 @@ type ActionType = int32 const ( ActionTypeGrow ActionType = iota + 1 ActionTypeReduce + ActionTypeUpdate ) type Action interface { @@ -71,7 +72,7 @@ type SegmentAction struct { segmentID UniqueID scope querypb.DataScope - isReleaseCommitted atomic.Bool + rpcReturned atomic.Bool } func NewSegmentAction(nodeID UniqueID, typ ActionType, shard string, segmentID UniqueID) *SegmentAction { @@ -81,10 +82,10 @@ func NewSegmentAction(nodeID UniqueID, typ ActionType, shard string, segmentID U func NewSegmentActionWithScope(nodeID UniqueID, typ ActionType, shard string, segmentID UniqueID, scope querypb.DataScope) *SegmentAction { base := NewBaseAction(nodeID, typ, shard) return &SegmentAction{ - BaseAction: base, - segmentID: segmentID, - scope: scope, - isReleaseCommitted: *atomic.NewBool(false), + BaseAction: base, + segmentID: segmentID, + scope: scope, + rpcReturned: *atomic.NewBool(false), } } @@ -119,7 +120,9 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool if !funcutil.SliceContain(segments, action.SegmentID()) { return true } - return action.isReleaseCommitted.Load() + return action.rpcReturned.Load() + } else if action.Type() == ActionTypeUpdate { + return action.rpcReturned.Load() } return true diff --git a/internal/querycoordv2/task/executor.go b/internal/querycoordv2/task/executor.go index 879176ce29..55b8856449 100644 --- a/internal/querycoordv2/task/executor.go +++ b/internal/querycoordv2/task/executor.go @@ -213,7 +213,7 @@ func (ex *Executor) removeTask(task Task, step int) { func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) { switch task.Actions()[step].Type() { - case ActionTypeGrow: + case ActionTypeGrow, ActionTypeUpdate: ex.loadSegment(task, step) case ActionTypeReduce: @@ -225,6 +225,7 @@ func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) { // not really executes the request func (ex *Executor) loadSegment(task *SegmentTask, step int) error { action := task.Actions()[step].(*SegmentAction) + defer action.rpcReturned.Store(true) log := log.With( zap.Int64("taskID", task.ID()), zap.Int64("collectionID", task.CollectionID()), @@ -276,7 +277,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error { switch GetTaskType(task) { case TaskTypeGrow: readableVersion = 
ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.NextTarget) - case TaskTypeMove: + case TaskTypeMove, TaskTypeUpdate: readableVersion = ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.CurrentTarget) } loadInfo := utils.PackSegmentLoadInfo(resp, indexes, readableVersion) @@ -309,7 +310,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) { defer ex.removeTask(task, step) startTs := time.Now() action := task.Actions()[step].(*SegmentAction) - defer action.isReleaseCommitted.Store(true) + defer action.rpcReturned.Store(true) log := log.With( zap.Int64("taskID", task.ID()), diff --git a/internal/querycoordv2/task/scheduler.go b/internal/querycoordv2/task/scheduler.go index 826ece19f7..99f18e234c 100644 --- a/internal/querycoordv2/task/scheduler.go +++ b/internal/querycoordv2/task/scheduler.go @@ -44,6 +44,7 @@ const ( TaskTypeGrow Type = iota + 1 TaskTypeReduce TaskTypeMove + TaskTypeUpdate ) type Type = int32 @@ -549,7 +550,7 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool { if task, ok := task.(*SegmentTask); ok { taskType := GetTaskType(task) var segment *datapb.SegmentInfo - if taskType == TaskTypeMove { + if taskType == TaskTypeMove || taskType == TaskTypeUpdate { segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget) } else { segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) @@ -748,7 +749,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error { case ActionTypeGrow: taskType := GetTaskType(task) var segment *datapb.SegmentInfo - if taskType == TaskTypeMove { + if taskType == TaskTypeMove || taskType == TaskTypeUpdate { segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget) } else { segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget) diff --git a/internal/querycoordv2/task/utils.go b/internal/querycoordv2/task/utils.go index 7942a323cc..3d95c3903e 100644 --- a/internal/querycoordv2/task/utils.go +++ b/internal/querycoordv2/task/utils.go @@ -74,13 +74,17 @@ func SetReason(reason string, tasks ...Task) { // - only 1 reduce action -> Reduce // - 1 grow action, and ends with 1 reduce action -> Move func GetTaskType(task Task) Type { - if len(task.Actions()) > 1 { + switch { + case len(task.Actions()) > 1: return TaskTypeMove - } else if task.Actions()[0].Type() == ActionTypeGrow { + case task.Actions()[0].Type() == ActionTypeGrow: return TaskTypeGrow - } else { + case task.Actions()[0].Type() == ActionTypeReduce: return TaskTypeReduce + case task.Actions()[0].Type() == ActionTypeUpdate: + return TaskTypeUpdate } + return 0 } func packLoadSegmentRequest( @@ -91,6 +95,10 @@ func packLoadSegmentRequest( loadInfo *querypb.SegmentLoadInfo, indexInfo []*indexpb.IndexInfo, ) *querypb.LoadSegmentsRequest { + loadScope := querypb.LoadScope_Full + if action.Type() == ActionTypeUpdate { + loadScope = querypb.LoadScope_Index + } return &querypb.LoadSegmentsRequest{ Base: commonpbutil.NewMsgBase( commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments), @@ -106,6 +114,7 @@ func packLoadSegmentRequest( Version: time.Now().UnixNano(), NeedTransfer: true, IndexInfoList: indexInfo, + LoadScope: loadScope, } } diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index c6a432058b..e430bf94ac 100644 --- a/pkg/util/paramtable/component_param.go +++ 
b/pkg/util/paramtable/component_param.go @@ -1156,6 +1156,7 @@ type queryCoordConfig struct { SegmentCheckInterval ParamItem `refreshable:"true"` ChannelCheckInterval ParamItem `refreshable:"true"` BalanceCheckInterval ParamItem `refreshable:"true"` + IndexCheckInterval ParamItem `refreshable:"true"` ChannelTaskTimeout ParamItem `refreshable:"true"` SegmentTaskTimeout ParamItem `refreshable:"true"` DistPullInterval ParamItem `refreshable:"false"` @@ -1334,6 +1335,15 @@ func (p *queryCoordConfig) init(base *BaseTable) { } p.BalanceCheckInterval.Init(base.mgr) + p.IndexCheckInterval = ParamItem{ + Key: "queryCoord.checkIndexInterval", + Version: "2.3.0", + DefaultValue: "10000", + PanicIfEmpty: true, + Export: true, + } + p.IndexCheckInterval.Init(base.mgr) + p.ChannelTaskTimeout = ParamItem{ Key: "queryCoord.channelTaskTimeout", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 8a2cf3a99a..eaf7f0eef8 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -296,6 +296,7 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt()) assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt()) assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt()) + assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt()) }) t.Run("test queryNodeConfig", func(t *testing.T) {
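Reviewer note: the sketch below is illustrative only and not part of the patch. It is a minimal, self-contained Go program (hypothetical type and helper names, not the Milvus API) showing the detection rule that IndexChecker.checkSegment applies above: a field needs an index update when the segment has no index info for that field, when the recorded index ID differs from the collection's expected index, or when the index is not yet enabled. The resulting ActionTypeUpdate task is then executed as a LoadSegments call with LoadScope_Index, and the check cadence is controlled by the new queryCoord.checkIndexInterval parameter (default 10000 ms).

```go
package main

import "fmt"

// fieldIndexInfo is a hypothetical stand-in for querypb.FieldIndexInfo,
// keeping only the two fields the rule inspects.
type fieldIndexInfo struct {
	IndexID     int64
	EnableIndex bool
}

// missingIndexFields mirrors the checkSegment rule: compare the collection's
// expected field->index mapping against what the segment has loaded and
// return the field IDs that still need an index update.
func missingIndexFields(expected map[int64]int64, loaded map[int64]fieldIndexInfo) []int64 {
	var missing []int64
	for fieldID, indexID := range expected {
		info, ok := loaded[fieldID]
		if !ok || info.IndexID != indexID || !info.EnableIndex {
			missing = append(missing, fieldID)
		}
	}
	return missing
}

func main() {
	expected := map[int64]int64{101: 1000}                                        // collection declares index 1000 on field 101
	loaded := map[int64]fieldIndexInfo{101: {IndexID: 1000, EnableIndex: false}}  // segment was loaded before the index was ready
	fmt.Println(missingIndexFields(expected, loaded)) // [101] -> checker would emit an ActionTypeUpdate task
}
```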