fix: Update segment's version in leader task (#31643)

issue: #31468

1. When a segment's version in the leader view doesn't match the segment's
version in dist, the leader view should be updated.
2. After calling loadDeltaLogs, the segment's load version should be updated
with the latest ts.
3. Change the leader task's priority from high to low, to prevent a leader task
from replacing a segment task or a balance task.

---------

Signed-off-by: Wei Liu <wei.liu@zilliz.com>
This commit is contained in:
wei liu 2024-04-01 10:37:21 +08:00 committed by GitHub
parent f7a59766df
commit c311932d5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 93 additions and 14 deletions

View File

@ -20,6 +20,7 @@ import (
"context"
"time"
"github.com/samber/lo"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
@ -120,8 +121,14 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
zap.Int64("leaderViewID", leaderView.ID),
)
ret := make([]task.Task, 0)
dist = utils.FindMaxVersionSegments(dist)
for _, s := range dist {
// skip set segment on stopping node to leader view
aliveNodeDist := lo.Filter(dist, func(s *meta.Segment, _ int) bool {
nodeInfo := c.nodeMgr.Get(s.Node)
return nodeInfo != nil && nodeInfo.GetState() != session.NodeStateStopping
})
latestNodeDist := utils.FindMaxVersionSegments(aliveNodeDist)
for _, s := range latestNodeDist {
segment := c.target.GetSealedSegment(leaderView.CollectionID, s.GetID(), meta.CurrentTargetFirst)
existInTarget := segment != nil
isL0Segment := existInTarget && segment.GetLevel() == datapb.SegmentLevel_L0
@ -130,12 +137,14 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
continue
}
// when segment's version in leader view doesn't match segment's version in dist
// which means leader view store wrong segment location in leader view, then we should update segment location and segment's version
version, ok := leaderView.Segments[s.GetID()]
if !ok || version.GetVersion() < s.Version {
if !ok || version.GetVersion() != s.Version {
log.RatedDebug(10, "leader checker append a segment to set",
zap.Int64("segmentID", s.GetID()),
zap.Int64("nodeID", s.Node))
action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), s.Version)
action := task.NewLeaderAction(leaderView.ID, s.Node, task.ActionTypeGrow, s.GetInsertChannel(), s.GetID(), time.Now().UnixNano())
t := task.NewLeaderTask(
ctx,
params.Params.QueryCoordCfg.SegmentTaskTimeout.GetAsDuration(time.Millisecond),
@ -145,8 +154,9 @@ func (c *LeaderChecker) findNeedLoadedSegments(ctx context.Context, replica *met
leaderView.ID,
action,
)
// index task shall have lower or equal priority than balance task
t.SetPriority(task.TaskPriorityHigh)
// leader task shouldn't replace executing segment task
t.SetPriority(task.TaskPriorityLow)
t.SetReason("add segment to leader view")
ret = append(ret, t)
}
@ -190,7 +200,8 @@ func (c *LeaderChecker) findNeedRemovedSegments(ctx context.Context, replica *me
action,
)
t.SetPriority(task.TaskPriorityHigh)
// leader task shouldn't replace executing segment task
t.SetPriority(task.TaskPriorityLow)
t.SetReason("remove segment from leader view")
ret = append(ret, t)
}

View File

@ -28,6 +28,7 @@ import (
etcdkv "github.com/milvus-io/milvus/internal/kv/etcd"
"github.com/milvus-io/milvus/internal/metastore/kv/querycoord"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/internal/querycoordv2/session"
@ -105,6 +106,19 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
// before target ready, should skip check collection
tasks := suite.checker.Check(context.TODO())
suite.Len(tasks, 0)
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
// test leader view lack of segments
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
loadVersion := time.Now().UnixMilli()
@ -121,8 +135,26 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), loadVersion)
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
// test segment's version in leader view doesn't match segment's version in dist
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
view = utils.CreateTestLeaderView(2, 1, "test-insert-channel", map[int64]int64{}, map[int64]*meta.Segment{})
view.TargetVersion = observer.target.GetCollectionTargetVersion(1, meta.CurrentTarget)
view.Segments[1] = &querypb.SegmentDist{
NodeID: 0,
Version: time.Now().UnixMilli() - 1,
}
observer.dist.LeaderViewManager.Update(2, view)
tasks = suite.checker.Check(context.TODO())
suite.Len(tasks, 1)
suite.Equal(tasks[0].Source(), utils.LeaderChecker)
suite.Len(tasks[0].Actions(), 1)
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
// test skip sync l0 segment
segments = []*datapb.SegmentInfo{
@ -168,6 +200,17 @@ func (suite *LeaderCheckerTestSuite) TestActivation() {
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 2, 1, "test-insert-channel"))
@ -187,7 +230,7 @@ func (suite *LeaderCheckerTestSuite) TestActivation() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}
func (suite *LeaderCheckerTestSuite) TestStoppingNode() {
@ -249,6 +292,17 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() {
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.target.UpdateCollectionNextTarget(int64(1))
@ -266,7 +320,7 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncLoadedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}
func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() {
@ -290,6 +344,18 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() {
}
suite.broker.EXPECT().GetRecoveryInfoV2(mock.Anything, int64(1)).Return(
channels, segments, nil)
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 1,
Address: "localhost",
Hostname: "localhost",
}))
suite.nodeMgr.Add(session.NewNodeInfo(session.ImmutableNodeInfo{
NodeID: 2,
Address: "localhost",
Hostname: "localhost",
}))
observer.target.UpdateCollectionNextTarget(int64(1))
observer.target.UpdateCollectionCurrentTarget(1)
observer.dist.SegmentDistManager.Update(1, utils.CreateTestSegment(1, 1, 1, 1, 0, "test-insert-channel"))
@ -311,7 +377,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncLoadedSegmentsWithReplicas() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeGrow)
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(1))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}
func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
@ -346,7 +412,7 @@ func (suite *LeaderCheckerTestSuite) TestSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].Node(), int64(1))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).Version(), int64(0))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
// skip sync l0 segments
segments := []*datapb.SegmentInfo{
@ -407,7 +473,7 @@ func (suite *LeaderCheckerTestSuite) TestIgnoreSyncRemovedSegments() {
suite.Equal(tasks[0].Actions()[0].Type(), task.ActionTypeReduce)
suite.Equal(tasks[0].Actions()[0].Node(), int64(2))
suite.Equal(tasks[0].Actions()[0].(*task.LeaderAction).SegmentID(), int64(3))
suite.Equal(tasks[0].Priority(), task.TaskPriorityHigh)
suite.Equal(tasks[0].Priority(), task.TaskPriorityLow)
}
func TestLeaderCheckerSuite(t *testing.T) {

View File

@ -129,6 +129,8 @@ func (node *QueryNode) loadDeltaLogs(ctx context.Context, req *querypb.LoadSegme
}
continue
}
// try to update segment version after load delta logs
node.manager.Segment.UpdateBy(segments.IncreaseVersion(req.GetVersion()), segments.WithType(segments.SegmentTypeSealed), segments.WithID(info.GetSegmentID()))
}
if finalErr != nil {