enhance: refactor leader_observer to leader_checker (#29454)

issue: #29453 

Syncing distribution via RPC also calls loadSegment/releaseSegment, which can
trigger all kinds of concurrent operations on the same segment, such as a
concurrent load and release of one segment.
This PR adds a leader_checker that generates load/release tasks to correct the
leader view, instead of syncing distribution via RPC.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-01-05 15:54:55 +08:00 committed by GitHub
parent 9e2e7157e9
commit e98c62abbb
19 changed files with 698 additions and 942 deletions
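Below is a minimal, self-contained sketch of the idea behind this change. The names here (Task, ActionType, Grow, Reduce, checkLeaderView) are simplified stand-ins, not the actual querycoordv2 types: the point is only that the checker describes grow/release corrections as tasks and leaves execution to the shared scheduler, rather than issuing SyncDistribution RPCs that load/release segments directly.

```go
package main

import "fmt"

type ActionType int

const (
	Grow   ActionType = iota // load the segment on the shard leader
	Reduce                   // release the segment from the shard leader
)

// Task is the unit handed to the scheduler; the scheduler, not the checker,
// decides when it is safe to run, so loads and releases of the same segment
// no longer race with each other.
type Task struct {
	SegmentID int64
	Action    ActionType
}

// checkLeaderView compares a leader view with the segment distribution
// (both simplified to segmentID -> version) and only describes the
// corrections as tasks instead of executing them via RPC.
func checkLeaderView(leaderView, dist map[int64]int64) []Task {
	tasks := make([]Task, 0)
	// segments present in the distribution but missing or stale in the leader view
	for segID, version := range dist {
		if v, ok := leaderView[segID]; !ok || v < version {
			tasks = append(tasks, Task{SegmentID: segID, Action: Grow})
		}
	}
	// segments the leader still serves that are gone from the distribution
	for segID := range leaderView {
		if _, ok := dist[segID]; !ok {
			tasks = append(tasks, Task{SegmentID: segID, Action: Reduce})
		}
	}
	return tasks
}

func main() {
	leaderView := map[int64]int64{1: 1, 3: 1} // leader still sees segment 3
	dist := map[int64]int64{1: 2, 2: 1}       // segment 1 has a newer version, 2 is new
	for _, t := range checkLeaderView(leaderView, dist) {
		fmt.Printf("segment %d -> action %d\n", t.SegmentID, t.Action)
	}
}
```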


@ -30,6 +30,7 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -55,8 +56,8 @@ func NewBalanceChecker(meta *meta.Meta, balancer balance.Balance, nodeMgr *sessi
}
}
func (b *BalanceChecker) ID() CheckerType {
return balanceChecker
func (b *BalanceChecker) ID() utils.CheckerType {
return utils.BalanceChecker
}
func (b *BalanceChecker) Description() string {


@ -56,8 +56,8 @@ func NewChannelChecker(
}
}
func (c *ChannelChecker) ID() CheckerType {
return channelChecker
func (c *ChannelChecker) ID() utils.CheckerType {
return utils.ChannelChecker
}
func (c *ChannelChecker) Description() string {


@ -21,10 +21,11 @@ import (
"sync/atomic"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
)
type Checker interface {
ID() CheckerType
ID() utils.CheckerType
Description() string
Check(ctx context.Context) []task.Task
IsActive() bool


@ -29,44 +29,15 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
)
const (
segmentCheckerName = "segment_checker"
channelCheckerName = "channel_checker"
balanceCheckerName = "balance_checker"
indexCheckerName = "index_checker"
)
type CheckerType int32
const (
channelChecker CheckerType = iota + 1
segmentChecker
balanceChecker
indexChecker
)
var (
checkRoundTaskNumLimit = 256
checkerOrder = []string{channelCheckerName, segmentCheckerName, balanceCheckerName, indexCheckerName}
checkerNames = map[CheckerType]string{
segmentChecker: segmentCheckerName,
channelChecker: channelCheckerName,
balanceChecker: balanceCheckerName,
indexChecker: indexCheckerName,
}
errTypeNotFound = errors.New("checker type not found")
)
func (s CheckerType) String() string {
return checkerNames[s]
}
var errTypeNotFound = errors.New("checker type not found")
type CheckerController struct {
cancel context.CancelFunc
manualCheckChs map[CheckerType]chan struct{}
manualCheckChs map[utils.CheckerType]chan struct{}
meta *meta.Meta
dist *meta.DistributionManager
targetMgr *meta.TargetManager
@ -75,7 +46,7 @@ type CheckerController struct {
balancer balance.Balance
scheduler task.Scheduler
checkers map[CheckerType]Checker
checkers map[utils.CheckerType]Checker
stopOnce sync.Once
}
@ -91,17 +62,18 @@ func NewCheckerController(
) *CheckerController {
// CheckerController runs checkers in order,
// the earlier a checker appears, the higher its priority
checkers := map[CheckerType]Checker{
channelChecker: NewChannelChecker(meta, dist, targetMgr, balancer),
segmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
balanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
indexChecker: NewIndexChecker(meta, dist, broker, nodeMgr),
checkers := map[utils.CheckerType]Checker{
utils.ChannelChecker: NewChannelChecker(meta, dist, targetMgr, balancer),
utils.SegmentChecker: NewSegmentChecker(meta, dist, targetMgr, balancer, nodeMgr),
utils.BalanceChecker: NewBalanceChecker(meta, balancer, nodeMgr, scheduler),
utils.IndexChecker: NewIndexChecker(meta, dist, broker, nodeMgr),
utils.LeaderChecker: NewLeaderChecker(meta, dist, targetMgr, nodeMgr),
}
manualCheckChs := map[CheckerType]chan struct{}{
channelChecker: make(chan struct{}, 1),
segmentChecker: make(chan struct{}, 1),
balanceChecker: make(chan struct{}, 1),
manualCheckChs := map[utils.CheckerType]chan struct{}{
utils.ChannelChecker: make(chan struct{}, 1),
utils.SegmentChecker: make(chan struct{}, 1),
utils.BalanceChecker: make(chan struct{}, 1),
}
return &CheckerController{
@ -124,22 +96,24 @@ func (controller *CheckerController) Start() {
}
}
func getCheckerInterval(checker CheckerType) time.Duration {
func getCheckerInterval(checker utils.CheckerType) time.Duration {
switch checker {
case segmentChecker:
case utils.SegmentChecker:
return Params.QueryCoordCfg.SegmentCheckInterval.GetAsDuration(time.Millisecond)
case channelChecker:
case utils.ChannelChecker:
return Params.QueryCoordCfg.ChannelCheckInterval.GetAsDuration(time.Millisecond)
case balanceChecker:
case utils.BalanceChecker:
return Params.QueryCoordCfg.BalanceCheckInterval.GetAsDuration(time.Millisecond)
case indexChecker:
case utils.IndexChecker:
return Params.QueryCoordCfg.IndexCheckInterval.GetAsDuration(time.Millisecond)
case utils.LeaderChecker:
return Params.QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Millisecond)
default:
return Params.QueryCoordCfg.CheckInterval.GetAsDuration(time.Millisecond)
}
}
func (controller *CheckerController) startChecker(ctx context.Context, checker CheckerType) {
func (controller *CheckerController) startChecker(ctx context.Context, checker utils.CheckerType) {
interval := getCheckerInterval(checker)
ticker := time.NewTicker(interval)
defer ticker.Stop()
@ -180,7 +154,7 @@ func (controller *CheckerController) Check() {
}
// check is the real implementation of Check
func (controller *CheckerController) check(ctx context.Context, checkType CheckerType) {
func (controller *CheckerController) check(ctx context.Context, checkType utils.CheckerType) {
checker := controller.checkers[checkType]
tasks := checker.Check(ctx)
@ -193,7 +167,7 @@ func (controller *CheckerController) check(ctx context.Context, checkType Checke
}
}
func (controller *CheckerController) Deactivate(typ CheckerType) error {
func (controller *CheckerController) Deactivate(typ utils.CheckerType) error {
for _, checker := range controller.checkers {
if checker.ID() == typ {
checker.Deactivate()
@ -203,7 +177,7 @@ func (controller *CheckerController) Deactivate(typ CheckerType) error {
return errTypeNotFound
}
func (controller *CheckerController) Activate(typ CheckerType) error {
func (controller *CheckerController) Activate(typ utils.CheckerType) error {
for _, checker := range controller.checkers {
if checker.ID() == typ {
checker.Activate()
@ -213,7 +187,7 @@ func (controller *CheckerController) Activate(typ CheckerType) error {
return errTypeNotFound
}
func (controller *CheckerController) IsActive(typ CheckerType) (bool, error) {
func (controller *CheckerController) IsActive(typ utils.CheckerType) (bool, error) {
for _, checker := range controller.checkers {
if checker.ID() == typ {
return checker.IsActive(), nil


@ -29,6 +29,7 @@ import (
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -80,28 +81,28 @@ func (suite *ControllerBaseTestSuite) SetupTest() {
}
func (s *ControllerBaseTestSuite) TestActivation() {
active, err := s.controller.IsActive(segmentChecker)
active, err := s.controller.IsActive(utils.SegmentChecker)
s.NoError(err)
s.True(active)
err = s.controller.Deactivate(segmentChecker)
err = s.controller.Deactivate(utils.SegmentChecker)
s.NoError(err)
active, err = s.controller.IsActive(segmentChecker)
active, err = s.controller.IsActive(utils.SegmentChecker)
s.NoError(err)
s.False(active)
err = s.controller.Activate(segmentChecker)
err = s.controller.Activate(utils.SegmentChecker)
s.NoError(err)
active, err = s.controller.IsActive(segmentChecker)
active, err = s.controller.IsActive(utils.SegmentChecker)
s.NoError(err)
s.True(active)
invalidTyp := -1
_, err = s.controller.IsActive(CheckerType(invalidTyp))
_, err = s.controller.IsActive(utils.CheckerType(invalidTyp))
s.Equal(errTypeNotFound, err)
}
func (s *ControllerBaseTestSuite) TestListCheckers() {
checkers := s.controller.Checkers()
s.Equal(4, len(checkers))
s.Equal(5, len(checkers))
}
func TestControllerBaseTestSuite(t *testing.T) {


@ -28,6 +28,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
@ -58,8 +59,8 @@ func NewIndexChecker(
}
}
func (c *IndexChecker) ID() CheckerType {
return indexChecker
func (c *IndexChecker) ID() utils.CheckerType {
return utils.IndexChecker
}
func (c *IndexChecker) Description() string {


@ -0,0 +1,205 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkers
import (
"context"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
var _ Checker = (*LeaderChecker)(nil)
// LeaderChecker checks the leader view against the segment distribution and generates tasks to correct it.
type LeaderChecker struct {
*checkerActivation
meta *meta.Meta
dist *meta.DistributionManager
target *meta.TargetManager
nodeMgr *session.NodeManager
}
func NewLeaderChecker(
meta *meta.Meta,
dist *meta.DistributionManager,
target *meta.TargetManager,
nodeMgr *session.NodeManager,
) *LeaderChecker {
return &LeaderChecker{
checkerActivation: newCheckerActivation(),
meta: meta,
dist: dist,
target: target,
nodeMgr: nodeMgr,
}
}
func (c *LeaderChecker) ID() utils.CheckerType {
return utils.LeaderChecker
}
func (c *LeaderChecker) Description() string {
return "LeaderChecker checks the difference of leader view between dist, and try to correct it"
}
func (c *LeaderChecker) Check(ctx context.Context) []task.Task {
if !c.IsActive() {
return nil
}
collectionIDs := c.meta.CollectionManager.GetAll()
tasks := make([]task.Task, 0)
for _, collectionID := range collectionIDs {
collection := c.meta.CollectionManager.GetCollection(collectionID)
if collection == nil {
log.Warn("collection released during check leader", zap.Int64("collection", collectionID))
continue
}
replicas := c.meta.ReplicaManager.GetByCollection(collectionID)
for _, replica := range replicas {
for _, node := range replica.GetNodes() {
if ok, _ := c.nodeMgr.IsStoppingNode(node); ok {
// no need to correct the leader view loaded on a stopping node
continue
}
leaderViews := c.dist.LeaderViewManager.GetByCollectionAndNode(replica.GetCollectionID(), node)
for ch, leaderView := range leaderViews {
dist := c.dist.SegmentDistManager.GetByShardWithReplica(ch, replica)
tasks = append(tasks, c.findNeedLoadedSegments(ctx, replica.ID, leaderView, dist)...)
tasks = append(tasks, c.findNeedRemovedSegments(ctx, replica.ID, leaderView, dist)...)
}
}
}
}
return tasks
}
func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica int64, leaderView *meta.LeaderView, dist []*meta.Segment) []task.Task {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", leaderView.CollectionID),
zap.Int64("replica", replica),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
)
ret := make([]task.Task, 0)
dist = utils.FindMaxVersionSegments(dist)
for _, s := range dist {
version, ok := leaderView.Segments[s.GetID()]
currentTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.CurrentTarget)
existInCurrentTarget := currentTarget != nil
existInNextTarget := c.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil
if !existInCurrentTarget && !existInNextTarget {
continue
}
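// leader's view still holds an older version of this segment than the distribution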
leaderWithOldVersion := version.GetVersion() < s.Version
// leader has newer version, but the query node which loaded the newer version has been shutdown
leaderWithDirtyVersion := version.GetVersion() > s.Version && c.nodeMgr.Get(version.GetNodeID()) == nil
if !ok || leaderWithOldVersion || leaderWithDirtyVersion {
log.Debug("leader checker append a segment to set",
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
action := task.NewSegmentActionWithScope(s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), querypb.DataScope_Historical)
t, err := task.NewSegmentTask(
ctx,
params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
c.ID(),
s.GetCollectionID(),
replica,
action,
)
if err != nil {
log.Warn("create segment update task failed",
zap.Int64("segmentID", s.GetID()),
zap.Int64("node", s.Node),
zap.Error(err),
)
continue
}
// leader view correction tasks are scheduled with high priority
t.SetPriority(task.TaskPriorityHigh)
t.SetReason("add segment to leader view")
ret = append(ret, t)
}
}
return ret
}
func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica int64, leaderView *meta.LeaderView, dists []*meta.Segment) []task.Task {
log := log.Ctx(ctx).With(
zap.Int64("collectionID", leaderView.CollectionID),
zap.Int64("replica", replica),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
)
ret := make([]task.Task, 0)
distMap := make(map[int64]struct{})
for _, s := range dists {
distMap[s.GetID()] = struct{}{}
}
for sid, s := range leaderView.Segments {
_, ok := distMap[sid]
existInCurrentTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
existInNextTarget := c.target.GetSealedSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
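// keep segments that are still in the distribution or present in either target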
if ok || existInCurrentTarget || existInNextTarget {
continue
}
log.Debug("leader checker append a segment to remove",
zap.Int64("segmentID", sid),
zap.Int64("nodeID", s.NodeID))
action := task.NewSegmentActionWithScope(leaderView.ID, task.ActionTypeReduce, leaderView.Channel, sid, querypb.DataScope_Historical)
t, err := task.NewSegmentTask(
ctx,
paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
c.ID(),
leaderView.CollectionID,
replica,
action,
)
if err != nil {
log.Warn("create segment reduce task failed",
zap.Int64("segmentID", sid),
zap.Int64("nodeID", s.NodeID),
zap.Error(err))
continue
}
t.SetPriority(task.TaskPriorityHigh)
t.SetReason("remove segment from leader view")
ret = append(ret, t)
}
return ret
}


@ -0,0 +1,410 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package checkers
import (
"context"
"testing"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type LeaderCheckerTestSuite struct {
suite.Suite
checker *LeaderChecker
kv kv.MetaKv
meta *meta.Meta
broker *meta.MockBroker
nodeMgr *session.NodeManager
}
func (suite *LeaderCheckerTestSuite) SetupSuite() {
paramtable.Init()
}
func (suite *LeaderCheckerTestSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, suite.nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
distManager := meta.NewDistributionManager()
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.checker = NewLeaderChecker(suite.meta, distManager, targetManager, suite.nodeMgr)
}
func (suite *LeaderCheckerTestSuite) TearDownTest() {
suite.kv.Close()
}
func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func (suite *LeaderCheckerTestSuite) TestActivation() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
suite.checker.Deactivate()
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
suite.checker.Activate()
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func (suite *LeaderCheckerTestSuite) TestStoppingNode() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
suite.nodeMgr.Stopping(2)
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
}
func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"),
utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func (suite *LeaderCheckerTestSuite) TestIgnoreBalancedSegment() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
// dist with older version and leader view with newer version
leaderView := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
leaderView.Segments[1] = &querypb.SegmentDist{
NodeID: 2,
Version: 2,
}
leaderView.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, leaderView)
// test querynode-1 and querynode-2 exist
suite.nodeMgr.Add(session.NewNodeInfo(1, "localhost"))
suite.nodeMgr.Add(session.NewNodeInfo(2, "localhost"))
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
// test querynode-2 crash
suite.nodeMgr.Remove(2)
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(2, 1, []int64{3, 4}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 0, "test-insert-channel"))
observer.dist.SegmentDistManager.Update(4, utils.CreateTestSegment(1, 1, 1, 4, 0, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(4, utils.CreateTestChannel(1, 4, 2, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(4, view2)
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Equal(tasks[0].ReplicaID(), int64(1))
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, nil, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Equal(tasks[0].ReplicaID(), int64(1))
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce)
suite.Equal(tasks[0].Actions()[0].Node(), int64(2))
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {
observer := suite.checker
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 2,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{}))
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Equal(tasks[0].ReplicaID(), int64(1))
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce)
suite.Equal(tasks[0].Actions()[0].Node(), int64(2))
suite.Equal(tasks[0].Actions()[0].(*task.SegmentAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
}
func TestLeaderCheckerSuite(t *testing.T) {
suite.Run(t, new(LeaderCheckerTestSuite))
}


@ -61,8 +61,8 @@ func NewSegmentChecker(
}
}
func (c *SegmentChecker) ID() CheckerType {
return segmentChecker
func (c *SegmentChecker) ID() utils.CheckerType {
return utils.SegmentChecker
}
func (c *SegmentChecker) Description() string {


@ -41,7 +41,6 @@ type CollectionObserver struct {
meta *meta.Meta
targetMgr *meta.TargetManager
targetObserver *TargetObserver
leaderObserver *LeaderObserver
checkerController *checkers.CheckerController
partitionLoadedCount map[int64]int
@ -53,7 +52,6 @@ func NewCollectionObserver(
meta *meta.Meta,
targetMgr *meta.TargetManager,
targetObserver *TargetObserver,
leaderObserver *LeaderObserver,
checherController *checkers.CheckerController,
) *CollectionObserver {
return &CollectionObserver{
@ -61,7 +59,6 @@ func NewCollectionObserver(
meta: meta,
targetMgr: targetMgr,
targetObserver: targetObserver,
leaderObserver: leaderObserver,
checkerController: checherController,
partitionLoadedCount: make(map[int64]int),
}


@ -66,7 +66,6 @@ type CollectionObserverSuite struct {
meta *meta.Meta
targetMgr *meta.TargetManager
targetObserver *TargetObserver
leaderObserver *LeaderObserver
checkerController *checkers.CheckerController
// Test object
@ -200,7 +199,6 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.checkerController = &checkers.CheckerController{}
mockCluster := session.NewMockCluster(suite.T())
suite.leaderObserver = NewLeaderObserver(suite.dist, suite.meta, suite.targetMgr, suite.broker, mockCluster, nodeMgr)
mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
// Test object
@ -209,7 +207,6 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.meta,
suite.targetMgr,
suite.targetObserver,
suite.leaderObserver,
suite.checkerController,
)
@ -217,7 +214,6 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.broker.EXPECT().GetPartitions(mock.Anything, collection).Return(suite.partitions[collection], nil).Maybe()
}
suite.targetObserver.Start()
suite.leaderObserver.Start()
suite.ob.Start()
suite.loadAll()
}


@ -1,290 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package observers
import (
"context"
"sync"
"time"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
// LeaderObserver is to sync the distribution with leader
type LeaderObserver struct {
wg sync.WaitGroup
cancel context.CancelFunc
dist *meta.DistributionManager
meta *meta.Meta
target *meta.TargetManager
broker meta.Broker
cluster session.Cluster
nodeMgr *session.NodeManager
dispatcher *taskDispatcher[int64]
stopOnce sync.Once
}
func (o *LeaderObserver) Start() {
ctx, cancel := context.WithCancel(context.Background())
o.cancel = cancel
o.dispatcher.Start()
o.wg.Add(1)
go func() {
defer o.wg.Done()
o.schedule(ctx)
}()
}
func (o *LeaderObserver) Stop() {
o.stopOnce.Do(func() {
if o.cancel != nil {
o.cancel()
}
o.wg.Wait()
o.dispatcher.Stop()
})
}
func (o *LeaderObserver) schedule(ctx context.Context) {
ticker := time.NewTicker(paramtable.Get().QueryCoordCfg.LeaderViewUpdateInterval.GetAsDuration(time.Second))
defer ticker.Stop()
for {
select {
case <-ctx.Done():
log.Info("stop leader observer")
return
case <-ticker.C:
o.observe(ctx)
}
}
}
func (o *LeaderObserver) observe(ctx context.Context) {
o.observeSegmentsDist(ctx)
}
func (o *LeaderObserver) readyToObserve(collectionID int64) bool {
metaExist := (o.meta.GetCollection(collectionID) != nil)
targetExist := o.target.IsNextTargetExist(collectionID) || o.target.IsCurrentTargetExist(collectionID)
return metaExist && targetExist
}
func (o *LeaderObserver) observeSegmentsDist(ctx context.Context) {
collectionIDs := o.meta.CollectionManager.GetAll()
for _, cid := range collectionIDs {
if o.readyToObserve(cid) {
o.dispatcher.AddTask(cid)
}
}
}
func (o *LeaderObserver) observeCollection(ctx context.Context, collection int64) {
replicas := o.meta.ReplicaManager.GetByCollection(collection)
for _, replica := range replicas {
leaders := o.dist.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
if ok, _ := o.nodeMgr.IsStoppingNode(leaderID); ok {
// no need to correct leader's view which is loaded on stopping node
continue
}
leaderView := o.dist.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
continue
}
dists := o.dist.SegmentDistManager.GetByShardWithReplica(ch, replica)
actions := o.findNeedLoadedSegments(leaderView, dists)
actions = append(actions, o.findNeedRemovedSegments(leaderView, dists)...)
o.sync(ctx, replica.GetID(), leaderView, actions)
}
}
}
func (o *LeaderObserver) findNeedLoadedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction {
ret := make([]*querypb.SyncAction, 0)
dists = utils.FindMaxVersionSegments(dists)
for _, s := range dists {
version, ok := leaderView.Segments[s.GetID()]
currentTarget := o.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.CurrentTarget)
existInCurrentTarget := currentTarget != nil
existInNextTarget := o.target.GetSealedSegment(s.CollectionID, s.GetID(), meta.NextTarget) != nil
if !existInCurrentTarget && !existInNextTarget {
continue
}
if !ok || version.GetVersion() < s.Version { // Leader misses this segment
ctx := context.Background()
resp, err := o.broker.GetSegmentInfo(ctx, s.GetID())
if err != nil || len(resp.GetInfos()) == 0 {
log.Warn("failed to get segment info from DataCoord", zap.Error(err))
continue
}
if channel := o.target.GetDmChannel(s.GetCollectionID(), s.GetInsertChannel(), meta.CurrentTargetFirst); channel != nil {
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), nil)
log.Debug("leader observer append a segment to set",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Set,
PartitionID: s.GetPartitionID(),
SegmentID: s.GetID(),
NodeID: s.Node,
Version: s.Version,
Info: loadInfo,
})
}
}
}
return ret
}
func (o *LeaderObserver) findNeedRemovedSegments(leaderView *meta.LeaderView, dists []*meta.Segment) []*querypb.SyncAction {
ret := make([]*querypb.SyncAction, 0)
distMap := make(map[int64]struct{})
for _, s := range dists {
distMap[s.GetID()] = struct{}{}
}
for sid, s := range leaderView.Segments {
_, ok := distMap[sid]
existInCurrentTarget := o.target.GetSealedSegment(leaderView.CollectionID, sid, meta.CurrentTarget) != nil
existInNextTarget := o.target.GetSealedSegment(leaderView.CollectionID, sid, meta.NextTarget) != nil
if ok || existInCurrentTarget || existInNextTarget {
continue
}
log.Debug("leader observer append a segment to remove",
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
zap.Int64("leaderViewID", leaderView.ID),
zap.Int64("segmentID", sid),
zap.Int64("nodeID", s.NodeID))
ret = append(ret, &querypb.SyncAction{
Type: querypb.SyncType_Remove,
SegmentID: sid,
NodeID: s.NodeID,
})
}
return ret
}
func (o *LeaderObserver) sync(ctx context.Context, replicaID int64, leaderView *meta.LeaderView, diffs []*querypb.SyncAction) bool {
if len(diffs) == 0 {
return true
}
log := log.With(
zap.Int64("leaderID", leaderView.ID),
zap.Int64("collectionID", leaderView.CollectionID),
zap.String("channel", leaderView.Channel),
)
collectionInfo, err := o.broker.DescribeCollection(ctx, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get collection info", zap.Error(err))
return false
}
// Get collection index info
indexInfo, err := o.broker.DescribeIndex(ctx, collectionInfo.CollectionID)
if err != nil {
log.Warn("fail to get index info of collection", zap.Error(err))
return false
}
partitions, err := utils.GetPartitions(o.meta.CollectionManager, leaderView.CollectionID)
if err != nil {
log.Warn("failed to get partitions", zap.Error(err))
return false
}
req := &querypb.SyncDistributionRequest{
Base: commonpbutil.NewMsgBase(
commonpbutil.WithMsgType(commonpb.MsgType_SyncDistribution),
),
CollectionID: leaderView.CollectionID,
ReplicaID: replicaID,
Channel: leaderView.Channel,
Actions: diffs,
Schema: collectionInfo.GetSchema(),
LoadMeta: &querypb.LoadMetaInfo{
LoadType: o.meta.GetLoadType(leaderView.CollectionID),
CollectionID: leaderView.CollectionID,
PartitionIDs: partitions,
},
Version: time.Now().UnixNano(),
IndexInfoList: indexInfo,
}
ctx, cancel := context.WithTimeout(ctx, paramtable.Get().QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond))
defer cancel()
resp, err := o.cluster.SyncDistribution(ctx, leaderView.ID, req)
if err != nil {
log.Warn("failed to sync distribution", zap.Error(err))
return false
}
if resp.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("failed to sync distribution", zap.String("reason", resp.GetReason()))
return false
}
return true
}
func NewLeaderObserver(
dist *meta.DistributionManager,
meta *meta.Meta,
targetMgr *meta.TargetManager,
broker meta.Broker,
cluster session.Cluster,
nodeMgr *session.NodeManager,
) *LeaderObserver {
ob := &LeaderObserver{
dist: dist,
meta: meta,
target: targetMgr,
broker: broker,
cluster: cluster,
nodeMgr: nodeMgr,
}
dispatcher := newTaskDispatcher[int64](ob.observeCollection)
ob.dispatcher = dispatcher
return ob
}


@ -1,558 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package observers
import (
"context"
"fmt"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus-proto/go-api/v2/milvuspb"
"github.com/milvus-io/milvus/internal/kv"
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type LeaderObserverTestSuite struct {
suite.Suite
observer *LeaderObserver
kv kv.MetaKv
mockCluster *session.MockCluster
meta *meta.Meta
broker *meta.MockBroker
}
func (suite *LeaderObserverTestSuite) SetupSuite() {
paramtable.Init()
}
func (suite *LeaderObserverTestSuite) SetupTest() {
var err error
config := GenerateEtcdConfig()
cli, err := etcd.GetEtcdClient(
config.UseEmbedEtcd.GetAsBool(),
config.EtcdUseSSL.GetAsBool(),
config.Endpoints.GetAsStrings(),
config.EtcdTLSCert.GetValue(),
config.EtcdTLSKey.GetValue(),
config.EtcdTLSCACert.GetValue(),
config.EtcdTLSMinVersion.GetValue())
suite.Require().NoError(err)
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta
store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator()
nodeMgr := session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
suite.broker = meta.NewMockBroker(suite.T())
suite.mockCluster = session.NewMockCluster(suite.T())
// suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(&commonpb.Status{
// ErrorCode: commonpb.ErrorCode_Success,
// }, nil).Maybe()
distManager := meta.NewDistributionManager()
targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.observer = NewLeaderObserver(distManager, suite.meta, targetManager, suite.broker, suite.mockCluster, nodeMgr)
}
func (suite *LeaderObserverTestSuite) TearDownTest() {
suite.observer.Stop()
suite.kv.Close()
}
func (suite *LeaderObserverTestSuite) TestSyncLoadedSegments() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
info := &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
// will cause sync failed once
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return(nil, fmt.Errorf("mock error")).Once()
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2),
mock.AnythingOfType("*querypb.SyncDistributionRequest")).
Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start()
suite.Eventually(
func() bool {
return called.Load()
},
10*time.Second,
500*time.Millisecond,
)
}
func (suite *LeaderObserverTestSuite) TestIgnoreSyncLoadedSegments() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
info := &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
}
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"),
utils.CreateTestSegment(1, 1, 2, 2, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2), mock.AnythingOfType("*querypb.SyncDistributionRequest")).
Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start()
suite.Eventually(
func() bool {
return called.Load()
},
10*time.Second,
500*time.Millisecond,
)
}
func (suite *LeaderObserverTestSuite) TestIgnoreBalancedSegment() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
// The leader view saw the segment on new node,
// but another nodes not yet
leaderView := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
leaderView.Segments[1] = &querypb.SegmentDist{
NodeID: 2,
Version: 2,
}
leaderView.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, leaderView)
observer.Start()
// Nothing should happen
time.Sleep(2 * time.Second)
}
func (suite *LeaderObserverTestSuite) TestSyncLoadedSegmentsWithReplicas() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 2))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(2, 1, []int64{3, 4}))
segments := []*datapb.SegmentInfo{
{
ID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
info := &datapb.SegmentInfo{
ID: 1,
CollectionID: 1,
PartitionID: 1,
InsertChannel: "test-insert-channel",
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().GetSegmentInfo(mock.Anything, int64(1)).Return(
&datapb.GetSegmentInfoResponse{Infos: []*datapb.SegmentInfo{info}}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{{IndexName: "test"}}, nil)
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 1, "test-insert-channel"))
observer.dist.SegmentDistManager.Update(4, utils.CreateTestSegment(1, 1, 1, 4, 2, "test-insert-channel"))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
view2 := utils.CreateTestLeaderView(4, 1, "test-insert-channel", map[int64]int64{1: 4}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(4, view2)
loadInfo := utils.PackSegmentLoadInfo(info, nil, nil)
expectReqeustFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Set,
PartitionID: 1,
SegmentID: 1,
NodeID: 1,
Version: 1,
Info: loadInfo,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2),
mock.AnythingOfType("*querypb.SyncDistributionRequest")).
Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
[]*querypb.SyncDistributionRequest{expectReqeustFunc(req.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start()
suite.Eventually(
func() bool {
return called.Load()
},
10*time.Second,
500*time.Millisecond,
)
}
func (suite *LeaderObserverTestSuite) TestSyncRemovedSegments() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
schema := utils.CreateTestSchema()
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, nil, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
view := utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
observer.dist.LeaderViewManager.Update(2, view)
expectRequestFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
SegmentID: 3,
NodeID: 2,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
ch := make(chan struct{})
suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2),
mock.AnythingOfType("*querypb.SyncDistributionRequest")).
Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
[]*querypb.SyncDistributionRequest{expectRequestFunc(req.GetVersion())})
close(ch)
}).
Return(&commonpb.Status{}, nil)
observer.Start()
select {
case <-ch:
case <-time.After(2 * time.Second):
suite.T().Fatal("timed out waiting for SyncDistribution to be called")
}
}
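// TestIgnoreSyncRemovedSegments verifies that only segments absent from the next target are
// removed: segment 3 is dropped from the leader view while segment 2, still in the next
// target, is left in place.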
func (suite *LeaderObserverTestSuite) TestIgnoreSyncRemovedSegments() {
observer := suite.observer
observer.meta.CollectionManager.PutCollection(utils.CreateTestCollection(1, 1))
observer.meta.CollectionManager.PutPartition(utils.CreateTestPartition(1, 1))
observer.meta.ReplicaManager.Put(utils.CreateTestReplica(1, 1, []int64{1, 2}))
segments := []*datapb.SegmentInfo{
{
ID: 2,
PartitionID: 1,
InsertChannel: "test-insert-channel",
},
}
channels := []*datapb.VchannelInfo{
{
CollectionID: 1,
ChannelName: "test-insert-channel",
},
}
schema := utils.CreateTestSchema()
suite.broker.EXPECT().DescribeCollection(mock.Anything, int64(1)).Return(&milvuspb.DescribeCollectionResponse{Schema: schema}, nil)
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.broker.EXPECT().DescribeIndex(mock.Anything, mock.Anything).Return([]*indexpb.IndexInfo{
{IndexName: "test"},
}, nil)
observer.target.UpdateCollectionNextTarget(int64(1))
observer.dist.ChannelDistManager.Update(2, utils.CreateTestChannel(1, 2, 1, "test-insert-channel"))
observer.dist.LeaderViewManager.Update(2, utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{3: 2, 2: 2}, map[int64]*meta.Segment{}))
expectRequestFunc := func(version int64) *querypb.SyncDistributionRequest {
return &querypb.SyncDistributionRequest{
Base: &commonpb.MsgBase{
MsgType: commonpb.MsgType_SyncDistribution,
},
CollectionID: 1,
ReplicaID: 1,
Channel: "test-insert-channel",
Actions: []*querypb.SyncAction{
{
Type: querypb.SyncType_Remove,
SegmentID: 3,
NodeID: 2,
},
},
Schema: schema,
LoadMeta: &querypb.LoadMetaInfo{
CollectionID: 1,
PartitionIDs: []int64{1},
},
Version: version,
IndexInfoList: []*indexpb.IndexInfo{{IndexName: "test"}},
}
}
called := atomic.NewBool(false)
suite.mockCluster.EXPECT().SyncDistribution(mock.Anything, int64(2), mock.AnythingOfType("*querypb.SyncDistributionRequest")).
Run(func(ctx context.Context, nodeID int64, req *querypb.SyncDistributionRequest) {
assert.ElementsMatch(suite.T(), []*querypb.SyncDistributionRequest{req},
[]*querypb.SyncDistributionRequest{expectRequestFunc(req.GetVersion())})
called.Store(true)
}).
Return(&commonpb.Status{}, nil)
observer.Start()
suite.Eventually(func() bool {
return called.Load()
},
10*time.Second,
500*time.Millisecond,
)
}
func TestLeaderObserverSuite(t *testing.T) {
suite.Run(t, new(LeaderObserverTestSuite))
}

View File

@ -23,7 +23,7 @@ import (
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/checkers"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -73,7 +73,7 @@ func (s *Server) ActivateChecker(ctx context.Context, req *querypb.ActivateCheck
log.Warn("failed to activate checker", zap.Error(err))
return merr.Status(err), nil
}
if err := s.checkerController.Activate(checkers.CheckerType(req.CheckerID)); err != nil {
if err := s.checkerController.Activate(utils.CheckerType(req.CheckerID)); err != nil {
log.Warn("failed to activate checker", zap.Error(err))
return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
}
@ -87,7 +87,7 @@ func (s *Server) DeactivateChecker(ctx context.Context, req *querypb.DeactivateC
log.Warn("failed to deactivate checker", zap.Error(err))
return merr.Status(err), nil
}
if err := s.checkerController.Deactivate(checkers.CheckerType(req.CheckerID)); err != nil {
if err := s.checkerController.Deactivate(utils.CheckerType(req.CheckerID)); err != nil {
log.Warn("failed to deactivate checker", zap.Error(err))
return merr.Status(merr.WrapErrServiceInternal(err.Error())), nil
}

View File

@ -110,7 +110,6 @@ type Server struct {
// Observers
collectionObserver *observers.CollectionObserver
leaderObserver *observers.LeaderObserver
targetObserver *observers.TargetObserver
replicaObserver *observers.ReplicaObserver
resourceObserver *observers.ResourceObserver
@ -363,14 +362,6 @@ func (s *Server) initMeta() error {
func (s *Server) initObserver() {
log.Info("init observers")
s.leaderObserver = observers.NewLeaderObserver(
s.dist,
s.meta,
s.targetMgr,
s.broker,
s.cluster,
s.nodeMgr,
)
s.targetObserver = observers.NewTargetObserver(
s.meta,
s.targetMgr,
@ -383,7 +374,6 @@ func (s *Server) initObserver() {
s.meta,
s.targetMgr,
s.targetObserver,
s.leaderObserver,
s.checkerController,
)
@ -451,7 +441,6 @@ func (s *Server) startServerLoop() {
log.Info("start observers...")
s.collectionObserver.Start()
s.leaderObserver.Start()
s.targetObserver.Start()
s.replicaObserver.Start()
s.resourceObserver.Start()
@ -495,9 +484,6 @@ func (s *Server) Stop() error {
if s.collectionObserver != nil {
s.collectionObserver.Stop()
}
if s.leaderObserver != nil {
s.leaderObserver.Stop()
}
if s.targetObserver != nil {
s.targetObserver.Stop()
}

View File

@ -582,7 +582,6 @@ func (suite *ServerSuite) hackServer() {
suite.server.meta,
suite.server.targetMgr,
suite.server.targetObserver,
suite.server.leaderObserver,
suite.server.checkerController,
)

View File

@ -196,7 +196,6 @@ func (suite *ServiceSuite) SetupTest() {
suite.server.meta,
suite.server.targetMgr,
suite.targetObserver,
suite.server.leaderObserver,
&checkers.CheckerController{},
)

View File

@ -30,6 +30,7 @@ import (
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
"github.com/milvus-io/milvus/internal/querycoordv2/utils"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/funcutil"
@ -112,6 +113,9 @@ func packLoadSegmentRequest(
loadScope = querypb.LoadScope_Index
}
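// tasks issued by the leader checker update the shard leader's view, so only the
// segment's delta data needs to be loaded rather than the full segment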
if task.Source() == utils.LeaderChecker {
loadScope = querypb.LoadScope_Delta
}
// field mmap enabled if collection-level mmap enabled or the field mmap enabled
collectionMmapEnabled := common.IsMmapEnabled(collectionProperties...)
for _, field := range schema.GetFields() {

View File

@ -21,6 +21,36 @@ import (
"github.com/milvus-io/milvus/pkg/util/typeutil"
)
const (
SegmentCheckerName = "segment_checker"
ChannelCheckerName = "channel_checker"
BalanceCheckerName = "balance_checker"
IndexCheckerName = "index_checker"
LeaderCheckerName = "leader_checker"
)
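// CheckerType identifies a checker managed by the CheckerController; the type and its
// values are exported so other packages (e.g. the gRPC service layer) can activate or
// deactivate a checker by ID.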
type CheckerType int32
const (
ChannelChecker CheckerType = iota + 1
SegmentChecker
BalanceChecker
IndexChecker
LeaderChecker
)
var checkerNames = map[CheckerType]string{
SegmentChecker: SegmentCheckerName,
ChannelChecker: ChannelCheckerName,
BalanceChecker: BalanceCheckerName,
IndexChecker: IndexCheckerName,
LeaderChecker: LeaderCheckerName,
}
func (s CheckerType) String() string {
return checkerNames[s]
}
func FilterReleased[E interface{ GetCollectionID() int64 }](elems []E, collections []int64) []E {
collectionSet := typeutil.NewUniqueSet(collections...)
ret := make([]E, 0, len(elems))