enhance: remove merger for load segments (#29062)

remove the merger, as QueryNode can now load segments concurrently
fix #29063

Signed-off-by: yah01 <yah2er0ne@outlook.com>
yah01 2023-12-12 10:48:37 +08:00 committed by GitHub
parent d0bac9d0bb
commit 0a87724f18
7 changed files with 34 additions and 500 deletions

@@ -112,7 +112,8 @@ func (action *SegmentAction) IsFinished(distMgr *meta.DistributionManager) bool
leaderSegmentDist := distMgr.LeaderViewManager.GetSealedSegmentDist(action.SegmentID())
nodeSegmentDist := distMgr.SegmentDistManager.GetSegmentDist(action.SegmentID())
return lo.Contains(leaderSegmentDist, action.Node()) &&
lo.Contains(nodeSegmentDist, action.Node())
lo.Contains(nodeSegmentDist, action.Node()) &&
action.rpcReturned.Load()
} else if action.Type() == ActionTypeReduce {
// FIXME: Now shard leader's segment view is a map of segment ID to node ID,
// loading segment replaces the node ID with the new one,
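The extra conjunct makes a grow action wait for the Load RPC itself, not just for the segment to show up in the distribution views. A minimal sketch of the gate, using hypothetical stand-ins for SegmentAction and the distribution lookups:

```go
package main

import (
	"fmt"

	"github.com/samber/lo"
	"go.uber.org/atomic"
)

// action is a hypothetical stand-in for SegmentAction.
type action struct {
	node        int64
	rpcReturned atomic.Bool
}

// isFinished requires the segment to be visible in both the leader view and
// the node distribution, AND the Load RPC to have returned.
func (a *action) isFinished(leaderDist, nodeDist []int64) bool {
	return lo.Contains(leaderDist, a.node) &&
		lo.Contains(nodeDist, a.node) &&
		a.rpcReturned.Load()
}

func main() {
	a := &action{node: 1}
	fmt.Println(a.isFinished([]int64{1}, []int64{1})) // false: RPC still in flight
	a.rpcReturned.Store(true)
	fmt.Println(a.isFinished([]int64{1}, []int64{1})) // true
}
```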

@@ -25,7 +25,6 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
"github.com/milvus-io/milvus/internal/querycoordv2/meta"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
@@ -47,9 +46,6 @@ type Executor struct {
cluster session.Cluster
nodeMgr *session.NodeManager
// Merge load segment requests
merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]
executingTasks *typeutil.ConcurrentSet[string] // task index
executingTaskNum atomic.Int32
}
@@ -69,19 +65,15 @@ func NewExecutor(meta *meta.Meta,
targetMgr: targetMgr,
cluster: cluster,
nodeMgr: nodeMgr,
merger: NewMerger[segmentIndex, *querypb.LoadSegmentsRequest](),
executingTasks: typeutil.NewConcurrentSet[string](),
}
}
func (ex *Executor) Start(ctx context.Context) {
ex.merger.Start(ctx)
ex.scheduleRequests()
}
func (ex *Executor) Stop() {
ex.merger.Stop()
ex.wg.Wait()
}
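With the merger gone, the executor no longer owns a background loop: Start has nothing to spin up and Stop only drains the WaitGroup. A trimmed lifecycle sketch, assuming the struct is reduced to what the diff touches:

```go
package task

import (
	"context"
	"sync"
)

// Executor is trimmed to the lifecycle fields; the real struct also carries
// the meta, dist, broker, cluster and node-manager dependencies.
type Executor struct {
	wg sync.WaitGroup
}

// Start no longer launches a merger loop; load RPCs are issued inline from
// the per-task goroutines.
func (ex *Executor) Start(ctx context.Context) {}

// Stop just waits for in-flight task goroutines to finish.
func (ex *Executor) Stop() {
	ex.wg.Wait()
}
```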
@@ -121,82 +113,6 @@ func (ex *Executor) Execute(task Task, step int) bool {
return true
}
func (ex *Executor) scheduleRequests() {
ex.wg.Add(1)
go func() {
defer ex.wg.Done()
for mergeTask := range ex.merger.Chan() {
task := mergeTask.(*LoadSegmentsTask)
log.Info("get merge task, process it",
zap.Int64("collectionID", task.req.GetCollectionID()),
zap.Int64("replicaID", task.req.GetReplicaID()),
zap.String("shard", task.req.GetInfos()[0].GetInsertChannel()),
zap.Int64("nodeID", task.req.GetDstNodeID()),
zap.Int("taskNum", len(task.tasks)),
)
go ex.processMergeTask(mergeTask.(*LoadSegmentsTask))
}
}()
}
func (ex *Executor) processMergeTask(mergeTask *LoadSegmentsTask) {
startTs := time.Now()
task := mergeTask.tasks[0]
action := task.Actions()[mergeTask.steps[0]]
var err error
defer func() {
if err != nil {
for i := range mergeTask.tasks {
mergeTask.tasks[i].Fail(err)
}
}
for i := range mergeTask.tasks {
ex.removeTask(mergeTask.tasks[i], mergeTask.steps[i])
}
}()
taskIDs := make([]int64, 0, len(mergeTask.tasks))
segments := make([]int64, 0, len(mergeTask.tasks))
for _, task := range mergeTask.tasks {
taskIDs = append(taskIDs, task.ID())
segments = append(segments, task.SegmentID())
}
log := log.With(
zap.Int64s("taskIDs", taskIDs),
zap.Int64("collectionID", task.CollectionID()),
zap.Int64("replicaID", task.ReplicaID()),
zap.String("shard", task.Shard()),
zap.Int64s("segmentIDs", segments),
zap.Int64("nodeID", action.Node()),
zap.String("source", task.Source().String()),
)
// Get shard leader for the given replica and segment
channel := mergeTask.req.GetInfos()[0].GetInsertChannel()
leader, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), channel)
if !ok {
err = merr.WrapErrChannelNotFound(channel, "shard delegator not found")
log.Warn("no shard leader for the segment to execute loading", zap.Error(task.Err()))
return
}
log.Info("load segments...")
status, err := ex.cluster.LoadSegments(task.Context(), leader, mergeTask.req)
if err != nil {
log.Warn("failed to load segment", zap.Error(err))
return
}
if !merr.Ok(status) {
err = merr.Error(status)
log.Warn("failed to load segment", zap.Error(err))
return
}
elapsed := time.Since(startTs)
log.Info("load segments done", zap.Duration("elapsed", elapsed))
}
func (ex *Executor) removeTask(task Task, step int) {
if task.Err() != nil {
log.Info("execute action done, remove it",
@@ -238,8 +154,8 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
defer func() {
if err != nil {
task.Fail(err)
ex.removeTask(task, step)
}
ex.removeTask(task, step)
}()
collectionInfo, err := ex.broker.DescribeCollection(ctx, task.CollectionID())
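Because the success path no longer hands the task to a merger, removeTask moves out of the error branch: the deferred block now always unregisters the task, while Fail fires only on error. A self-contained sketch of that contract (doLoad is a hypothetical stand-in for the inline RPC):

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a hypothetical stand-in for SegmentTask.
type Task struct{ id int64 }

func (t *Task) Fail(err error) { fmt.Printf("task %d failed: %v\n", t.id, err) }

func removeTask(t *Task, step int) { fmt.Printf("task %d step %d removed\n", t.id, step) }

// runStep sketches the new cleanup contract: the deferred block always
// unregisters the task; Fail fires only when the inline load failed.
func runStep(t *Task, step int, doLoad func() error) (err error) {
	defer func() {
		if err != nil {
			t.Fail(err)
		}
		removeTask(t, step) // unconditional: there is no merger to hand off to
	}()
	return doLoad()
}

func main() {
	_ = runStep(&Task{id: 1}, 0, func() error { return nil })
	_ = runStep(&Task{id: 2}, 0, func() error { return errors.New("load failed") })
}
```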
@@ -280,22 +196,6 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
}
loadInfo := utils.PackSegmentLoadInfo(resp.GetInfos()[0], channel.GetSeekPosition(), indexes)
// Get shard leaderID for the given replica and segment
leaderID, ok := getShardLeader(
ex.meta.ReplicaManager,
ex.dist,
task.CollectionID(),
action.Node(),
segment.GetInsertChannel(),
)
if !ok {
msg := "no shard leader for the segment to execute loading"
err = merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "shard delegator not found")
log.Warn(msg, zap.Error(err))
return err
}
log = log.With(zap.Int64("shardLeader", leaderID))
// Get collection index info
indexInfo, err := ex.broker.DescribeIndex(ctx, task.CollectionID())
if err != nil {
@@ -312,9 +212,29 @@ func (ex *Executor) loadSegment(task *SegmentTask, step int) error {
loadInfo,
indexInfo,
)
loadTask := NewLoadSegmentsTask(task, step, req)
ex.merger.Add(loadTask)
log.Info("load segment task committed")
// Get shard leader for the given replica and segment
leaderID, ok := getShardLeader(ex.meta.ReplicaManager, ex.dist, task.CollectionID(), action.Node(), segment.GetInsertChannel())
if !ok {
msg := "no shard leader for the segment to execute loading"
err = merr.WrapErrChannelNotFound(segment.GetInsertChannel(), "shard delegator not found")
log.Warn(msg, zap.Error(err))
return err
}
log = log.With(zap.Int64("shardLeader", leaderID))
startTs := time.Now()
log.Info("load segments...")
status, err := ex.cluster.LoadSegments(task.Context(), leaderID, req)
err = merr.CheckRPCCall(status, err)
if err != nil {
log.Warn("failed to load segment", zap.Error(err))
return err
}
elapsed := time.Since(startTs)
log.Info("load segments done", zap.Duration("elapsed", elapsed))
return nil
}
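The committed flow is now synchronous: pack one request, resolve the shard leader, issue one LoadSegments RPC, and fold the transport error and server status into a single check. A sketch under hypothetical stand-ins for the cluster client and status type:

```go
package task

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// Status and Cluster are hypothetical stand-ins for commonpb.Status and
// session.Cluster.
type Status struct{ Reason string }

type Cluster interface {
	LoadSegments(ctx context.Context, leaderID int64, req any) (*Status, error)
}

// loadSegmentDirect sketches the new path: one request, one shard-leader
// RPC, one combined error check, no merger in between.
func loadSegmentDirect(ctx context.Context, c Cluster, leaderID int64, req any) error {
	start := time.Now()
	status, err := c.LoadSegments(ctx, leaderID, req)
	if err == nil && status != nil && status.Reason != "" {
		err = errors.New(status.Reason) // the fold merr.CheckRPCCall performs
	}
	if err != nil {
		return fmt.Errorf("failed to load segment: %w", err)
	}
	fmt.Println("load segments done, elapsed:", time.Since(start))
	return nil
}
```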
@@ -370,12 +290,9 @@ func (ex *Executor) releaseSegment(task *SegmentTask, step int) {
log.Info("release segment...")
status, err := ex.cluster.ReleaseSegments(ctx, dstNode, req)
err = merr.CheckRPCCall(status, err)
if err != nil {
log.Warn("failed to release segment, it may be a false failure", zap.Error(err))
return
}
if status.GetErrorCode() != commonpb.ErrorCode_Success {
log.Warn("failed to release segment", zap.String("reason", status.GetReason()))
log.Warn("failed to release segment", zap.Error(err))
return
}
elapsed := time.Since(startTs)
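Both load and release now funnel through merr.CheckRPCCall, which collapses the old two-step check (transport error first, then status error code) into one error value. A minimal re-implementation of that fold, assuming a simplified status type:

```go
package task

import "errors"

// rpcStatus is a simplified stand-in for commonpb.Status.
type rpcStatus struct {
	ErrorCode int32
	Reason    string
}

// checkRPCCall mimics merr.CheckRPCCall: a transport error wins; otherwise a
// non-success status is converted into an error; otherwise nil.
func checkRPCCall(status *rpcStatus, err error) error {
	if err != nil {
		return err
	}
	if status != nil && status.ErrorCode != 0 {
		return errors.New(status.Reason)
	}
	return nil
}
```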

@@ -1,83 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
)
type MergeableTask[K comparable, R any] interface {
ID() K
Merge(other MergeableTask[K, R])
}
var _ MergeableTask[segmentIndex, *querypb.LoadSegmentsRequest] = (*LoadSegmentsTask)(nil)
type segmentIndex struct {
NodeID int64
CollectionID int64
Shard string
}
type LoadSegmentsTask struct {
tasks []*SegmentTask
steps []int
req *querypb.LoadSegmentsRequest
}
func NewLoadSegmentsTask(task *SegmentTask, step int, req *querypb.LoadSegmentsRequest) *LoadSegmentsTask {
return &LoadSegmentsTask{
tasks: []*SegmentTask{task},
steps: []int{step},
req: req,
}
}
func (task *LoadSegmentsTask) ID() segmentIndex {
return segmentIndex{
NodeID: task.req.GetDstNodeID(),
CollectionID: task.req.GetCollectionID(),
Shard: task.req.GetInfos()[0].GetInsertChannel(),
}
}
func (task *LoadSegmentsTask) Merge(other MergeableTask[segmentIndex, *querypb.LoadSegmentsRequest]) {
otherTask := other.(*LoadSegmentsTask)
task.tasks = append(task.tasks, otherTask.tasks...)
task.steps = append(task.steps, otherTask.steps...)
task.req.Infos = append(task.req.Infos, otherTask.req.GetInfos()...)
positions := make(map[string]*msgpb.MsgPosition)
for _, position := range task.req.DeltaPositions {
positions[position.GetChannelName()] = position
}
for _, position := range otherTask.req.GetDeltaPositions() {
merged, ok := positions[position.GetChannelName()]
if !ok || merged.GetTimestamp() > position.GetTimestamp() {
merged = position
}
positions[position.GetChannelName()] = merged
}
task.req.DeltaPositions = make([]*msgpb.MsgPosition, 0, len(positions))
for _, position := range positions {
task.req.DeltaPositions = append(task.req.DeltaPositions, position)
}
}
func (task *LoadSegmentsTask) Result() *querypb.LoadSegmentsRequest {
return task.req
}
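The deleted Merge kept, per delta channel, the position with the smallest timestamp, so the merged request replayed enough delta data for every segment it covered (this is what the MergerSuite test further down asserts with its 1/1 timestamps). A standalone sketch of the rule, with Position as a hypothetical stand-in for msgpb.MsgPosition:

```go
package main

import "fmt"

// Position is a hypothetical stand-in for msgpb.MsgPosition.
type Position struct {
	Channel   string
	Timestamp uint64
}

// mergePositions keeps, per channel, the earliest (minimum-timestamp)
// position across both inputs.
func mergePositions(a, b []Position) []Position {
	earliest := make(map[string]Position, len(a)+len(b))
	for _, p := range append(append([]Position{}, a...), b...) {
		if cur, ok := earliest[p.Channel]; !ok || cur.Timestamp > p.Timestamp {
			earliest[p.Channel] = p
		}
	}
	out := make([]Position, 0, len(earliest))
	for _, p := range earliest {
		out = append(out, p)
	}
	return out
}

func main() {
	a := []Position{{"dmc0", 2}, {"dmc1", 3}}
	b := []Position{{"dmc0", 3}, {"dmc1", 1}}
	fmt.Println(mergePositions(a, b)) // dmc0 keeps ts 2, dmc1 keeps ts 1
}
```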

@@ -1,139 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"context"
"sync"
"time"
"go.uber.org/zap"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/log"
)
// Merger merges tasks with the same mergeID.
const waitQueueCap = 256
type Merger[K comparable, R any] struct {
stopCh chan struct{}
wg sync.WaitGroup
queues map[K][]MergeableTask[K, R] // TaskID -> Queue
waitQueue chan MergeableTask[K, R]
outCh chan MergeableTask[K, R]
stopOnce sync.Once
}
func NewMerger[K comparable, R any]() *Merger[K, R] {
return &Merger[K, R]{
stopCh: make(chan struct{}),
queues: make(map[K][]MergeableTask[K, R]),
waitQueue: make(chan MergeableTask[K, R], waitQueueCap),
outCh: make(chan MergeableTask[K, R], Params.QueryCoordCfg.TaskMergeCap.GetAsInt()),
}
}
func (merger *Merger[K, R]) Start(ctx context.Context) {
merger.schedule(ctx)
}
func (merger *Merger[K, R]) Stop() {
merger.stopOnce.Do(func() {
close(merger.stopCh)
merger.wg.Wait()
})
}
func (merger *Merger[K, R]) Chan() <-chan MergeableTask[K, R] {
return merger.outCh
}
func (merger *Merger[K, R]) schedule(ctx context.Context) {
merger.wg.Add(1)
go func() {
defer merger.wg.Done()
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
close(merger.outCh)
log.Info("Merger stopped due to context canceled")
return
case <-merger.stopCh:
close(merger.outCh)
log.Info("Merger stopped")
return
case <-ticker.C:
merger.drain()
for id := range merger.queues {
merger.triggerExecution(id)
}
}
}
}()
}
func (merger *Merger[K, R]) Add(task MergeableTask[K, R]) {
merger.waitQueue <- task
}
func (merger *Merger[K, R]) drain() {
for {
select {
case task := <-merger.waitQueue:
queue, ok := merger.queues[task.ID()]
if !ok {
queue = []MergeableTask[K, R]{}
}
queue = append(queue, task)
merger.queues[task.ID()] = queue
default:
return
}
}
}
func (merger *Merger[K, R]) triggerExecution(id K) {
tasks := merger.queues[id]
delete(merger.queues, id)
var task MergeableTask[K, R]
merged := 0
for i := 0; i < len(tasks); i++ {
if merged == 0 {
task = tasks[i]
} else {
task.Merge(tasks[i])
}
merged++
if merged >= Params.QueryCoordCfg.TaskMergeCap.GetAsInt() {
merger.outCh <- task
merged = 0
}
}
if merged != 0 {
merger.outCh <- task
}
log.Info("merge tasks done, trigger execution", zap.Any("mergeID", task.ID()))
}
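triggerExecution folded the queued tasks for one key into batches of at most TaskMergeCap before emitting them, with any remainder going out as a smaller batch. A generic sketch of that batching rule (maxSize stands in for Params.QueryCoordCfg.TaskMergeCap):

```go
package main

import "fmt"

// batch folds queued tasks into batches of at most maxSize items, emitting
// each batch downstream, then flushes any remainder.
func batch[T any](tasks []T, maxSize int, merge func(dst, src T) T, emit func(T)) {
	var acc T
	n := 0
	for _, t := range tasks {
		if n == 0 {
			acc = t
		} else {
			acc = merge(acc, t)
		}
		n++
		if n >= maxSize {
			emit(acc)
			n = 0
		}
	}
	if n != 0 {
		emit(acc)
	}
}

func main() {
	merge := func(dst, src []int64) []int64 { return append(dst, src...) }
	emit := func(b []int64) { fmt.Println("batch:", b) }
	batch([][]int64{{1}, {2}, {3}, {4}}, 3, merge, emit)
	// batch: [1 2 3]
	// batch: [4]
}
```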

@@ -1,146 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package task
import (
"context"
"testing"
"time"
"github.com/stretchr/testify/suite"
"github.com/milvus-io/milvus-proto/go-api/v2/msgpb"
"github.com/milvus-io/milvus/internal/proto/querypb"
. "github.com/milvus-io/milvus/internal/querycoordv2/params"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
type MergerSuite struct {
suite.Suite
// Data
collectionID int64
replicaID int64
nodeID int64
requests map[int64]*querypb.LoadSegmentsRequest
merger *Merger[segmentIndex, *querypb.LoadSegmentsRequest]
}
func (suite *MergerSuite) SetupSuite() {
paramtable.Init()
paramtable.Get().Save(Params.QueryCoordCfg.TaskMergeCap.Key, "3")
suite.collectionID = 1000
suite.replicaID = 100
suite.nodeID = 1
suite.requests = map[int64]*querypb.LoadSegmentsRequest{
1: {
DstNodeID: suite.nodeID,
CollectionID: suite.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 1,
InsertChannel: "dmc0",
},
},
DeltaPositions: []*msgpb.MsgPosition{
{
ChannelName: "dmc0",
Timestamp: 2,
},
{
ChannelName: "dmc1",
Timestamp: 3,
},
},
},
2: {
DstNodeID: suite.nodeID,
CollectionID: suite.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 2,
InsertChannel: "dmc0",
},
},
DeltaPositions: []*msgpb.MsgPosition{
{
ChannelName: "dmc0",
Timestamp: 3,
},
{
ChannelName: "dmc1",
Timestamp: 2,
},
},
},
3: {
DstNodeID: suite.nodeID,
CollectionID: suite.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 3,
InsertChannel: "dmc0",
},
},
DeltaPositions: []*msgpb.MsgPosition{
{
ChannelName: "dmc0",
Timestamp: 1,
},
{
ChannelName: "dmc1",
Timestamp: 1,
},
},
},
}
}
func (suite *MergerSuite) SetupTest() {
suite.merger = NewMerger[segmentIndex, *querypb.LoadSegmentsRequest]()
}
func (suite *MergerSuite) TestMerge() {
const (
requestNum = 5
timeout = 5 * time.Second
)
ctx := context.Background()
for segmentID := int64(1); segmentID <= 3; segmentID++ {
task, err := NewSegmentTask(ctx, timeout, WrapIDSource(0), suite.collectionID, suite.replicaID,
NewSegmentAction(suite.nodeID, ActionTypeGrow, "", segmentID))
suite.NoError(err)
suite.merger.Add(NewLoadSegmentsTask(task, 0, suite.requests[segmentID]))
}
suite.merger.Start(ctx)
defer suite.merger.Stop()
taskI := <-suite.merger.Chan()
task := taskI.(*LoadSegmentsTask)
suite.Len(task.tasks, 3)
suite.Len(task.steps, 3)
suite.EqualValues(1, task.Result().DeltaPositions[0].Timestamp)
suite.EqualValues(1, task.Result().DeltaPositions[1].Timestamp)
suite.merger.Stop()
_, ok := <-suite.merger.Chan()
suite.Equal(ok, false)
}
func TestMerger(t *testing.T) {
suite.Run(t, new(MergerSuite))
}

@@ -1313,7 +1313,7 @@ func (suite *TaskSuite) TestNoExecutor() {
CollectionID: suite.collection,
ChannelName: Params.CommonCfg.RootCoordDml.GetValue() + "-test",
}
suite.nodeMgr.Add(session.NewNodeInfo(targetNode, "localhost"))
suite.meta.ReplicaManager.Put(
utils.CreateTestReplica(suite.replica, suite.collection, []int64{1, 2, 3, -1}))
@@ -1348,24 +1348,6 @@
// Process tasks
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(segmentsNum, 0, 0, segmentsNum)
// Process tasks done
// Dist contains channels
view := &meta.LeaderView{
ID: targetNode,
CollectionID: suite.collection,
Segments: map[int64]*querypb.SegmentDist{},
}
for _, segment := range suite.loadSegments {
view.Segments[segment] = &querypb.SegmentDist{NodeID: targetNode, Version: 0}
}
distSegments := lo.Map(segments, func(info *datapb.SegmentInfo, _ int) *meta.Segment {
return meta.SegmentFromInfo(info)
})
suite.dist.LeaderViewManager.Update(targetNode, view)
suite.dist.SegmentDistManager.Update(targetNode, distSegments...)
suite.dispatchAndWait(targetNode)
suite.AssertTaskNum(0, 0, 0, 0)
}

@@ -1218,8 +1218,10 @@ type queryCoordConfig struct {
// Deprecated: Since 2.2.0
RetryNum ParamItem `refreshable:"true"`
// Deprecated: Since 2.2.0
RetryInterval ParamItem `refreshable:"true"`
TaskMergeCap ParamItem `refreshable:"false"`
RetryInterval ParamItem `refreshable:"true"`
// Deprecated: Since 2.3.4
TaskMergeCap ParamItem `refreshable:"false"`
TaskExecutionCap ParamItem `refreshable:"true"`
// ---- Handoff ---
@@ -1287,7 +1289,7 @@ func (p *queryCoordConfig) init(base *BaseTable) {
p.TaskMergeCap = ParamItem{
Key: "queryCoord.taskMergeCap",
Version: "2.2.0",
DefaultValue: "16",
DefaultValue: "1",
Export: true,
}
p.TaskMergeCap.Init(base.mgr)