Support replace indexed field in QueryCoord (#25747)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>

parent: fc6f0f97a6
commit: 1045c88102
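
In short: QueryCoord gains an IndexChecker that periodically compares each loaded segment's index state with the collection's expected field indexes. A segment whose index is missing, not yet enabled, or built under a different index ID gets a segment task carrying the new ActionTypeUpdate; the executor replays LoadSegments with LoadScope_Index so the QueryNode swaps in just the index. To let such segments be loaded at all, DataCoord now reports flushed-but-unindexed segments as flushed, and segcore permits loading a scalar index over already-loaded raw data, releasing the raw column afterwards.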
@ -80,7 +80,6 @@ SegmentSealedImpl::LoadVecIndex(const LoadIndexInfo& info) {
     AssertInfo(row_count > 0, "Index count is 0");

     std::unique_lock lck(mutex_);
-    // Don't allow vector raw data and index exist at the same time
     AssertInfo(
         !get_bit(index_ready_bitset_, field_id),
         "vector index has been exist at " + std::to_string(field_id.get()));
@ -115,10 +114,6 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
     AssertInfo(row_count > 0, "Index count is 0");

     std::unique_lock lck(mutex_);
-    // Don't allow scalar raw data and index exist at the same time
-    AssertInfo(!get_bit(field_data_ready_bitset_, field_id),
-               "scalar index can't be loaded when raw data exists at field " +
-                   std::to_string(field_id.get()));
     AssertInfo(
         !get_bit(index_ready_bitset_, field_id),
         "scalar index has been exist at " + std::to_string(field_id.get()));
@ -166,6 +161,10 @@ SegmentSealedImpl::LoadScalarIndex(const LoadIndexInfo& info) {
     set_bit(index_ready_bitset_, field_id, true);
     update_row_count(row_count);
+    // release field column
+    fields_.erase(field_id);
+    set_bit(field_data_ready_bitset_, field_id, false);

     lck.unlock();
 }
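
The net effect of the segcore hunks above: LoadScalarIndex no longer rejects a field whose raw column is already resident. Judging from the hunk headers, the old assertions forbidding raw data and index to coexist are dropped, and once the index is marked ready the raw field column is released (fields_.erase plus clearing field_data_ready_bitset_), reclaiming memory as soon as queries can be served from the index.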
@ -199,24 +199,16 @@ func (h *ServerHandler) GetQueryVChanPositions(channel *channel, partitionIDs ..
     for retrieveUnIndexed() {
     }

     for segId := range unIndexedIDs {
         segInfo := segmentInfos[segId]
         if segInfo.GetState() == commonpb.SegmentState_Dropped {
             unIndexedIDs.Remove(segId)
             indexedIDs.Insert(segId)
         }
     }

-    unIndexedIDs.Insert(growingIDs.Collect()...)
+    // unindexed is flushed segments as well
+    indexedIDs.Insert(unIndexedIDs.Collect()...)

     return &datapb.VchannelInfo{
         CollectionID:        channel.CollectionID,
         ChannelName:         channel.Name,
         SeekPosition:        h.GetChannelSeekPosition(channel, partitionIDs...),
         FlushedSegmentIds:   indexedIDs.Collect(),
-        UnflushedSegmentIds: unIndexedIDs.Collect(),
+        UnflushedSegmentIds: growingIDs.Collect(),
         DroppedSegmentIds:   droppedIDs.Collect(),
         // IndexedSegmentIds: indexed.Collect(),
     }
 }
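
The hunk above changes what DataCoord reports to QueryCoord: flushed-but-unindexed segments used to wait in UnflushedSegmentIds until their index was built; now they are merged into FlushedSegmentIds, and only truly growing segments stay unflushed. A minimal sketch of the set arithmetic, using the same typeutil.Set helpers as the hunk (segment IDs invented for illustration):

    package main

    import (
        "fmt"

        "github.com/milvus-io/milvus/pkg/util/typeutil"
    )

    func main() {
        indexedIDs := typeutil.NewSet[int64](5)   // flushed, index built
        unIndexedIDs := typeutil.NewSet[int64](7) // flushed, no index yet
        growingIDs := typeutil.NewSet[int64](9)   // still growing

        // unindexed is flushed segments as well
        indexedIDs.Insert(unIndexedIDs.Collect()...)

        fmt.Println(indexedIDs.Collect()) // FlushedSegmentIds: 5 and 7 (order not guaranteed)
        fmt.Println(growingIDs.Collect()) // UnflushedSegmentIds: 9
    }

Segment 7 is then loaded right away, and the IndexChecker introduced below patches its index in once the build finishes.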
@ -38,6 +38,7 @@ var (
     Segment_Checker = "segment_checker"
     Channel_Checker = "channel_checker"
     Balance_Checker = "balance_checker"
+    Index_Checker   = "index_checker"
 )

 type CheckerController struct {
@ -46,7 +47,7 @@ type CheckerController struct {
     meta      *meta.Meta
     dist      *meta.DistributionManager
     targetMgr *meta.TargetManager
-    broker    *meta.CoordinatorBroker
+    broker    meta.Broker
     nodeMgr   *session.NodeManager
     balancer  balance.Balance

@ -62,7 +63,9 @@ func NewCheckerController(
     targetMgr *meta.TargetManager,
     balancer balance.Balance,
     nodeMgr *session.NodeManager,
-    scheduler task.Scheduler) *CheckerController {
+    scheduler task.Scheduler,
+    broker meta.Broker,
+) *CheckerController {

     // CheckerController runs checkers with the order,
     // the former checker has higher priority
@ -70,6 +73,7 @@ func NewCheckerController(
         Channel_Checker: NewChannelChecker(meta, dist, targetMgr, balancer),
         Segment_Checker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
         Balance_Checker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
+        Index_Checker:   NewIndexChecker(meta, dist, broker),
     }

     id := 0
@ -91,6 +95,7 @@ func NewCheckerController(
         targetMgr: targetMgr,
         scheduler: scheduler,
         checkers:  checkers,
+        broker:    broker,
     }
 }

@ -108,6 +113,8 @@ func getCheckerInterval(checkerType string) time.Duration {
         return Params.QueryCoordCfg.ChannelCheckInterval.GetAsDuration(time.Millisecond)
     case Balance_Checker:
         return Params.QueryCoordCfg.BalanceCheckInterval.GetAsDuration(time.Millisecond)
+    case Index_Checker:
+        return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond)
     default:
         return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond)
     }
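
Like the existing checkers, Index_Checker runs on its own tick: getCheckerInterval maps it to QueryCoordCfg.IndexCheckInterval, registered further down in the paramtable hunks as queryCoord.checkIndexInterval with a default of 10000 ms. With the defaults, a freshly built or replaced index is therefore picked up by the checker within roughly ten seconds.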
@ -79,7 +79,7 @@ func (suite *CheckerControllerSuite) SetupTest() {

     suite.balancer = balance.NewMockBalancer(suite.T())
     suite.scheduler = task.NewMockScheduler(suite.T())
-    suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler)
+    suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler, suite.broker)
 }

 func (suite *CheckerControllerSuite) TestBasic() {
internal/querycoordv2/checkers/index_checker.go (new file, 161 lines)
@ -0,0 +1,161 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package checkers

import (
    "context"
    "time"

    "github.com/milvus-io/milvus/internal/proto/querypb"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/params"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/pkg/log"
    "github.com/milvus-io/milvus/pkg/util/typeutil"
    "github.com/samber/lo"
    "go.uber.org/zap"
)

var _ Checker = (*IndexChecker)(nil)

// IndexChecker performs segment index checks.
type IndexChecker struct {
    baseChecker
    meta   *meta.Meta
    dist   *meta.DistributionManager
    broker meta.Broker
}

func NewIndexChecker(
    meta *meta.Meta,
    dist *meta.DistributionManager,
    broker meta.Broker,
) *IndexChecker {
    return &IndexChecker{
        meta:   meta,
        dist:   dist,
        broker: broker,
    }
}

func (c *IndexChecker) Description() string {
    return "IndexChecker checks index state change of segments and generates load index task"
}

func (c *IndexChecker) Check(ctx context.Context) []task.Task {
    collectionIDs := c.meta.CollectionManager.GetAll()
    var tasks []task.Task

    for _, collectionID := range collectionIDs {
        collection := c.meta.CollectionManager.GetCollection(collectionID)
        if collection == nil {
            log.Warn("collection released during check index", zap.Int64("collection", collectionID))
            continue
        }
        replicas := c.meta.ReplicaManager.GetByCollection(collectionID)
        for _, replica := range replicas {
            tasks = append(tasks, c.checkReplica(ctx, collection, replica)...)
        }
    }

    return tasks
}

func (c *IndexChecker) checkReplica(ctx context.Context, collection *meta.Collection, replica *meta.Replica) []task.Task {
    log := log.Ctx(ctx).With(
        zap.Int64("collectionID", collection.GetCollectionID()),
    )
    var tasks []task.Task

    segments := c.getHistoricalSegmentsDist(replica)
    idSegments := make(map[int64]*meta.Segment)

    targets := make(map[int64][]int64) // segmentID => FieldID
    for _, segment := range segments {
        missing := c.checkSegment(ctx, segment, collection)
        if len(missing) > 0 {
            targets[segment.GetID()] = missing
            idSegments[segment.GetID()] = segment
        }
    }

    segmentsToUpdate := typeutil.NewSet[int64]()
    for segment, fields := range targets {
        missingFields := typeutil.NewSet(fields...)
        infos, err := c.broker.GetIndexInfo(ctx, collection.GetCollectionID(), segment)
        if err != nil {
            log.Warn("failed to get indexInfo for segment", zap.Int64("segmentID", segment), zap.Error(err))
            continue
        }
        for _, info := range infos {
            if missingFields.Contain(info.GetFieldID()) && info.GetEnableIndex() {
                segmentsToUpdate.Insert(segment)
            }
        }
    }

    tasks = lo.FilterMap(segmentsToUpdate.Collect(), func(segmentID int64, _ int) (task.Task, bool) {
        return c.createSegmentUpdateTask(ctx, idSegments[segmentID], replica)
    })

    return tasks
}

func (c *IndexChecker) checkSegment(ctx context.Context, segment *meta.Segment, collection *meta.Collection) (fieldIDs []int64) {
    var result []int64
    for fieldID, indexID := range collection.GetFieldIndexID() {
        info, ok := segment.IndexInfo[fieldID]
        if !ok {
            result = append(result, fieldID)
            continue
        }
        if indexID != info.GetIndexID() || !info.GetEnableIndex() {
            result = append(result, fieldID)
        }
    }
    return result
}

func (c *IndexChecker) getHistoricalSegmentsDist(replica *meta.Replica) []*meta.Segment {
    var ret []*meta.Segment
    for _, node := range replica.GetNodes() {
        ret = append(ret, c.dist.SegmentDistManager.GetByCollectionAndNode(replica.CollectionID, node)...)
    }
    return ret
}

func (c *IndexChecker) createSegmentUpdateTask(ctx context.Context, segment *meta.Segment, replica *meta.Replica) (task.Task, bool) {
    action := task.NewSegmentActionWithScope(segment.Node, task.ActionTypeUpdate, segment.GetInsertChannel(), segment.GetID(), querypb.DataScope_Historical)
    task, err := task.NewSegmentTask(
        ctx,
        params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
        c.ID(),
        segment.GetCollectionID(),
        replica.GetID(),
        action,
    )
    if err != nil {
        log.Warn("create segment update task failed",
            zap.Int64("collection", segment.GetCollectionID()),
            zap.String("channel", segment.GetInsertChannel()),
            zap.Int64("node", segment.Node),
            zap.Error(err),
        )
        return nil, false
    }
    return task, true
}
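
To make the replace rule concrete: a field counts as missing when the segment carries no index info for it, when its index ID differs from the collection's FieldIndexID entry, or when the index is not yet enabled. A self-contained mirror of that predicate (local stand-in types; the real code reads these values through querypb.FieldIndexInfo getters):

    package main

    import "fmt"

    // fieldIndex stands in for the two querypb.FieldIndexInfo fields
    // that checkSegment inspects.
    type fieldIndex struct {
        indexID     int64
        enableIndex bool
    }

    // missingFields mirrors IndexChecker.checkSegment: want maps fieldID to
    // the index ID the collection expects; have is what the segment holds.
    func missingFields(want map[int64]int64, have map[int64]fieldIndex) []int64 {
        var out []int64
        for fieldID, indexID := range want {
            info, ok := have[fieldID]
            if !ok || info.indexID != indexID || !info.enableIndex {
                out = append(out, fieldID)
            }
        }
        return out
    }

    func main() {
        want := map[int64]int64{101: 1000}
        // Segment still serves the old index 1002 for field 101.
        have := map[int64]fieldIndex{101: {indexID: 1002, enableIndex: true}}
        fmt.Println(missingFields(want, have)) // [101] -> candidate for ActionTypeUpdate
    }

Only candidates whose new index the broker reports as enabled actually get an update task, which is what TestIndexInfoNotMatch in the test file below verifies from the negative side.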
internal/querycoordv2/checkers/index_checker_test.go (new file, 190 lines)
@ -0,0 +1,190 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package checkers

import (
    "context"
    "testing"

    "github.com/cockroachdb/errors"
    "github.com/milvus-io/milvus/internal/kv"
    etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
    "github.com/milvus-io/milvus/internal/proto/querypb"
    "github.com/milvus-io/milvus/internal/querycoordv2/meta"
    "github.com/milvus-io/milvus/internal/querycoordv2/params"
    "github.com/milvus-io/milvus/internal/querycoordv2/session"
    "github.com/milvus-io/milvus/internal/querycoordv2/task"
    "github.com/milvus-io/milvus/internal/querycoordv2/utils"
    "github.com/milvus-io/milvus/pkg/util/etcd"
    "github.com/stretchr/testify/mock"
    "github.com/stretchr/testify/suite"
)

type IndexCheckerSuite struct {
    suite.Suite
    kv      kv.MetaKv
    checker *IndexChecker
    meta    *meta.Meta
    broker  *meta.MockBroker
    nodeMgr *session.NodeManager
}

func (suite *IndexCheckerSuite) SetupSuite() {
    params.Params.Init()
}

func (suite *IndexCheckerSuite) SetupTest() {
    var err error
    config := params.GenerateEtcdConfig()
    cli, err := etcd.GetEtcdClient(
        config.UseEmbedEtcd.GetAsBool(),
        config.EtcdUseSSL.GetAsBool(),
        config.Endpoints.GetAsStrings(),
        config.EtcdTLSCert.GetValue(),
        config.EtcdTLSKey.GetValue(),
        config.EtcdTLSCACert.GetValue(),
        config.EtcdTLSMinVersion.GetValue())
    suite.Require().NoError(err)
    suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())

    // meta
    store := meta.NewMetaStore(suite.kv)
    idAllocator := params.RandomIncrementIDAllocator()
    suite.nodeMgr = session.NewNodeManager()
    suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
    distManager := meta.NewDistributionManager()
    suite.broker = meta.NewMockBroker(suite.T())

    suite.checker = NewIndexChecker(suite.meta, distManager, suite.broker)
}

func (suite *IndexCheckerSuite) TearDownTest() {
    suite.kv.Close()
}

func (suite *IndexCheckerSuite) TestLoadIndex() {
    checker := suite.checker

    // meta
    coll := utils.CreateTestCollection(1, 1)
    coll.FieldIndexID = map[int64]int64{101: 1000}
    checker.meta.CollectionManager.PutCollection(coll)
    checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2}))
    suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
    suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
    checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
    checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

    // dist
    checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))

    // broker
    suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), int64(2)).
        Return([]*querypb.FieldIndexInfo{
            {
                FieldID:     101,
                IndexID:     1000,
                EnableIndex: true,
            },
        }, nil)

    tasks := checker.Check(context.Background())
    suite.Require().Len(tasks, 1)

    t := tasks[0]
    suite.Require().Len(t.Actions(), 1)

    action, ok := t.Actions()[0].(*task.SegmentAction)
    suite.Require().True(ok)
    suite.EqualValues(200, t.ReplicaID())
    suite.Equal(task.ActionTypeUpdate, action.Type())
    suite.EqualValues(2, action.SegmentID())
}

func (suite *IndexCheckerSuite) TestIndexInfoNotMatch() {
    checker := suite.checker

    // meta
    coll := utils.CreateTestCollection(1, 1)
    coll.FieldIndexID = map[int64]int64{101: 1000}
    checker.meta.CollectionManager.PutCollection(coll)
    checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2}))
    suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
    suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
    checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
    checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

    // dist
    checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
    checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 3, 1, 1, "test-insert-channel"))

    // broker
    suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).Call.
        Return(func(ctx context.Context, collectionID, segmentID int64) []*querypb.FieldIndexInfo {
            if segmentID == 2 {
                return []*querypb.FieldIndexInfo{
                    {
                        FieldID:     101,
                        IndexID:     1000,
                        EnableIndex: false,
                    },
                }
            }
            if segmentID == 3 {
                return []*querypb.FieldIndexInfo{
                    {
                        FieldID:     101,
                        IndexID:     1002,
                        EnableIndex: false,
                    },
                }
            }
            return nil
        }, nil)

    tasks := checker.Check(context.Background())
    suite.Require().Len(tasks, 0)
}

func (suite *IndexCheckerSuite) TestGetIndexInfoFailed() {
    checker := suite.checker

    // meta
    coll := utils.CreateTestCollection(1, 1)
    coll.FieldIndexID = map[int64]int64{101: 1000}
    checker.meta.CollectionManager.PutCollection(coll)
    checker.meta.ReplicaManager.Put(utils.CreateTestReplica(200, 1, []int64{1, 2}))
    suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
    suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
    checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1)
    checker.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2)

    // dist
    checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 2, 1, 1, "test-insert-channel"))
    checker.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 3, 1, 1, "test-insert-channel"))

    // broker
    suite.broker.EXPECT().GetIndexInfo(mock.Anything, int64(1), mock.AnythingOfType("int64")).
        Return(nil, errors.New("mocked error"))

    tasks := checker.Check(context.Background())
    suite.Require().Len(tasks, 0)
}

func TestIndexChecker(t *testing.T) {
    suite.Run(t, new(IndexCheckerSuite))
}
@ -280,6 +280,7 @@ func (s *Server) initQueryCoord() error {
         s.balancer,
         s.nodeMgr,
         s.taskScheduler,
+        s.broker,
     )

     // Init observers
@ -504,6 +504,7 @@ func (suite *ServerSuite) hackServer() {
         suite.server.balancer,
         suite.server.nodeMgr,
         suite.server.taskScheduler,
+        suite.server.broker,
     )
     suite.server.targetObserver = observers.NewTargetObserver(
         suite.server.meta,
@ -31,6 +31,7 @@ type ActionType = int32
 const (
     ActionTypeGrow ActionType = iota + 1
     ActionTypeReduce
+    ActionTypeUpdate
 )

 type Action interface {
@ -71,7 +72,7 @@ type SegmentAction struct {
     segmentID UniqueID
     scope     querypb.DataScope

-    isReleaseCommitted atomic.Bool
+    rpcReturned atomic.Bool
 }

 func NewSegmentAction(nodeID UniqueID, typ ActionType, shard string, segmentID UniqueID) *SegmentAction {
@ -81,10 +82,10 @@ func NewSegmentActionWithScope(nodeID UniqueID, typ ActionType, shard string, segmentID U
 func NewSegmentActionWithScope(nodeID UniqueID, typ ActionType, shard string, segmentID UniqueID, scope querypb.DataScope) *SegmentAction {
     base := NewBaseAction(nodeID, typ, shard)
     return &SegmentAction{
-        BaseAction:         base,
-        segmentID:          segmentID,
-        scope:              scope,
-        isReleaseCommitted: *atomic.NewBool(false),
+        BaseAction:  base,
+        segmentID:   segmentID,
+        scope:       scope,
+        rpcReturned: *atomic.NewBool(false),
     }
 }

@ -119,7 +120,9 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool
         if !funcutil.SliceContain(segments, action.SegmentID()) {
             return true
         }
-        return action.isReleaseCommitted.Load()
+        return action.rpcReturned.Load()
+    } else if action.Type() == ActionTypeUpdate {
+        return action.rpcReturned.Load()
     }

     return true
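
The isReleaseCommitted flag is generalized into rpcReturned so that release and the new update action share one completion signal: an update action is finished once its LoadSegments RPC has returned, since the segment never changes node. A minimal sketch of that handshake with go.uber.org/atomic (simplified local type, not the real SegmentAction):

    package main

    import (
        "fmt"

        "go.uber.org/atomic"
    )

    type segmentAction struct {
        rpcReturned atomic.Bool
    }

    // The executor flips the flag when the RPC comes back; see the
    // `defer action.rpcReturned.Store(true)` lines in the executor hunks below.
    func (a *segmentAction) onRPCReturned() { a.rpcReturned.Store(true) }

    // IsFinished for an update action reduces to polling the flag.
    func (a *segmentAction) isFinished() bool { return a.rpcReturned.Load() }

    func main() {
        a := &segmentAction{rpcReturned: *atomic.NewBool(false)}
        fmt.Println(a.isFinished()) // false: update still in flight
        a.onRPCReturned()
        fmt.Println(a.isFinished()) // true
    }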
@ -213,7 +213,7 @@ func (ex *Executor) removeTask(task Task, step int) {

 func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
     switch task.Actions()[step].Type() {
-    case ActionTypeGrow:
+    case ActionTypeGrow, ActionTypeUpdate:
         ex.loadSegment(task, step)

     case ActionTypeReduce:
@ -225,6 +225,7 @@ func (ex *Executor) executeSegmentAction(task *SegmentTask, step int) {
 // not really executes the request
 func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
     action := task.Actions()[step].(*SegmentAction)
+    defer action.rpcReturned.Store(true)
     log := log.With(
         zap.Int64("taskID", task.ID()),
         zap.Int64("collectionID", task.CollectionID()),
@ -276,7 +277,7 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
     switch GetTaskType(task) {
     case TaskTypeGrow:
         readableVersion = ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.NextTarget)
-    case TaskTypeMove:
+    case TaskTypeMove, TaskTypeUpdate:
         readableVersion = ex.targetMgr.GetCollectionTargetVersion(task.CollectionID(), meta.CurrentTarget)
     }
     loadInfo := utils.PackSegmentLoadInfo(resp, indexes, readableVersion)
@ -309,7 +310,7 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
     defer ex.removeTask(task, step)
     startTs := time.Now()
     action := task.Actions()[step].(*SegmentAction)
-    defer action.isReleaseCommitted.Store(true)
+    defer action.rpcReturned.Store(true)

     log := log.With(
         zap.Int64("taskID", task.ID()),
@ -44,6 +44,7 @@ const (
     TaskTypeGrow Type = iota + 1
     TaskTypeReduce
     TaskTypeMove
+    TaskTypeUpdate
 )

 type Type = int32
@ -549,7 +550,7 @@ func (scheduler *taskScheduler) isRelated(task Task, node int64) bool {
     if task, ok := task.(*SegmentTask); ok {
         taskType := GetTaskType(task)
         var segment *datapb.SegmentInfo
-        if taskType == TaskTypeMove {
+        if taskType == TaskTypeMove || taskType == TaskTypeUpdate {
             segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget)
         } else {
             segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
@ -748,7 +749,7 @@ func (scheduler *taskScheduler) checkSegmentTaskStale(task *SegmentTask) error {
     case ActionTypeGrow:
         taskType := GetTaskType(task)
         var segment *datapb.SegmentInfo
-        if taskType == TaskTypeMove {
+        if taskType == TaskTypeMove || taskType == TaskTypeUpdate {
             segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.CurrentTarget)
         } else {
             segment = scheduler.targetMgr.GetHistoricalSegment(task.CollectionID(), task.SegmentID(), meta.NextTarget)
@ -74,13 +74,17 @@ func SetReason(reason string, tasks ...Task) {
 // - only 1 reduce action -> Reduce
 // - 1 grow action, and ends with 1 reduce action -> Move
 func GetTaskType(task Task) Type {
-    if len(task.Actions()) > 1 {
+    switch {
+    case len(task.Actions()) > 1:
         return TaskTypeMove
-    } else if task.Actions()[0].Type() == ActionTypeGrow {
+    case task.Actions()[0].Type() == ActionTypeGrow:
         return TaskTypeGrow
-    } else {
+    case task.Actions()[0].Type() == ActionTypeReduce:
         return TaskTypeReduce
+    case task.Actions()[0].Type() == ActionTypeUpdate:
+        return TaskTypeUpdate
     }
+    return 0
 }

 func packLoadSegmentRequest(
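
The hunk above rewrites GetTaskType's if/else chain as a switch so the single-action update case has a natural slot. A stand-alone mirror of the classification rules (simplified local types instead of task.Task):

    package main

    import "fmt"

    type actionType int32

    const (
        actionGrow actionType = iota + 1
        actionReduce
        actionUpdate
    )

    // classify mirrors GetTaskType: more than one action means Move
    // (grow on one node plus reduce on another); otherwise the single
    // action's type decides.
    func classify(actions []actionType) string {
        switch {
        case len(actions) > 1:
            return "Move"
        case actions[0] == actionGrow:
            return "Grow"
        case actions[0] == actionReduce:
            return "Reduce"
        case actions[0] == actionUpdate:
            return "Update" // new in this commit
        }
        return "Unknown" // mirrors the `return 0` fallback
    }

    func main() {
        fmt.Println(classify([]actionType{actionUpdate}))             // Update
        fmt.Println(classify([]actionType{actionGrow, actionReduce})) // Move
    }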
@ -91,6 +95,10 @@ func packLoadSegmentRequest(
     loadInfo *querypb.SegmentLoadInfo,
     indexInfo []*indexpb.IndexInfo,
 ) *querypb.LoadSegmentsRequest {
+    loadScope := querypb.LoadScope_Full
+    if action.Type() == ActionTypeUpdate {
+        loadScope = querypb.LoadScope_Index
+    }
     return &querypb.LoadSegmentsRequest{
         Base: commonpbutil.NewMsgBase(
             commonpbutil.WithMsgType(commonpb.MsgType_LoadSegments),
@ -106,6 +114,7 @@ func packLoadSegmentRequest(
         Version:       time.Now().UnixNano(),
         NeedTransfer:  true,
         IndexInfoList: indexInfo,
+        LoadScope:     loadScope,
     }
 }
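
The packLoadSegmentRequest hunks are where an update becomes cheap on the wire: the executor reuses the ordinary LoadSegments RPC, but an ActionTypeUpdate downgrades the scope from LoadScope_Full to LoadScope_Index, so the QueryNode only (re)loads index data for a segment it already hosts instead of reloading the whole segment.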
@ -1156,6 +1156,7 @@ type queryCoordConfig struct {
     SegmentCheckInterval ParamItem `refreshable:"true"`
     ChannelCheckInterval ParamItem `refreshable:"true"`
     BalanceCheckInterval ParamItem `refreshable:"true"`
+    IndexCheckInterval   ParamItem `refreshable:"true"`
     ChannelTaskTimeout   ParamItem `refreshable:"true"`
     SegmentTaskTimeout   ParamItem `refreshable:"true"`
     DistPullInterval     ParamItem `refreshable:"false"`
@ -1334,6 +1335,15 @@ func (p *queryCoordConfig) init(base *BaseTable) {
     }
     p.BalanceCheckInterval.Init(base.mgr)

+    p.IndexCheckInterval = ParamItem{
+        Key:          "queryCoord.checkIndexInterval",
+        Version:      "2.3.0",
+        DefaultValue: "10000",
+        PanicIfEmpty: true,
+        Export:       true,
+    }
+    p.IndexCheckInterval.Init(base.mgr)
+
     p.ChannelTaskTimeout = ParamItem{
         Key:     "queryCoord.channelTaskTimeout",
         Version: "2.0.0",
@ -296,6 +296,7 @@ func TestComponentParam(t *testing.T) {
     assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt())
     assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt())
     assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt())
+    assert.Equal(t, 10000, Params.IndexCheckInterval.GetAsInt())
 })

 t.Run("test queryNodeConfig", func(t *testing.T) {