fix: Search may return fewer results after QueryNode recovery (#36549)

issue: #36293 #36242
After a QueryNode recovers, the delegator may be loaded on a new node. Once all
segments have been loaded, the delegator becomes serviceable, but its target
version has not yet been synced. If a search/query arrives at that point, the
delegator uses the wrong target version and filters the readable segments down
to an empty list, which produces empty search results.

This PR blocks the delegator from becoming serviceable until its target version
has been synced.
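
For illustration, the sketch below shows the gating idea in plain Go. The names used here (LeaderView, checkServiceable) are simplified stand-ins invented for this example, not the actual Milvus types or functions touched by this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// LeaderView is a simplified stand-in for the coordinator's view of a shard
// delegator (illustrative only, not the real Milvus type).
type LeaderView struct {
	Channel       string
	Segments      map[int64]struct{} // sealed segments loaded on the delegator
	TargetVersion int64              // target version the delegator has synced
}

// checkServiceable captures the rule introduced by this PR: the delegator is
// serviceable only when (1) every sealed segment in the current target is
// loaded and (2) its target version has actually been synced.
func checkServiceable(view *LeaderView, targetSegments []int64, currentTargetVersion int64) error {
	for _, segID := range targetSegments {
		if _, ok := view.Segments[segID]; !ok {
			return fmt.Errorf("segment %d not loaded on channel %s", segID, view.Channel)
		}
	}
	// Without this second check, a freshly recovered delegator with target
	// version 0 would be marked serviceable, filter every segment out of its
	// readable list, and return empty search results.
	if view.TargetVersion <= 0 || view.TargetVersion < currentTargetVersion {
		return errors.New("target version not synced yet")
	}
	return nil
}

func main() {
	view := &LeaderView{
		Channel:       "dml-ch-0",
		Segments:      map[int64]struct{}{1: {}, 2: {}},
		TargetVersion: 0, // all data loaded, but version not yet synced
	}
	err := checkServiceable(view, []int64{1, 2}, 1011)
	fmt.Printf("serviceable=%v err=%v\n", err == nil, err)
}
```

In the actual change, this gate lives in the distHandler's leader-view update path via CheckDelegatorDataReady, and a mismatch additionally triggers the TargetObserver (through syncTargetVersionFn) to push the current target version to the delegator.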

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
Author: wei liu, committed 2024-11-12 16:34:28 +08:00 via GitHub
parent b5b003551e
commit 266f8ef1f5
18 changed files with 266 additions and 158 deletions


@ -43,6 +43,7 @@ type ControllerImpl struct {
dist *meta.DistributionManager dist *meta.DistributionManager
targetMgr meta.TargetManagerInterface targetMgr meta.TargetManagerInterface
scheduler task.Scheduler scheduler task.Scheduler
syncTargetVersionFn TriggerUpdateTargetVersion
} }
func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) { func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
@ -52,7 +53,7 @@ func (dc *ControllerImpl) StartDistInstance(ctx context.Context, nodeID int64) {
log.Info("node has started", zap.Int64("nodeID", nodeID)) log.Info("node has started", zap.Int64("nodeID", nodeID))
return return
} }
h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr) h := newDistHandler(ctx, nodeID, dc.client, dc.nodeManager, dc.scheduler, dc.dist, dc.targetMgr, dc.syncTargetVersionFn)
dc.handlers[nodeID] = h dc.handlers[nodeID] = h
} }
@ -100,6 +101,7 @@ func NewDistController(
dist *meta.DistributionManager, dist *meta.DistributionManager,
targetMgr meta.TargetManagerInterface, targetMgr meta.TargetManagerInterface,
scheduler task.Scheduler, scheduler task.Scheduler,
syncTargetVersionFn TriggerUpdateTargetVersion,
) *ControllerImpl { ) *ControllerImpl {
return &ControllerImpl{ return &ControllerImpl{
handlers: make(map[int64]*distHandler), handlers: make(map[int64]*distHandler),
@ -108,5 +110,6 @@ func NewDistController(
dist: dist, dist: dist,
targetMgr: targetMgr, targetMgr: targetMgr,
scheduler: scheduler, scheduler: scheduler,
syncTargetVersionFn: syncTargetVersionFn,
} }
} }


@ -81,7 +81,8 @@ func (suite *DistControllerTestSuite) SetupTest() {
targetManager := meta.NewTargetManager(suite.broker, suite.meta) targetManager := meta.NewTargetManager(suite.broker, suite.meta)
suite.mockScheduler = task.NewMockScheduler(suite.T()) suite.mockScheduler = task.NewMockScheduler(suite.T())
suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe() suite.mockScheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(nil).Maybe()
suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler) syncTargetVersionFn := func(collectionID int64) {}
suite.controller = NewDistController(suite.mockCluster, suite.nodeMgr, distManager, targetManager, suite.mockScheduler, syncTargetVersionFn)
} }
func (suite *DistControllerTestSuite) TearDownSuite() { func (suite *DistControllerTestSuite) TearDownSuite() {


@ -18,6 +18,7 @@ package dist
import ( import (
"context" "context"
"fmt"
"sync" "sync"
"time" "time"
@ -39,6 +40,8 @@ import (
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
) )
type TriggerUpdateTargetVersion = func(collectionID int64)
type distHandler struct { type distHandler struct {
nodeID int64 nodeID int64
c chan struct{} c chan struct{}
@ -51,6 +54,8 @@ type distHandler struct {
mu sync.Mutex mu sync.Mutex
stopOnce sync.Once stopOnce sync.Once
lastUpdateTs int64 lastUpdateTs int64
syncTargetVersionFn TriggerUpdateTargetVersion
} }
func (dh *distHandler) start(ctx context.Context) { func (dh *distHandler) start(ctx context.Context) {
@ -222,12 +227,35 @@ func (dh *distHandler) updateLeaderView(resp *querypb.GetDataDistributionRespons
NumOfGrowingRows: lview.GetNumOfGrowingRows(), NumOfGrowingRows: lview.GetNumOfGrowingRows(),
PartitionStatsVersions: lview.PartitionStatsVersions, PartitionStatsVersions: lview.PartitionStatsVersions,
} }
// check leader serviceable
// todo by weiliu1031: serviceable status should be maintained by delegator, to avoid heavy check here
if err := utils.CheckLeaderAvailable(dh.nodeManager, dh.target, view); err != nil {
view.UnServiceableError = err
}
updates = append(updates, view) updates = append(updates, view)
// check leader serviceable
if err := utils.CheckDelegatorDataReady(dh.nodeManager, dh.target, view, meta.CurrentTarget); err != nil {
view.UnServiceableError = err
log.Info("leader is not available due to distribution not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
continue
}
// if target version hasn't been synced, delegator will get empty readable segment list
// so shard leader should be unserviceable until target version is synced
currentTargetVersion := dh.target.GetCollectionTargetVersion(lview.GetCollection(), meta.CurrentTarget)
if lview.TargetVersion <= 0 {
err := merr.WrapErrServiceInternal(fmt.Sprintf("target version mismatch, collection: %d, channel: %s, current target version: %v, leader version: %v",
lview.GetCollection(), lview.GetChannel(), currentTargetVersion, lview.TargetVersion))
// segment and channel already loaded, trigger target observer to check target version
dh.syncTargetVersionFn(lview.GetCollection())
view.UnServiceableError = err
log.Info("leader is not available due to target version not ready",
zap.Int64("collectionID", view.CollectionID),
zap.Int64("nodeID", view.ID),
zap.String("channel", view.Channel),
zap.Error(err))
}
} }
dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...) dh.dist.LeaderViewManager.Update(resp.GetNodeID(), updates...)
@ -273,6 +301,7 @@ func newDistHandler(
scheduler task.Scheduler, scheduler task.Scheduler,
dist *meta.DistributionManager, dist *meta.DistributionManager,
targetMgr meta.TargetManagerInterface, targetMgr meta.TargetManagerInterface,
syncTargetVersionFn TriggerUpdateTargetVersion,
) *distHandler { ) *distHandler {
h := &distHandler{ h := &distHandler{
nodeID: nodeID, nodeID: nodeID,
@ -282,6 +311,7 @@ func newDistHandler(
scheduler: scheduler, scheduler: scheduler,
dist: dist, dist: dist,
target: targetMgr, target: targetMgr,
syncTargetVersionFn: syncTargetVersionFn,
} }
h.wg.Add(1) h.wg.Add(1)
go h.start(ctx) go h.start(ctx)


@ -68,6 +68,7 @@ func (suite *DistHandlerSuite) SetupSuite() {
suite.scheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(suite.executedFlagChan).Maybe() suite.scheduler.EXPECT().GetExecutedFlag(mock.Anything).Return(suite.executedFlagChan).Maybe()
suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() suite.target.EXPECT().GetSealedSegment(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() suite.target.EXPECT().GetDmChannel(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe()
suite.target.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
} }
func (suite *DistHandlerSuite) TestBasic() { func (suite *DistHandlerSuite) TestBasic() {
@ -107,12 +108,14 @@ func (suite *DistHandlerSuite) TestBasic() {
{ {
Collection: 1, Collection: 1,
Channel: "test-channel-1", Channel: "test-channel-1",
TargetVersion: 1011,
}, },
}, },
LastModifyTs: 1, LastModifyTs: 1,
}, nil) }, nil)
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target) syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop() defer suite.handler.stop()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
@ -132,7 +135,8 @@ func (suite *DistHandlerSuite) TestGetDistributionFailed() {
})) }))
suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error")) suite.client.EXPECT().GetDataDistribution(mock.Anything, mock.Anything, mock.Anything).Return(nil, errors.New("fake error"))
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target) syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop() defer suite.handler.stop()
time.Sleep(3 * time.Second) time.Sleep(3 * time.Second)
@ -180,7 +184,8 @@ func (suite *DistHandlerSuite) TestForcePullDist() {
LastModifyTs: 1, LastModifyTs: 1,
}, nil) }, nil)
suite.executedFlagChan <- struct{}{} suite.executedFlagChan <- struct{}{}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target) syncTargetVersionFn := func(collectionID int64) {}
suite.handler = newDistHandler(suite.ctx, suite.nodeID, suite.client, suite.nodeManager, suite.scheduler, suite.dist, suite.target, syncTargetVersionFn)
defer suite.handler.stop() defer suite.handler.stop()
time.Sleep(300 * time.Millisecond) time.Sleep(300 * time.Millisecond)


@ -171,6 +171,7 @@ func (suite *JobSuite) SetupTest() {
suite.dist, suite.dist,
suite.broker, suite.broker,
suite.cluster, suite.cluster,
suite.nodeMgr,
) )
suite.targetObserver.Start() suite.targetObserver.Start()
suite.scheduler = NewScheduler() suite.scheduler = NewScheduler()


@ -150,6 +150,7 @@ func (view *LeaderView) Clone() *LeaderView {
TargetVersion: view.TargetVersion, TargetVersion: view.TargetVersion,
NumOfGrowingRows: view.NumOfGrowingRows, NumOfGrowingRows: view.NumOfGrowingRows,
PartitionStatsVersions: view.PartitionStatsVersions, PartitionStatsVersions: view.PartitionStatsVersions,
UnServiceableError: view.UnServiceableError,
} }
} }


@ -197,9 +197,7 @@ func (mgr *TargetManager) UpdateCollectionNextTarget(collectionID int64) error {
mgr.next.updateCollectionTarget(collectionID, allocatedTarget) mgr.next.updateCollectionTarget(collectionID, allocatedTarget)
log.Debug("finish to update next targets for collection", log.Debug("finish to update next targets for collection",
zap.Int64("collectionID", collectionID), zap.Int64("collectionID", collectionID),
zap.Int64s("PartitionIDs", partitionIDs), zap.Int64s("PartitionIDs", partitionIDs))
zap.Int64s("segments", allocatedTarget.GetAllSegmentIDs()),
zap.Strings("channels", allocatedTarget.GetAllDmChannelNames()))
return nil return nil
} }
@ -606,6 +604,7 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
zap.Int64("collectionID", t.GetCollectionID()), zap.Int64("collectionID", t.GetCollectionID()),
zap.Strings("channels", newTarget.GetAllDmChannelNames()), zap.Strings("channels", newTarget.GetAllDmChannelNames()),
zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())), zap.Int("segmentNum", len(newTarget.GetAllSegmentIDs())),
zap.Int64("version", newTarget.GetTargetVersion()),
) )
// clear target info in meta store // clear target info in meta store


@ -71,6 +71,8 @@ type CollectionObserverSuite struct {
targetObserver *TargetObserver targetObserver *TargetObserver
checkerController *checkers.CheckerController checkerController *checkers.CheckerController
nodeMgr *session.NodeManager
// Test object // Test object
ob *CollectionObserver ob *CollectionObserver
} }
@ -191,8 +193,8 @@ func (suite *CollectionObserverSuite) SetupTest() {
// Dependencies // Dependencies
suite.dist = meta.NewDistributionManager() suite.dist = meta.NewDistributionManager()
nodeMgr := session.NewNodeManager() suite.nodeMgr = session.NewNodeManager()
suite.meta = meta.NewMeta(suite.idAllocator, suite.store, nodeMgr) suite.meta = meta.NewMeta(suite.idAllocator, suite.store, suite.nodeMgr)
suite.broker = meta.NewMockBroker(suite.T()) suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.cluster = session.NewMockCluster(suite.T()) suite.cluster = session.NewMockCluster(suite.T())
@ -201,6 +203,7 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.dist, suite.dist,
suite.broker, suite.broker,
suite.cluster, suite.cluster,
suite.nodeMgr,
) )
suite.checkerController = &checkers.CheckerController{} suite.checkerController = &checkers.CheckerController{}
@ -223,6 +226,16 @@ func (suite *CollectionObserverSuite) SetupTest() {
suite.targetObserver.Start() suite.targetObserver.Start()
suite.ob.Start() suite.ob.Start()
suite.loadAll() suite.loadAll()
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 3,
}))
} }
func (suite *CollectionObserverSuite) TearDownTest() { func (suite *CollectionObserverSuite) TearDownTest() {
@ -248,12 +261,19 @@ func (suite *CollectionObserverSuite) TestObserve() {
Channel: "100-dmc0", Channel: "100-dmc0",
Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}}, Segments: map[int64]*querypb.SegmentDist{1: {NodeID: 1, Version: 0}},
}) })
view := &meta.LeaderView{
ID: 2,
CollectionID: 103,
Channel: "103-dmc0",
Segments: make(map[int64]*querypb.SegmentDist),
}
suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{ suite.dist.LeaderViewManager.Update(2, &meta.LeaderView{
ID: 2, ID: 2,
CollectionID: 100, CollectionID: 100,
Channel: "100-dmc1", Channel: "100-dmc1",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}}, Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2, Version: 0}},
}) }, view)
view1 := &meta.LeaderView{ view1 := &meta.LeaderView{
ID: 3, ID: 3,
CollectionID: 102, CollectionID: 102,
@ -265,7 +285,7 @@ func (suite *CollectionObserverSuite) TestObserve() {
suite.True(ok) suite.True(ok)
view2 := &meta.LeaderView{ view2 := &meta.LeaderView{
ID: 3, ID: 3,
CollectionID: 13, CollectionID: 103,
Channel: "103-dmc0", Channel: "103-dmc0",
Segments: make(map[int64]*querypb.SegmentDist), Segments: make(map[int64]*querypb.SegmentDist),
} }
@ -273,9 +293,16 @@ func (suite *CollectionObserverSuite) TestObserve() {
view2.Segments[segment.GetID()] = &querypb.SegmentDist{ view2.Segments[segment.GetID()] = &querypb.SegmentDist{
NodeID: 3, Version: 0, NodeID: 3, Version: 0,
} }
view.Segments[segment.GetID()] = &querypb.SegmentDist{
NodeID: 2, Version: 0,
}
} }
suite.dist.LeaderViewManager.Update(3, view1, view2) suite.dist.LeaderViewManager.Update(3, view1, view2)
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
suite.Eventually(func() bool { suite.Eventually(func() bool {
return suite.isCollectionLoadedContinue(suite.collections[2], time) return suite.isCollectionLoadedContinue(suite.collections[2], time)
}, timeout-1, timeout/10) }, timeout-1, timeout/10)


@ -79,6 +79,7 @@ type TargetObserver struct {
distMgr *meta.DistributionManager distMgr *meta.DistributionManager
broker meta.Broker broker meta.Broker
cluster session.Cluster cluster session.Cluster
nodeMgr *session.NodeManager
initChan chan initRequest initChan chan initRequest
// nextTargetLastUpdate map[int64]time.Time // nextTargetLastUpdate map[int64]time.Time
@ -104,6 +105,7 @@ func NewTargetObserver(
distMgr *meta.DistributionManager, distMgr *meta.DistributionManager,
broker meta.Broker, broker meta.Broker,
cluster session.Cluster, cluster session.Cluster,
nodeMgr *session.NodeManager,
) *TargetObserver { ) *TargetObserver {
result := &TargetObserver{ result := &TargetObserver{
meta: meta, meta: meta,
@ -111,6 +113,7 @@ func NewTargetObserver(
distMgr: distMgr, distMgr: distMgr,
broker: broker, broker: broker,
cluster: cluster, cluster: cluster,
nodeMgr: nodeMgr,
nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](), nextTargetLastUpdate: typeutil.NewConcurrentMap[int64, time.Time](),
updateChan: make(chan targetUpdateRequest, 10), updateChan: make(chan targetUpdateRequest, 10),
readyNotifiers: make(map[int64][]chan struct{}), readyNotifiers: make(map[int64][]chan struct{}),
@ -234,6 +237,10 @@ func (ob *TargetObserver) Check(ctx context.Context, collectionID int64, partiti
return result return result
} }
func (ob *TargetObserver) TriggerUpdateCurrentTarget(collectionID int64) {
ob.loadingDispatcher.AddTask(collectionID)
}
func (ob *TargetObserver) check(ctx context.Context, collectionID int64) { func (ob *TargetObserver) check(ctx context.Context, collectionID int64) {
ob.keylocks.Lock(collectionID) ob.keylocks.Lock(collectionID)
defer ob.keylocks.Unlock(collectionID) defer ob.keylocks.Unlock(collectionID)
@ -374,58 +381,37 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
return false return false
} }
for _, channel := range channelNames { collectionReadyLeaders := make([]*meta.LeaderView, 0)
views := ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel.GetChannelName())) for channel := range channelNames {
nodes := lo.Map(views, func(v *meta.LeaderView, _ int) int64 { return v.ID }) channelReadyLeaders := lo.Filter(ob.distMgr.LeaderViewManager.GetByFilter(meta.WithChannelName2LeaderView(channel)), func(leader *meta.LeaderView, _ int) bool {
return utils.CheckDelegatorDataReady(ob.nodeMgr, ob.targetMgr, leader, meta.NextTarget) == nil
})
collectionReadyLeaders = append(collectionReadyLeaders, channelReadyLeaders...)
nodes := lo.Map(channelReadyLeaders, func(view *meta.LeaderView, _ int) int64 { return view.ID })
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes) group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes)
if int32(len(group)) < replicaNum { if int32(len(group)) < replicaNum {
log.RatedInfo(10, "channel not ready", log.RatedInfo(10, "channel not ready",
zap.Int("readyReplicaNum", len(group)), zap.Int("readyReplicaNum", len(channelReadyLeaders)),
zap.String("channelName", channel.GetChannelName()), zap.String("channelName", channel),
) )
return false return false
} }
} }
// and last check historical segment
SealedSegments := ob.targetMgr.GetSealedSegmentsByCollection(collectionID, meta.NextTarget)
for _, segment := range SealedSegments {
views := ob.distMgr.LeaderViewManager.GetByFilter(meta.WithSegment2LeaderView(segment.GetID(), false))
nodes := lo.Map(views, func(view *meta.LeaderView, _ int) int64 { return view.ID })
group := utils.GroupNodesByReplica(ob.meta.ReplicaManager, collectionID, nodes)
if int32(len(group)) < replicaNum {
log.RatedInfo(10, "segment not ready",
zap.Int("readyReplicaNum", len(group)),
zap.Int64("segmentID", segment.GetID()),
)
return false
}
}
replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
actions := make([]*querypb.SyncAction, 0, 1)
var collectionInfo *milvuspb.DescribeCollectionResponse var collectionInfo *milvuspb.DescribeCollectionResponse
var partitions []int64 var partitions []int64
var indexInfo []*indexpb.IndexInfo var indexInfo []*indexpb.IndexInfo
var err error var err error
for _, replica := range replicas { newVersion := ob.targetMgr.GetCollectionTargetVersion(collectionID, meta.NextTarget)
leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica) for _, leader := range collectionReadyLeaders {
for ch, leaderID := range leaders { updateVersionAction := ob.checkNeedUpdateTargetVersion(ctx, leader, newVersion)
actions = actions[:0] if updateVersionAction == nil {
leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
log.RatedInfo(10, "leader view not ready",
zap.Int64("nodeID", leaderID),
zap.String("channel", ch),
)
continue continue
} }
updateVersionAction := ob.checkNeedUpdateTargetVersion(ctx, leaderView) replica := ob.meta.ReplicaManager.GetByCollectionAndNode(collectionID, leader.ID)
if updateVersionAction != nil { if replica == nil {
actions = append(actions, updateVersionAction) log.Warn("replica not found", zap.Int64("nodeID", leader.ID), zap.Int64("collectionID", collectionID))
}
if len(actions) == 0 {
continue continue
} }
@ -451,12 +437,10 @@ func (ob *TargetObserver) shouldUpdateCurrentTarget(ctx context.Context, collect
} }
} }
if !ob.sync(ctx, replica, leaderView, actions, collectionInfo, partitions, indexInfo) { if !ob.sync(ctx, replica, leader, []*querypb.SyncAction{updateVersionAction}, collectionInfo, partitions, indexInfo) {
return false return false
} }
} }
}
return true return true
} }
@ -509,29 +493,8 @@ func (ob *TargetObserver) sync(ctx context.Context, replica *meta.Replica, leade
return true return true
} }
func (ob *TargetObserver) checkCollectionLeaderVersionIsCurrent(ctx context.Context, collectionID int64) bool { func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView, targetVersion int64) *querypb.SyncAction {
replicas := ob.meta.ReplicaManager.GetByCollection(collectionID)
for _, replica := range replicas {
leaders := ob.distMgr.ChannelDistManager.GetShardLeadersByReplica(replica)
for ch, leaderID := range leaders {
leaderView := ob.distMgr.LeaderViewManager.GetLeaderShardView(leaderID, ch)
if leaderView == nil {
return false
}
action := ob.checkNeedUpdateTargetVersion(ctx, leaderView)
if action != nil {
return false
}
}
}
return true
}
func (ob *TargetObserver) checkNeedUpdateTargetVersion(ctx context.Context, leaderView *meta.LeaderView) *querypb.SyncAction {
log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60) log.Ctx(ctx).WithRateGroup("qcv2.LeaderObserver", 1, 60)
targetVersion := ob.targetMgr.GetCollectionTargetVersion(leaderView.CollectionID, meta.NextTarget)
if targetVersion <= leaderView.TargetVersion { if targetVersion <= leaderView.TargetVersion {
return nil return nil
} }


@ -36,6 +36,7 @@ import (
"github.com/milvus-io/milvus/pkg/common" "github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/kv" "github.com/milvus-io/milvus/pkg/kv"
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/paramtable" "github.com/milvus-io/milvus/pkg/util/paramtable"
) )
@ -78,15 +79,22 @@ func (suite *TargetObserverSuite) SetupTest() {
suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue()) suite.kv = etcdkv.NewEtcdKV(cli, config.MetaRootPath.GetValue())
// meta // meta
nodeMgr := session.NewNodeManager()
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
}))
nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
}))
store := querycoord.NewCatalog(suite.kv) store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator() idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager()) suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
suite.broker = meta.NewMockBroker(suite.T()) suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
suite.distMgr = meta.NewDistributionManager() suite.distMgr = meta.NewDistributionManager()
suite.cluster = session.NewMockCluster(suite.T()) suite.cluster = session.NewMockCluster(suite.T())
suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster) suite.observer = NewTargetObserver(suite.meta, suite.targetMgr, suite.distMgr, suite.broker, suite.cluster, nodeMgr)
suite.collectionID = int64(1000) suite.collectionID = int64(1000)
suite.partitionID = int64(100) suite.partitionID = int64(100)
@ -127,6 +135,7 @@ func (suite *TargetObserverSuite) SetupTest() {
} }
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, mock.Anything).Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.observer.Start() suite.observer.Start()
} }
@ -173,6 +182,7 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
suite.broker.EXPECT(). suite.broker.EXPECT().
GetRecoveryInfoV2(mock.Anything, mock.Anything). GetRecoveryInfoV2(mock.Anything, mock.Anything).
Return(suite.nextTargetChannels, suite.nextTargetSegments, nil) Return(suite.nextTargetChannels, suite.nextTargetSegments, nil)
suite.Eventually(func() bool { suite.Eventually(func() bool {
return len(suite.targetMgr.GetSealedSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 3 && return len(suite.targetMgr.GetSealedSegmentsByCollection(suite.collectionID, meta.NextTarget)) == 3 &&
len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.NextTarget)) == 2 len(suite.targetMgr.GetDmChannelsByCollection(suite.collectionID, meta.NextTarget)) == 2
@ -203,6 +213,10 @@ func (suite *TargetObserverSuite) TestTriggerUpdateTarget() {
}, },
) )
suite.broker.EXPECT().DescribeCollection(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.broker.EXPECT().ListIndexes(mock.Anything, mock.Anything).Return(nil, nil).Maybe()
suite.cluster.EXPECT().SyncDistribution(mock.Anything, mock.Anything, mock.Anything).Return(merr.Success(), nil).Maybe()
// Able to update current if it's not empty // Able to update current if it's not empty
suite.Eventually(func() bool { suite.Eventually(func() bool {
isReady := false isReady := false
@ -274,7 +288,8 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
// meta // meta
store := querycoord.NewCatalog(suite.kv) store := querycoord.NewCatalog(suite.kv)
idAllocator := RandomIncrementIDAllocator() idAllocator := RandomIncrementIDAllocator()
suite.meta = meta.NewMeta(idAllocator, store, session.NewNodeManager()) nodeMgr := session.NewNodeManager()
suite.meta = meta.NewMeta(idAllocator, store, nodeMgr)
suite.broker = meta.NewMockBroker(suite.T()) suite.broker = meta.NewMockBroker(suite.T())
suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta) suite.targetMgr = meta.NewTargetManager(suite.broker, suite.meta)
@ -286,6 +301,7 @@ func (suite *TargetObserverCheckSuite) SetupTest() {
suite.distMgr, suite.distMgr,
suite.broker, suite.broker,
suite.cluster, suite.cluster,
nodeMgr,
) )
suite.collectionID = int64(1000) suite.collectionID = int64(1000)
suite.partitionID = int64(100) suite.partitionID = int64(100)


@ -109,6 +109,7 @@ func (suite *OpsServiceSuite) SetupTest() {
suite.dist, suite.dist,
suite.broker, suite.broker,
suite.cluster, suite.cluster,
suite.nodeMgr,
) )
suite.cluster = session.NewMockCluster(suite.T()) suite.cluster = session.NewMockCluster(suite.T())
suite.jobScheduler = job.NewScheduler() suite.jobScheduler = job.NewScheduler()


@ -337,16 +337,6 @@ func (s *Server) initQueryCoord() error {
s.proxyWatcher.DelSessionFunc(s.proxyClientManager.DelProxyClient) s.proxyWatcher.DelSessionFunc(s.proxyClientManager.DelProxyClient)
log.Info("init proxy manager done") log.Info("init proxy manager done")
// Init heartbeat
log.Info("init dist controller")
s.distController = dist.NewDistController(
s.cluster,
s.nodeMgr,
s.dist,
s.targetMgr,
s.taskScheduler,
)
// Init checker controller // Init checker controller
log.Info("init checker controller") log.Info("init checker controller")
s.getBalancerFunc = func() balance.Balance { s.getBalancerFunc = func() balance.Balance {
@ -392,6 +382,20 @@ func (s *Server) initQueryCoord() error {
// Init observers // Init observers
s.initObserver() s.initObserver()
// Init heartbeat
syncTargetVersionFn := func(collectionID int64) {
s.targetObserver.TriggerUpdateCurrentTarget(collectionID)
}
log.Info("init dist controller")
s.distController = dist.NewDistController(
s.cluster,
s.nodeMgr,
s.dist,
s.targetMgr,
s.taskScheduler,
syncTargetVersionFn,
)
// Init load status cache // Init load status cache
meta.GlobalFailedLoadCache = meta.NewFailedLoadCache() meta.GlobalFailedLoadCache = meta.NewFailedLoadCache()
@ -460,6 +464,7 @@ func (s *Server) initObserver() {
s.dist, s.dist,
s.broker, s.broker,
s.cluster, s.cluster,
s.nodeMgr,
) )
s.collectionObserver = observers.NewCollectionObserver( s.collectionObserver = observers.NewCollectionObserver(
s.dist, s.dist,


@ -45,6 +45,7 @@ import (
"github.com/milvus-io/milvus/internal/querycoordv2/session" "github.com/milvus-io/milvus/internal/querycoordv2/session"
"github.com/milvus-io/milvus/internal/querycoordv2/task" "github.com/milvus-io/milvus/internal/querycoordv2/task"
"github.com/milvus-io/milvus/internal/util/sessionutil" "github.com/milvus-io/milvus/internal/util/sessionutil"
"github.com/milvus-io/milvus/pkg/common"
"github.com/milvus-io/milvus/pkg/log" "github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/commonpbutil" "github.com/milvus-io/milvus/pkg/util/commonpbutil"
"github.com/milvus-io/milvus/pkg/util/etcd" "github.com/milvus-io/milvus/pkg/util/etcd"
@ -560,12 +561,17 @@ func (suite *ServerSuite) hackServer() {
suite.server.cluster, suite.server.cluster,
suite.server.nodeMgr, suite.server.nodeMgr,
) )
syncTargetVersionFn := func(collectionID int64) {
suite.server.targetObserver.Check(context.Background(), collectionID, common.AllPartitionsID)
}
suite.server.distController = dist.NewDistController( suite.server.distController = dist.NewDistController(
suite.server.cluster, suite.server.cluster,
suite.server.nodeMgr, suite.server.nodeMgr,
suite.server.dist, suite.server.dist,
suite.server.targetMgr, suite.server.targetMgr,
suite.server.taskScheduler, suite.server.taskScheduler,
syncTargetVersionFn,
) )
suite.server.checkerController = checkers.NewCheckerController( suite.server.checkerController = checkers.NewCheckerController(
suite.server.meta, suite.server.meta,
@ -582,6 +588,7 @@ func (suite *ServerSuite) hackServer() {
suite.server.dist, suite.server.dist,
suite.broker, suite.broker,
suite.server.cluster, suite.server.cluster,
suite.server.nodeMgr,
) )
suite.server.collectionObserver = observers.NewCollectionObserver( suite.server.collectionObserver = observers.NewCollectionObserver(
suite.server.dist, suite.server.dist,


@ -154,6 +154,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
return partition.GetPartitionID() return partition.GetPartitionID()
}) })
} }
for _, partitionID := range partitions { for _, partitionID := range partitions {
percentage := s.meta.GetPartitionLoadPercentage(partitionID) percentage := s.meta.GetPartitionLoadPercentage(partitionID)
if percentage < 0 { if percentage < 0 {
@ -172,6 +173,7 @@ func (s *Server) ShowPartitions(ctx context.Context, req *querypb.ShowPartitions
Status: merr.Status(err), Status: merr.Status(err),
}, nil }, nil
} }
percentages = append(percentages, int64(percentage)) percentages = append(percentages, int64(percentage))
} }


@ -157,6 +157,7 @@ func (suite *ServiceSuite) SetupTest() {
suite.dist, suite.dist,
suite.broker, suite.broker,
suite.cluster, suite.cluster,
suite.nodeMgr,
) )
suite.targetObserver.Start() suite.targetObserver.Start()
for _, node := range suite.nodes { for _, node := range suite.nodes {


@ -45,7 +45,7 @@ func CheckNodeAvailable(nodeID int64, info *session.NodeInfo) error {
// 2. All QueryNodes in the distribution are online // 2. All QueryNodes in the distribution are online
// 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution // 3. The last heartbeat response time is within HeartbeatAvailableInterval for all QueryNodes(include leader) in the distribution
// 4. All segments of the shard in target should be in the distribution // 4. All segments of the shard in target should be in the distribution
func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView) error { func CheckDelegatorDataReady(nodeMgr *session.NodeManager, targetMgr meta.TargetManagerInterface, leader *meta.LeaderView, scope int32) error {
log := log.Ctx(context.TODO()). log := log.Ctx(context.TODO()).
WithRateGroup("utils.CheckLeaderAvailable", 1, 60). WithRateGroup("utils.CheckLeaderAvailable", 1, 60).
With(zap.Int64("leaderID", leader.ID)) With(zap.Int64("leaderID", leader.ID))
@ -68,7 +68,7 @@ func CheckLeaderAvailable(nodeMgr *session.NodeManager, targetMgr meta.TargetMan
return err return err
} }
} }
segmentDist := targetMgr.GetSealedSegmentsByChannel(leader.CollectionID, leader.Channel, meta.CurrentTarget) segmentDist := targetMgr.GetSealedSegmentsByChannel(leader.CollectionID, leader.Channel, scope)
// Check whether segments are fully loaded // Check whether segments are fully loaded
for segmentID, info := range segmentDist { for segmentID, info := range segmentDist {
_, exist := leader.Segments[segmentID] _, exist := leader.Segments[segmentID]


@ -55,6 +55,7 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliable() {
ID: 1, ID: 1,
Channel: "test", Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
TargetVersion: 1011,
} }
mockTargetManager := meta.NewMockTargetManager(suite.T()) mockTargetManager := meta.NewMockTargetManager(suite.T())
@ -64,9 +65,10 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliable() {
InsertChannel: "test", InsertChannel: "test",
}, },
}).Maybe() }).Maybe()
mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
suite.setNodeAvailable(1, 2) suite.setNodeAvailable(1, 2)
err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget)
suite.NoError(err) suite.NoError(err)
} }
@ -76,6 +78,7 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
ID: 1, ID: 1,
Channel: "test", Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
TargetVersion: 1011,
} }
mockTargetManager := meta.NewMockTargetManager(suite.T()) mockTargetManager := meta.NewMockTargetManager(suite.T())
mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{ mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{
@ -84,18 +87,19 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
InsertChannel: "test", InsertChannel: "test",
}, },
}).Maybe() }).Maybe()
mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
// leader nodeID=1 not available // leader nodeID=1 not available
suite.setNodeAvailable(2) suite.setNodeAvailable(2)
err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget)
suite.Error(err) suite.Error(err)
suite.nodeMgr = session.NewNodeManager()
}) })
suite.Run("shard worker not available", func() { suite.Run("shard worker not available", func() {
leadview := &meta.LeaderView{ leadview := &meta.LeaderView{
ID: 1, ID: 11111,
Channel: "test", Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}}, Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
TargetVersion: 1011,
} }
mockTargetManager := meta.NewMockTargetManager(suite.T()) mockTargetManager := meta.NewMockTargetManager(suite.T())
@ -105,14 +109,35 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
InsertChannel: "test", InsertChannel: "test",
}, },
}).Maybe() }).Maybe()
mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
// leader nodeID=2 not available // leader nodeID=2 not available
suite.setNodeAvailable(1) suite.setNodeAvailable(1)
err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget)
suite.Error(err) suite.Error(err)
suite.nodeMgr = session.NewNodeManager()
}) })
suite.Run("segment lacks", func() { suite.Run("segment lacks", func() {
leadview := &meta.LeaderView{
ID: 1,
Channel: "test",
Segments: map[int64]*querypb.SegmentDist{2: {NodeID: 2}},
TargetVersion: 1011,
}
mockTargetManager := meta.NewMockTargetManager(suite.T())
mockTargetManager.EXPECT().GetSealedSegmentsByChannel(mock.Anything, mock.Anything, mock.Anything).Return(map[int64]*datapb.SegmentInfo{
// target segmentID=1 not in leadView
1: {
ID: 1,
InsertChannel: "test",
},
}).Maybe()
mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
suite.setNodeAvailable(1, 2)
err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget)
suite.Error(err)
})
suite.Run("target version not synced", func() {
leadview := &meta.LeaderView{ leadview := &meta.LeaderView{
ID: 1, ID: 1,
Channel: "test", Channel: "test",
@ -126,10 +151,10 @@ func (suite *UtilTestSuite) TestCheckLeaderAvaliableFailed() {
InsertChannel: "test", InsertChannel: "test",
}, },
}).Maybe() }).Maybe()
mockTargetManager.EXPECT().GetCollectionTargetVersion(mock.Anything, mock.Anything).Return(1011).Maybe()
suite.setNodeAvailable(1, 2) suite.setNodeAvailable(1, 2)
err := CheckLeaderAvailable(suite.nodeMgr, mockTargetManager, leadview) err := CheckDelegatorDataReady(suite.nodeMgr, mockTargetManager, leadview, meta.CurrentTarget)
suite.Error(err) suite.Error(err)
suite.nodeMgr = session.NewNodeManager()
}) })
} }


@ -760,7 +760,14 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() {
s.Len(resp.GetResourceGroups(), rgNum+2) s.Len(resp.GetResourceGroups(), rgNum+2)
// test load collection with dynamic update // test load collection with dynamic update
s.loadCollection(collectionName, dbName, 3, rgs[:3]) loadStatus, err := s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
ReplicaNumber: int32(3),
ResourceGroups: rgs[:3],
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.Eventually(func() bool { s.Eventually(func() bool {
resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName, DbName: dbName,
@ -771,7 +778,14 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() {
return len(resp3.GetReplicas()) == 3 return len(resp3.GetReplicas()) == 3
}, 30*time.Second, 1*time.Second) }, 30*time.Second, 1*time.Second)
s.loadCollection(collectionName, dbName, 2, rgs[3:]) loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
ReplicaNumber: int32(2),
ResourceGroups: rgs[3:],
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.Eventually(func() bool { s.Eventually(func() bool {
resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName, DbName: dbName,
@ -783,7 +797,14 @@ func (s *LoadTestSuite) TestDynamicUpdateLoadConfigs_WithRGLackOfNode() {
}, 30*time.Second, 1*time.Second) }, 30*time.Second, 1*time.Second)
// test load collection with dynamic update // test load collection with dynamic update
s.loadCollection(collectionName, dbName, 5, rgs) loadStatus, err = s.Cluster.Proxy.LoadCollection(ctx, &milvuspb.LoadCollectionRequest{
DbName: dbName,
CollectionName: collectionName,
ReplicaNumber: int32(5),
ResourceGroups: rgs,
})
s.NoError(err)
s.True(merr.Ok(loadStatus))
s.Eventually(func() bool { s.Eventually(func() bool {
resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{ resp3, err := s.Cluster.Proxy.GetReplicas(ctx, &milvuspb.GetReplicasRequest{
DbName: dbName, DbName: dbName,