mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
issue: #35087 pr: #35197 after qc restarts, and target is not ready yet, if dist_handler try to update segment dist, it will set legacy level to l0 segment, which may cause l0 segment be moved to other node, cause search/query failed. Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
parent
756922ebec
commit
11578772ef
@ -626,6 +626,7 @@ message SegmentVersionInfo {
|
||||
int64 version = 5;
|
||||
uint64 last_delta_timestamp = 6;
|
||||
map<int64, FieldIndexInfo> index_info = 7;
|
||||
data.SegmentLevel level = 8;
|
||||
}
|
||||
|
||||
message ChannelVersionInfo {
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
@ -148,9 +147,7 @@ func (b *ChannelLevelScoreBalancer) genStoppingSegmentPlan(replica *meta.Replica
|
||||
for _, nodeID := range offlineNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(nodeID), meta.WithChannel(channelName))
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
|
||||
})
|
||||
plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false)
|
||||
for i := range plans {
|
||||
@ -171,9 +168,7 @@ func (b *ChannelLevelScoreBalancer) genSegmentPlan(replica *meta.Replica, channe
|
||||
for _, node := range onlineNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node), meta.WithChannel(channelName))
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
|
||||
})
|
||||
segmentDist[node] = segments
|
||||
totalScore += nodeScore[node].getPriority()
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
@ -511,9 +510,7 @@ func (b *MultiTargetBalancer) genSegmentPlan(replica *meta.Replica, rwNodes []in
|
||||
for _, node := range rwNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node))
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
|
||||
})
|
||||
nodeSegments[node] = segments
|
||||
globalNodeSegments[node] = b.dist.SegmentDistManager.GetByFilter(meta.WithNodeID(node))
|
||||
|
@ -25,7 +25,6 @@ import (
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/task"
|
||||
@ -220,9 +219,7 @@ func (b *RowCountBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, rw
|
||||
for _, nodeID := range roNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(nodeID))
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
|
||||
})
|
||||
plans := b.AssignSegment(replica.GetCollectionID(), segments, rwNodes, false)
|
||||
for i := range plans {
|
||||
@ -243,9 +240,7 @@ func (b *RowCountBasedBalancer) genSegmentPlan(replica *meta.Replica, rwNodes []
|
||||
for _, node := range rwNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node))
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
|
||||
})
|
||||
rowCount := 0
|
||||
for _, s := range segments {
|
||||
|
@ -23,7 +23,6 @@ import (
|
||||
"github.com/samber/lo"
|
||||
"go.uber.org/zap"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/params"
|
||||
"github.com/milvus-io/milvus/internal/querycoordv2/session"
|
||||
@ -265,9 +264,7 @@ func (b *ScoreBasedBalancer) genStoppingSegmentPlan(replica *meta.Replica, onlin
|
||||
for _, nodeID := range offlineNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(nodeID))
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
|
||||
})
|
||||
plans := b.AssignSegment(replica.GetCollectionID(), segments, onlineNodes, false)
|
||||
for i := range plans {
|
||||
@ -288,9 +285,7 @@ func (b *ScoreBasedBalancer) genSegmentPlan(replica *meta.Replica, onlineNodes [
|
||||
for _, node := range onlineNodes {
|
||||
dist := b.dist.SegmentDistManager.GetByFilter(meta.WithCollectionID(replica.GetCollectionID()), meta.WithNodeID(node))
|
||||
segments := lo.Filter(dist, func(segment *meta.Segment, _ int) bool {
|
||||
return b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.CurrentTarget) != nil &&
|
||||
b.targetMgr.GetSealedSegment(segment.GetCollectionID(), segment.GetID(), meta.NextTarget) != nil &&
|
||||
segment.GetLevel() != datapb.SegmentLevel_L0
|
||||
return b.targetMgr.CanSegmentBeMoved(segment.GetCollectionID(), segment.GetID())
|
||||
})
|
||||
segmentDist[node] = segments
|
||||
totalScore += nodeScore[node].getPriority()
|
||||
|
41
internal/querycoordv2/dist/dist_handler.go
vendored
41
internal/querycoordv2/dist/dist_handler.go
vendored
@ -137,36 +137,23 @@ func (dh *distHandler) handleDistResp(resp *querypb.GetDataDistributionResponse,
|
||||
func (dh *distHandler) updateSegmentsDistribution(resp *querypb.GetDataDistributionResponse) {
|
||||
updates := make([]*meta.Segment, 0, len(resp.GetSegments()))
|
||||
for _, s := range resp.GetSegments() {
|
||||
// for collection which is already loaded
|
||||
segmentInfo := dh.target.GetSealedSegment(s.GetCollection(), s.GetID(), meta.CurrentTarget)
|
||||
segmentInfo := dh.target.GetSealedSegment(s.GetCollection(), s.GetID(), meta.CurrentTargetFirst)
|
||||
if segmentInfo == nil {
|
||||
// for collection which is loading
|
||||
segmentInfo = dh.target.GetSealedSegment(s.GetCollection(), s.GetID(), meta.NextTarget)
|
||||
}
|
||||
var segment *meta.Segment
|
||||
if segmentInfo == nil {
|
||||
segment = &meta.Segment{
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
ID: s.GetID(),
|
||||
CollectionID: s.GetCollection(),
|
||||
PartitionID: s.GetPartition(),
|
||||
InsertChannel: s.GetChannel(),
|
||||
},
|
||||
Node: resp.GetNodeID(),
|
||||
Version: s.GetVersion(),
|
||||
LastDeltaTimestamp: s.GetLastDeltaTimestamp(),
|
||||
IndexInfo: s.GetIndexInfo(),
|
||||
}
|
||||
} else {
|
||||
segment = &meta.Segment{
|
||||
SegmentInfo: proto.Clone(segmentInfo).(*datapb.SegmentInfo),
|
||||
Node: resp.GetNodeID(),
|
||||
Version: s.GetVersion(),
|
||||
LastDeltaTimestamp: s.GetLastDeltaTimestamp(),
|
||||
IndexInfo: s.GetIndexInfo(),
|
||||
segmentInfo = &datapb.SegmentInfo{
|
||||
ID: s.GetID(),
|
||||
CollectionID: s.GetCollection(),
|
||||
PartitionID: s.GetPartition(),
|
||||
InsertChannel: s.GetChannel(),
|
||||
Level: s.GetLevel(),
|
||||
}
|
||||
}
|
||||
updates = append(updates, segment)
|
||||
updates = append(updates, &meta.Segment{
|
||||
SegmentInfo: proto.Clone(segmentInfo).(*datapb.SegmentInfo),
|
||||
Node: resp.GetNodeID(),
|
||||
Version: s.GetVersion(),
|
||||
LastDeltaTimestamp: s.GetLastDeltaTimestamp(),
|
||||
IndexInfo: s.GetIndexInfo(),
|
||||
})
|
||||
}
|
||||
|
||||
dh.dist.SegmentDistManager.Update(resp.GetNodeID(), updates...)
|
||||
|
@ -24,6 +24,49 @@ func (_m *MockTargetManager) EXPECT() *MockTargetManager_Expecter {
|
||||
return &MockTargetManager_Expecter{mock: &_m.Mock}
|
||||
}
|
||||
|
||||
// CanSegmentBeMoved provides a mock function with given fields: collectionID, segmentID
|
||||
func (_m *MockTargetManager) CanSegmentBeMoved(collectionID int64, segmentID int64) bool {
|
||||
ret := _m.Called(collectionID, segmentID)
|
||||
|
||||
var r0 bool
|
||||
if rf, ok := ret.Get(0).(func(int64, int64) bool); ok {
|
||||
r0 = rf(collectionID, segmentID)
|
||||
} else {
|
||||
r0 = ret.Get(0).(bool)
|
||||
}
|
||||
|
||||
return r0
|
||||
}
|
||||
|
||||
// MockTargetManager_CanSegmentBeMoved_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'CanSegmentBeMoved'
|
||||
type MockTargetManager_CanSegmentBeMoved_Call struct {
|
||||
*mock.Call
|
||||
}
|
||||
|
||||
// CanSegmentBeMoved is a helper method to define mock.On call
|
||||
// - collectionID int64
|
||||
// - segmentID int64
|
||||
func (_e *MockTargetManager_Expecter) CanSegmentBeMoved(collectionID interface{}, segmentID interface{}) *MockTargetManager_CanSegmentBeMoved_Call {
|
||||
return &MockTargetManager_CanSegmentBeMoved_Call{Call: _e.mock.On("CanSegmentBeMoved", collectionID, segmentID)}
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_CanSegmentBeMoved_Call) Run(run func(collectionID int64, segmentID int64)) *MockTargetManager_CanSegmentBeMoved_Call {
|
||||
_c.Call.Run(func(args mock.Arguments) {
|
||||
run(args[0].(int64), args[1].(int64))
|
||||
})
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_CanSegmentBeMoved_Call) Return(_a0 bool) *MockTargetManager_CanSegmentBeMoved_Call {
|
||||
_c.Call.Return(_a0)
|
||||
return _c
|
||||
}
|
||||
|
||||
func (_c *MockTargetManager_CanSegmentBeMoved_Call) RunAndReturn(run func(int64, int64) bool) *MockTargetManager_CanSegmentBeMoved_Call {
|
||||
_c.Call.Return(run)
|
||||
return _c
|
||||
}
|
||||
|
||||
// GetCollectionTargetVersion provides a mock function with given fields: collectionID, scope
|
||||
func (_m *MockTargetManager) GetCollectionTargetVersion(collectionID int64, scope int32) int64 {
|
||||
ret := _m.Called(collectionID, scope)
|
||||
|
@ -71,6 +71,7 @@ type TargetManagerInterface interface {
|
||||
IsNextTargetExist(collectionID int64) bool
|
||||
SaveCurrentTarget(catalog metastore.QueryCoordCatalog)
|
||||
Recover(catalog metastore.QueryCoordCatalog) error
|
||||
CanSegmentBeMoved(collectionID, segmentID int64) bool
|
||||
}
|
||||
|
||||
type TargetManager struct {
|
||||
@ -690,3 +691,20 @@ func (mgr *TargetManager) Recover(catalog metastore.QueryCoordCatalog) error {
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// if segment isn't l0 segment, and exist in current/next target, then it can be moved
|
||||
func (mgr *TargetManager) CanSegmentBeMoved(collectionID, segmentID int64) bool {
|
||||
mgr.rwMutex.Lock()
|
||||
defer mgr.rwMutex.Unlock()
|
||||
current := mgr.current.getCollectionTarget(collectionID)
|
||||
if current != nil && current.segments[segmentID] != nil && current.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
|
||||
return true
|
||||
}
|
||||
|
||||
next := mgr.next.getCollectionTarget(collectionID)
|
||||
if next != nil && next.segments[segmentID] != nil && next.segments[segmentID].GetLevel() != datapb.SegmentLevel_L0 {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
@ -1208,6 +1208,7 @@ func (node *QueryNode) GetDataDistribution(ctx context.Context, req *querypb.Get
|
||||
Partition: s.Partition(),
|
||||
Channel: s.Shard().VirtualName(),
|
||||
Version: s.Version(),
|
||||
Level: s.Level(),
|
||||
LastDeltaTimestamp: s.LastDeltaTimestamp(),
|
||||
IndexInfo: lo.SliceToMap(s.Indexes(), func(info *segments.IndexedFieldInfo) (int64, *querypb.FieldIndexInfo) {
|
||||
return info.IndexInfo.FieldID, info.IndexInfo
|
||||
|
Loading…
Reference in New Issue
Block a user