From 6534396b3dea8d5d14abcffb1670166af3b1cc59 Mon Sep 17 00:00:00 2001 From: wei liu Date: Wed, 19 Jul 2023 16:50:57 +0800 Subject: [PATCH] enable config different interval for different checker (#25514) Signed-off-by: Wei Liu --- internal/querycoordv2/checkers/controller.go | 135 +++++++++++------- .../querycoordv2/checkers/controller_test.go | 132 +++++++++++++++++ pkg/util/paramtable/component_param.go | 46 +++++- pkg/util/paramtable/component_param_test.go | 4 + 4 files changed, 262 insertions(+), 55 deletions(-) create mode 100644 internal/querycoordv2/checkers/controller_test.go diff --git a/internal/querycoordv2/checkers/controller.go b/internal/querycoordv2/checkers/controller.go index 865c3fd1f6..3956b7e966 100644 --- a/internal/querycoordv2/checkers/controller.go +++ b/internal/querycoordv2/checkers/controller.go @@ -27,24 +27,31 @@ import ( "github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/pkg/log" + "go.uber.org/zap" ) var ( checkRoundTaskNumLimit = 256 ) +var ( + Segment_Checker = "segment_checker" + Channel_Checker = "channel_checker" + Balance_Checker = "balance_checker" +) + type CheckerController struct { - stopCh chan struct{} - checkCh chan struct{} - meta *meta.Meta - dist *meta.DistributionManager - targetMgr *meta.TargetManager - broker *meta.CoordinatorBroker - nodeMgr *session.NodeManager - balancer balance.Balance + stopCh chan struct{} + manualCheckChs map[string]chan struct{} + meta *meta.Meta + dist *meta.DistributionManager + targetMgr *meta.TargetManager + broker *meta.CoordinatorBroker + nodeMgr *session.NodeManager + balancer balance.Balance scheduler task.Scheduler - checkers []Checker + checkers map[string]Checker stopOnce sync.Once } @@ -59,50 +66,80 @@ func NewCheckerController( // CheckerController runs checkers with the order, // the former checker has higher priority - checkers := []Checker{ - NewChannelChecker(meta, dist, 
targetMgr, balancer), - NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr), - NewBalanceChecker(meta, balancer, nodeMgr, scheduler), + checkers := map[string]Checker{ + Channel_Checker: NewChannelChecker(meta, dist, targetMgr, balancer), + Segment_Checker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr), + Balance_Checker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler), } - for i, checker := range checkers { - checker.SetID(int64(i + 1)) + + // assign unique IDs in a fixed priority order, map iteration order is random + for i, name := range []string{Channel_Checker, Segment_Checker, Balance_Checker} { + checkers[name].SetID(int64(i + 1)) + } + + manualCheckChs := map[string]chan struct{}{ + Channel_Checker: make(chan struct{}, 1), + Segment_Checker: make(chan struct{}, 1), + Balance_Checker: make(chan struct{}, 1), } return &CheckerController{ - stopCh: make(chan struct{}), - checkCh: make(chan struct{}, 1), - meta: meta, - dist: dist, - targetMgr: targetMgr, - scheduler: scheduler, - checkers: checkers, + stopCh: make(chan struct{}), + manualCheckChs: manualCheckChs, + meta: meta, + dist: dist, + targetMgr: targetMgr, + scheduler: scheduler, + checkers: checkers, } } func (controller *CheckerController) Start(ctx context.Context) { - go func() { - ticker := time.NewTicker(Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond)) - defer ticker.Stop() - for { - select { - case <-ctx.Done(): - log.Info("CheckerController stopped due to context canceled") - return + for checkerType := range controller.checkers { + go controller.StartChecker(ctx, checkerType) + } +} - case <-controller.stopCh: - log.Info("CheckerController stopped") - return +func getCheckerInterval(checkerType string) time.Duration { + switch checkerType { + case Segment_Checker: + return Params.QueryCoordCfg.SegmentCheckInterval.GetAsDuration(time.Millisecond) + case Channel_Checker: + return Params.QueryCoordCfg.ChannelCheckInterval.GetAsDuration(time.Millisecond) + case Balance_Checker: + return Params.QueryCoordCfg.BalanceCheckInterval.GetAsDuration(time.Millisecond) + default: +
return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond) + } - case <-ticker.C: - controller.check(ctx) +} - case <-controller.checkCh: - ticker.Stop() - controller.check(ctx) - ticker.Reset(Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond)) - } +func (controller *CheckerController) StartChecker(ctx context.Context, checkerType string) { + interval := getCheckerInterval(checkerType) + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + log.Info("Checker stopped due to context canceled", + zap.String("type", checkerType)) + return + + case <-controller.stopCh: + log.Info("Checker stopped", + zap.String("type", checkerType)) + return + + case <-ticker.C: + controller.check(ctx, checkerType) + + case <-controller.manualCheckChs[checkerType]: + ticker.Stop() + controller.check(ctx, checkerType) + ticker.Reset(interval) } - }() + } } func (controller *CheckerController) Stop() { @@ -112,18 +149,18 @@ func (controller *CheckerController) Stop() { } func (controller *CheckerController) Check() { - select { - case controller.checkCh <- struct{}{}: - default: + for _, checkCh := range controller.manualCheckChs { + select { + case checkCh <- struct{}{}: + default: + } } } // check is the real implementation of Check -func (controller *CheckerController) check(ctx context.Context) { - tasks := make([]task.Task, 0) - for _, checker := range controller.checkers { - tasks = append(tasks, checker.Check(ctx)...)
- } +func (controller *CheckerController) check(ctx context.Context, checkerType string) { + checker := controller.checkers[checkerType] + tasks := checker.Check(ctx) for _, task := range tasks { err := controller.scheduler.Add(task) diff --git a/internal/querycoordv2/checkers/controller_test.go b/internal/querycoordv2/checkers/controller_test.go new file mode 100644 index 0000000000..87c4db4af6 --- /dev/null +++ b/internal/querycoordv2/checkers/controller_test.go @@ -0,0 +1,132 @@ +// Licensed to the LF AI & Data foundation under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkers + +import ( + "context" + "testing" + "time" + + "github.com/milvus-io/milvus/internal/kv" + etcdkv "github.com/milvus-io/milvus/internal/kv/etcd" + "github.com/milvus-io/milvus/internal/proto/datapb" + "github.com/milvus-io/milvus/internal/querycoordv2/balance" + "github.com/milvus-io/milvus/internal/querycoordv2/meta" + . 
"github.com/milvus-io/milvus/internal/querycoordv2/params" + "github.com/milvus-io/milvus/internal/querycoordv2/session" + "github.com/milvus-io/milvus/internal/querycoordv2/task" + "github.com/milvus-io/milvus/internal/querycoordv2/utils" + "github.com/milvus-io/milvus/pkg/util/etcd" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/suite" + "go.uber.org/atomic" +) + +type CheckerControllerSuite struct { + suite.Suite + kv kv.MetaKv + meta *meta.Meta + broker *meta.MockBroker + nodeMgr *session.NodeManager + dist *meta.DistributionManager + targetManager *meta.TargetManager + scheduler *task.MockScheduler + balancer *balance.MockBalancer + + controller *CheckerController +} + +func (suite *CheckerControllerSuite) SetupSuite() { + Params.Init() +} + +func (suite *CheckerControllerSuite) SetupTest() { + var err error + config := GenerateEtcdConfig() + cli, err := etcd.GetEtcdClient( + config.UseEmbedEtcd.GetAsBool(), + config.EtcdUseSSL.GetAsBool(), + config.Endpoints.GetAsStrings(), + config.EtcdTLSCert.GetValue(), + config.EtcdTLSKey.GetValue(), + config.EtcdTLSCACert.GetValue(), + config.EtcdTLSMinVersion.GetValue()) + suite.Require().NoError(err) + suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) + + // meta + store := meta.NewMetaStore(suite.kv) + idAllocator := RandomIncrementIDAllocator() + suite.nodeMgr = session.NewNodeManager() + suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr) + suite.dist = meta.NewDistributionManager() + suite.broker = meta.NewMockBroker(suite.T()) + suite.targetManager = meta.NewTargetManager(suite.broker, suite.meta) + + suite.balancer = balance.NewMockBalancer(suite.T()) + suite.scheduler = task.NewMockScheduler(suite.T()) + suite.controller = NewCheckerController(suite.meta, suite.dist, suite.targetManager, suite.balancer, suite.nodeMgr, suite.scheduler) +} + +func (suite *CheckerControllerSuite) TestBasic() { + + // set meta + 
suite.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1)) + suite.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2})) + suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost")) + suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost")) + suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 1) + suite.meta.ResourceManager.AssignNode(meta.DefaultResourceGroupName, 2) + + // set target + segments := []*datapb.SegmentInfo{ + { + ID: 1, + PartitionID: 1, + InsertChannel: "test-insert-channel", + }, + } + suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return( + nil, segments, nil) + suite.targetManager.UpdateCollectionNextTargetWithPartitions(int64(1), int64(1)) + + // set dist + suite.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel")) + suite.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})) + + counter := atomic.NewInt64(0) + suite.scheduler.EXPECT().Add(mock.Anything).Run(func(task task.Task) { + counter.Inc() + }).Return(nil) + suite.scheduler.EXPECT().GetSegmentTaskNum().Return(0).Maybe() + suite.scheduler.EXPECT().GetChannelTaskNum().Return(0).Maybe() + + suite.balancer.EXPECT().AssignSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil) + suite.balancer.EXPECT().AssignChannel(mock.Anything, mock.Anything).Return(nil) + ctx := context.Background() + suite.controller.Start(ctx) + defer suite.controller.Stop() + + suite.Eventually(func() bool { + suite.controller.Check() + return counter.Load() > 0 + }, 5*time.Second, 1*time.Second) +} + +func TestCheckControllerSuite(t *testing.T) { + suite.Run(t, new(CheckerControllerSuite)) +} diff --git a/pkg/util/paramtable/component_param.go b/pkg/util/paramtable/component_param.go index 8d6dde5c3f..c6a432058b 100644 --- a/pkg/util/paramtable/component_param.go +++ b/pkg/util/paramtable/component_param.go @@ -1152,16 +1152,23 
@@ type queryCoordConfig struct { OverloadedMemoryThresholdPercentage ParamItem `refreshable:"true"` BalanceIntervalSeconds ParamItem `refreshable:"true"` MemoryUsageMaxDifferencePercentage ParamItem `refreshable:"true"` - CheckInterval ParamItem `refreshable:"true"` - ChannelTaskTimeout ParamItem `refreshable:"true"` - SegmentTaskTimeout ParamItem `refreshable:"true"` - DistPullInterval ParamItem `refreshable:"false"` - HeartbeatAvailableInterval ParamItem `refreshable:"true"` - LoadTimeoutSeconds ParamItem `refreshable:"true"` + + SegmentCheckInterval ParamItem `refreshable:"true"` + ChannelCheckInterval ParamItem `refreshable:"true"` + BalanceCheckInterval ParamItem `refreshable:"true"` + ChannelTaskTimeout ParamItem `refreshable:"true"` + SegmentTaskTimeout ParamItem `refreshable:"true"` + DistPullInterval ParamItem `refreshable:"false"` + HeartbeatAvailableInterval ParamItem `refreshable:"true"` + LoadTimeoutSeconds ParamItem `refreshable:"true"` + // Deprecated: Since 2.2.2, QueryCoord do not use HandOff logic anymore CheckHandoffInterval ParamItem `refreshable:"true"` EnableActiveStandby ParamItem `refreshable:"false"` + // Deprecated: Since 2.2.2, use different interval for different checker + CheckInterval ParamItem `refreshable:"true"` + NextTargetSurviveTime ParamItem `refreshable:"true"` UpdateNextTargetInterval ParamItem `refreshable:"false"` CheckNodeInReplicaInterval ParamItem `refreshable:"false"` @@ -1300,6 +1307,33 @@ func (p *queryCoordConfig) init(base *BaseTable) { } p.CheckInterval.Init(base.mgr) + p.SegmentCheckInterval = ParamItem{ + Key: "queryCoord.checkSegmentInterval", + Version: "2.3.0", + DefaultValue: "1000", + PanicIfEmpty: true, + Export: true, + } + p.SegmentCheckInterval.Init(base.mgr) + + p.ChannelCheckInterval = ParamItem{ + Key: "queryCoord.checkChannelInterval", + Version: "2.3.0", + DefaultValue: "1000", + PanicIfEmpty: true, + Export: true, + } + p.ChannelCheckInterval.Init(base.mgr) + + p.BalanceCheckInterval = ParamItem{ + 
Key: "queryCoord.checkBalanceInterval", + Version: "2.3.0", + DefaultValue: "10000", + PanicIfEmpty: true, + Export: true, + } + p.BalanceCheckInterval.Init(base.mgr) + p.ChannelTaskTimeout = ParamItem{ Key: "queryCoord.channelTaskTimeout", Version: "2.0.0", diff --git a/pkg/util/paramtable/component_param_test.go b/pkg/util/paramtable/component_param_test.go index 23111655ab..8a2cf3a99a 100644 --- a/pkg/util/paramtable/component_param_test.go +++ b/pkg/util/paramtable/component_param_test.go @@ -292,6 +292,10 @@ func TestComponentParam(t *testing.T) { assert.Equal(t, 1.3, Params.ReverseUnbalanceTolerationFactor.GetAsFloat()) params.Save("queryCoord.reverseUnBalanceTolerationFactor", "1.5") assert.Equal(t, 1.5, Params.ReverseUnbalanceTolerationFactor.GetAsFloat()) + + assert.Equal(t, 1000, Params.SegmentCheckInterval.GetAsInt()) + assert.Equal(t, 1000, Params.ChannelCheckInterval.GetAsInt()) + assert.Equal(t, 10000, Params.BalanceCheckInterval.GetAsInt()) }) t.Run("test queryNodeConfig", func(t *testing.T) {