// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package datacoord

import (
	"testing"
	"time"

	"github.com/cockroachdb/errors"
	"github.com/samber/lo"
	"github.com/stretchr/testify/mock"
	"github.com/stretchr/testify/suite"

	"github.com/milvus-io/milvus-proto/go-api/v2/commonpb"
	"github.com/milvus-io/milvus/internal/proto/datapb"
	"github.com/milvus-io/milvus/pkg/util/metautil"
	"github.com/milvus-io/milvus/pkg/util/tsoutil"
	"github.com/milvus-io/milvus/pkg/util/typeutil"
)

func TestCompactionPlanHandlerSuite(t *testing.T) {
	suite.Run(t, new(CompactionPlanHandlerSuite))
}

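// CompactionPlanHandlerSuite bundles the mocks shared by the compaction plan
// handler tests; SetupTest rebuilds them before every test case.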
type CompactionPlanHandlerSuite struct {
	suite.Suite

	mockMeta    *MockCompactionMeta
	mockAlloc   *NMockAllocator
	mockSch     *MockScheduler
	mockCm      *MockChannelManager
	mockSession *MockSessionManager
}

func (s *CompactionPlanHandlerSuite) SetupTest() {
	s.mockMeta = NewMockCompactionMeta(s.T())
	s.mockAlloc = NewNMockAllocator(s.T())
	s.mockSch = NewMockScheduler(s.T())
	s.mockCm = NewMockChannelManager(s.T())
	s.mockSession = NewMockSessionManager(s.T())
}

func (s *CompactionPlanHandlerSuite) TestRemoveTasksByChannel() {
	s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Return().Once()
	handler := newCompactionPlanHandler(nil, nil, nil, nil)
	handler.scheduler = s.mockSch

	ch := "ch1"
	handler.mu.Lock()
	handler.plans[1] = &compactionTask{
		plan:        &datapb.CompactionPlan{PlanID: 19530},
		dataNodeID:  1,
		triggerInfo: &compactionSignal{channel: ch},
	}
	handler.mu.Unlock()

	handler.removeTasksByChannel(ch)

	handler.mu.Lock()
	s.Equal(0, len(handler.plans))
	handler.mu.Unlock()
}

func (s *CompactionPlanHandlerSuite) TestCheckResult() {
	s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
		1: {PlanID: 1, State: commonpb.CompactionState_Executing},
		2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
		3: {PlanID: 3, State: commonpb.CompactionState_Executing},
		4: {PlanID: 4, State: commonpb.CompactionState_Executing},
	})

	{
		s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(0, errors.New("mock")).Once()
		handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc)
		handler.checkResult()
	}

	{
		s.mockAlloc.EXPECT().allocTimestamp(mock.Anything).Return(19530, nil).Once()
		handler := newCompactionPlanHandler(s.mockSession, nil, nil, s.mockAlloc)
		handler.checkResult()
	}
}

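// TestClean verifies that Clean drops only the completed task whose plan
// started two hours ago, while the executing, pipelining, and recently
// completed tasks are kept.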
func (s *CompactionPlanHandlerSuite) TestClean() {
	startTime := tsoutil.ComposeTSByTime(time.Now(), 0)
	cleanTime := tsoutil.ComposeTSByTime(time.Now().Add(-2*time.Hour), 0)
	c := &compactionPlanHandler{
		allocator: s.mockAlloc,
		plans: map[int64]*compactionTask{
			1: {
				state: executing,
			},
			2: {
				state: pipelining,
			},
			3: {
				state: completed,
				plan: &datapb.CompactionPlan{
					StartTime: startTime,
				},
			},
			4: {
				state: completed,
				plan: &datapb.CompactionPlan{
					StartTime: cleanTime,
				},
			},
		},
	}

	c.Clean()
	s.Len(c.plans, 3)
}

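// TestHandleL0CompactionResults verifies that handling a completed L0 delete
// compaction result touches all five involved segments (two L0 plus three
// sealed L1), i.e. UpdateSegmentsInfo receives five update operators.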
func (s *CompactionPlanHandlerSuite) TestHandleL0CompactionResults() {
	channel := "Ch-1"
	s.mockMeta.EXPECT().UpdateSegmentsInfo(mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).
		Run(func(operators ...UpdateOperator) {
			s.Equal(5, len(operators))
		}).Return(nil).Once()

	deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}
	// 2 l0 segments, 3 sealed segments
	plan := &datapb.CompactionPlan{
		PlanID: 1,
		SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
			{
				SegmentID:     100,
				Deltalogs:     deltalogs,
				Level:         datapb.SegmentLevel_L0,
				InsertChannel: channel,
			},
			{
				SegmentID:     101,
				Deltalogs:     deltalogs,
				Level:         datapb.SegmentLevel_L0,
				InsertChannel: channel,
			},
			{
				SegmentID:     200,
				Level:         datapb.SegmentLevel_L1,
				InsertChannel: channel,
			},
			{
				SegmentID:     201,
				Level:         datapb.SegmentLevel_L1,
				InsertChannel: channel,
			},
			{
				SegmentID:     202,
				Level:         datapb.SegmentLevel_L1,
				InsertChannel: channel,
			},
		},
		Type: datapb.CompactionType_Level0DeleteCompaction,
	}

	result := &datapb.CompactionPlanResult{
		PlanID:  plan.GetPlanID(),
		State:   commonpb.CompactionState_Completed,
		Channel: channel,
		Segments: []*datapb.CompactionSegment{
			{
				SegmentID: 200,
				Deltalogs: deltalogs,
				Channel:   channel,
			},
			{
				SegmentID: 201,
				Deltalogs: deltalogs,
				Channel:   channel,
			},
			{
				SegmentID: 202,
				Deltalogs: deltalogs,
				Channel:   channel,
			},
		},
	}

	handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
	err := handler.handleL0CompactionResult(plan, result)
	s.NoError(err)
}

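// TestRefreshL0Plan verifies that RefreshPlan expands an L0 delete compaction
// plan with the sealed L1 segments selected from meta, so the refreshed plan
// covers both the L0 and the L1 segments.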
func (s *CompactionPlanHandlerSuite) TestRefreshL0Plan() {
	channel := "Ch-1"
	s.mockMeta.EXPECT().SelectSegments(mock.Anything).Return(
		[]*SegmentInfo{
			{SegmentInfo: &datapb.SegmentInfo{
				ID:            200,
				Level:         datapb.SegmentLevel_L1,
				InsertChannel: channel,
			}},
			{SegmentInfo: &datapb.SegmentInfo{
				ID:            201,
				Level:         datapb.SegmentLevel_L1,
				InsertChannel: channel,
			}},
			{SegmentInfo: &datapb.SegmentInfo{
				ID:            202,
				Level:         datapb.SegmentLevel_L1,
				InsertChannel: channel,
			}},
		},
	)

	deltalogs := []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))}
	// 2 l0 segments
	plan := &datapb.CompactionPlan{
		PlanID: 1,
		SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
			{
				SegmentID:     100,
				Deltalogs:     deltalogs,
				Level:         datapb.SegmentLevel_L0,
				InsertChannel: channel,
			},
			{
				SegmentID:     101,
				Deltalogs:     deltalogs,
				Level:         datapb.SegmentLevel_L0,
				InsertChannel: channel,
			},
		},
		Type: datapb.CompactionType_Level0DeleteCompaction,
	}

	task := &compactionTask{
		triggerInfo: &compactionSignal{id: 19530, collectionID: 1, partitionID: 10},
		state:       executing,
		plan:        plan,
		dataNodeID:  1,
	}

	handler := newCompactionPlanHandler(nil, nil, s.mockMeta, s.mockAlloc)
	handler.RefreshPlan(task)

	s.Equal(5, len(task.plan.GetSegmentBinlogs()))
	segIDs := lo.Map(task.plan.GetSegmentBinlogs(), func(b *datapb.CompactionSegmentBinlogs, _ int) int64 {
		return b.GetSegmentID()
	})

	s.ElementsMatch([]int64{200, 201, 202, 100, 101}, segIDs)
}

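// TestExecCompactionPlan covers both branches of execCompactionPlan: the plan
// is submitted to the scheduler when a watcher exists for the channel, and an
// error is returned when FindWatcher fails.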
func (s *CompactionPlanHandlerSuite) TestExecCompactionPlan() {
	s.mockCm.EXPECT().FindWatcher(mock.Anything).RunAndReturn(func(channel string) (int64, error) {
		if channel == "ch-1" {
			return 0, errors.Errorf("mock error for ch-1")
		}

		return 1, nil
	}).Twice()
	s.mockSch.EXPECT().Submit(mock.Anything).Return().Once()

	tests := []struct {
		description string
		channel     string
		hasError    bool
	}{
		{"channel with error", "ch-1", true},
		{"channel with no error", "ch-2", false},
	}

	handler := newCompactionPlanHandler(nil, s.mockCm, s.mockMeta, s.mockAlloc)
	handler.scheduler = s.mockSch

	for idx, test := range tests {
		sig := &compactionSignal{id: int64(idx)}
		plan := &datapb.CompactionPlan{
			PlanID: int64(idx),
		}
		s.Run(test.description, func() {
			plan.Channel = test.channel

			err := handler.execCompactionPlan(sig, plan)
			if test.hasError {
				s.Error(err)
			} else {
				s.NoError(err)
			}
		})
	}
}

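// TestHandleMergeCompactionResult walks handleMergeCompactionResult through a
// nil result, a result whose target segment already exists in meta, and the
// failure paths of PrepareCompleteCompactionMutation,
// alterMetaStoreAfterCompaction, and SyncSegments.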
func (s *CompactionPlanHandlerSuite) TestHandleMergeCompactionResult() {
	plan := &datapb.CompactionPlan{
		PlanID: 1,
		SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
			{SegmentID: 1},
			{SegmentID: 2},
		},
		Type: datapb.CompactionType_MixCompaction,
	}

	s.Run("illegal nil result", func() {
		s.SetupTest()
		handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
		err := handler.handleMergeCompactionResult(nil, nil)
		s.Error(err)
	})

	s.Run("not empty compacted to segment info", func() {
		s.SetupTest()
		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).RunAndReturn(
			func(segID int64) *SegmentInfo {
				if segID == 3 {
					return NewSegmentInfo(&datapb.SegmentInfo{ID: 3})
				}
				return nil
			}).Once()
		s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()

		handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
		handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}

		compactionResult := &datapb.CompactionPlanResult{
			PlanID: plan.PlanID,
			Segments: []*datapb.CompactionSegment{
				{SegmentID: 3, NumOfRows: 15},
			},
		}

		err := handler.handleMergeCompactionResult(plan, compactionResult)
		s.NoError(err)
	})

	s.Run("prepare error", func() {
		s.SetupTest()
		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
		s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
			nil, nil, nil, errors.New("mock error")).Once()

		handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
		handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
		compactionResult := &datapb.CompactionPlanResult{
			PlanID: plan.PlanID,
			Segments: []*datapb.CompactionSegment{
				{SegmentID: 4, NumOfRows: 15},
			},
		}

		err := handler.handleMergeCompactionResult(plan, compactionResult)
		s.Error(err)
	})

	s.Run("alter error", func() {
		s.SetupTest()
		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
		s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
			[]*SegmentInfo{},
			NewSegmentInfo(&datapb.SegmentInfo{ID: 100}),
			&segMetricMutation{}, nil).Once()
		s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything).
			Return(errors.New("mock error")).Once()

		handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
		handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
		compactionResult := &datapb.CompactionPlanResult{
			PlanID: plan.PlanID,
			Segments: []*datapb.CompactionSegment{
				{SegmentID: 4, NumOfRows: 15},
			},
		}

		err := handler.handleMergeCompactionResult(plan, compactionResult)
		s.Error(err)
	})

	s.Run("sync segment error", func() {
		s.SetupTest()
		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
		s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
			[]*SegmentInfo{},
			NewSegmentInfo(&datapb.SegmentInfo{ID: 100}),
			&segMetricMutation{}, nil).Once()
		s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything).
			Return(nil).Once()
		s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(errors.New("mock error")).Once()

		handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
		handler.plans[plan.PlanID] = &compactionTask{dataNodeID: 111, plan: plan}
		compactionResult := &datapb.CompactionPlanResult{
			PlanID: plan.PlanID,
			Segments: []*datapb.CompactionSegment{
				{SegmentID: 4, NumOfRows: 15},
			},
		}

		err := handler.handleMergeCompactionResult(plan, compactionResult)
		s.Error(err)
	})
}

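// TestCompleteCompaction covers completeCompaction for an unknown plan, an
// already-completed plan, and a successful mix compaction, after which the
// binlog details are stripped from the acknowledged result.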
func (s *CompactionPlanHandlerSuite) TestCompleteCompaction() {
	s.Run("test not exists compaction task", func() {
		handler := newCompactionPlanHandler(nil, nil, nil, nil)
		err := handler.completeCompaction(&datapb.CompactionPlanResult{PlanID: 2})
		s.Error(err)
	})

	s.Run("test completed compaction task", func() {
		c := &compactionPlanHandler{
			plans: map[int64]*compactionTask{1: {state: completed}},
		}
		err := c.completeCompaction(&datapb.CompactionPlanResult{PlanID: 1})
		s.Error(err)
	})

	s.Run("test complete merge compaction task", func() {
		s.mockSession.EXPECT().SyncSegments(mock.Anything, mock.Anything).Return(nil).Once()
		// mock for handleMergeCompactionResult
		s.mockMeta.EXPECT().GetHealthySegment(mock.Anything).Return(nil).Once()
		s.mockMeta.EXPECT().PrepareCompleteCompactionMutation(mock.Anything, mock.Anything).Return(
			[]*SegmentInfo{},
			NewSegmentInfo(&datapb.SegmentInfo{ID: 100}),
			&segMetricMutation{}, nil).Once()
		s.mockMeta.EXPECT().alterMetaStoreAfterCompaction(mock.Anything, mock.Anything).
			Return(nil).Once()
		s.mockSch.EXPECT().Finish(mock.Anything, mock.Anything).Return()

		dataNodeID := UniqueID(111)

		seg1 := &datapb.SegmentInfo{
			ID:        1,
			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log1", 1))},
			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log2", 1))},
			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log3", 1))},
		}

		seg2 := &datapb.SegmentInfo{
			ID:        2,
			Binlogs:   []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log4", 2))},
			Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log5", 2))},
			Deltalogs: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log6", 2))},
		}

		plan := &datapb.CompactionPlan{
			PlanID: 1,
			SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
				{
					SegmentID:           seg1.ID,
					FieldBinlogs:        seg1.GetBinlogs(),
					Field2StatslogPaths: seg1.GetStatslogs(),
					Deltalogs:           seg1.GetDeltalogs(),
				},
				{
					SegmentID:           seg2.ID,
					FieldBinlogs:        seg2.GetBinlogs(),
					Field2StatslogPaths: seg2.GetStatslogs(),
					Deltalogs:           seg2.GetDeltalogs(),
				},
			},
			Type: datapb.CompactionType_MixCompaction,
		}

		task := &compactionTask{
			triggerInfo: &compactionSignal{id: 1},
			state:       executing,
			plan:        plan,
			dataNodeID:  dataNodeID,
		}

		plans := map[int64]*compactionTask{1: task}

		compactionResult := datapb.CompactionPlanResult{
			PlanID: 1,
			Segments: []*datapb.CompactionSegment{
				{
					SegmentID:           3,
					NumOfRows:           15,
					InsertLogs:          []*datapb.FieldBinlog{getFieldBinlogPaths(101, getInsertLogPath("log301", 3))},
					Field2StatslogPaths: []*datapb.FieldBinlog{getFieldBinlogPaths(101, getStatsLogPath("log302", 3))},
					Deltalogs:           []*datapb.FieldBinlog{getFieldBinlogPaths(101, getDeltaLogPath("log303", 3))},
				},
			},
		}

		c := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
		c.scheduler = s.mockSch
		c.plans = plans

		err := c.completeCompaction(&compactionResult)
		s.NoError(err)
		s.Nil(compactionResult.GetSegments()[0].GetInsertLogs())
		s.Nil(compactionResult.GetSegments()[0].GetField2StatslogPaths())
		s.Nil(compactionResult.GetSegments()[0].GetDeltalogs())
	})
}

func (s *CompactionPlanHandlerSuite) TestGetCompactionTask() {
	inPlans := map[int64]*compactionTask{
		1: {
			triggerInfo: &compactionSignal{id: 1},
			plan:        &datapb.CompactionPlan{PlanID: 1},
			state:       executing,
		},
		2: {
			triggerInfo: &compactionSignal{id: 1},
			plan:        &datapb.CompactionPlan{PlanID: 2},
			state:       completed,
		},
		3: {
			triggerInfo: &compactionSignal{id: 1},
			plan:        &datapb.CompactionPlan{PlanID: 3},
			state:       failed,
		},
	}

	expected := lo.Values(inPlans)

	handler := &compactionPlanHandler{plans: inPlans}
	got := handler.getCompactionTasksBySignalID(1)
	s.ElementsMatch(expected, got)

	task := handler.getCompaction(1)
	s.NotNil(task)
	s.EqualValues(1, task.plan.PlanID)

	task = handler.getCompaction(19530)
	s.Nil(task)
}

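// TestUpdateCompaction drives updateCompaction with the plan results reported
// by the datanode session and asserts the expected per-plan states afterwards
// (timeout, executing, timeout, failed).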
func (s *CompactionPlanHandlerSuite) TestUpdateCompaction() {
	s.mockSession.EXPECT().GetCompactionPlansResults().Return(map[int64]*datapb.CompactionPlanResult{
		1: {PlanID: 1, State: commonpb.CompactionState_Executing},
		2: {PlanID: 2, State: commonpb.CompactionState_Completed, Segments: []*datapb.CompactionSegment{{PlanID: 2}}},
		3: {PlanID: 3, State: commonpb.CompactionState_Executing},
	})

	inPlans := map[int64]*compactionTask{
		1: {
			triggerInfo: &compactionSignal{},
			plan:        &datapb.CompactionPlan{PlanID: 1},
			state:       executing,
		},
		2: {
			triggerInfo: &compactionSignal{},
			plan:        &datapb.CompactionPlan{PlanID: 2},
			state:       executing,
		},
		3: {
			triggerInfo: &compactionSignal{},
			plan:        &datapb.CompactionPlan{PlanID: 3},
			state:       timeout,
		},
		4: {
			triggerInfo: &compactionSignal{},
			plan:        &datapb.CompactionPlan{PlanID: 4},
			state:       timeout,
		},
	}

	handler := newCompactionPlanHandler(s.mockSession, s.mockCm, s.mockMeta, s.mockAlloc)
	handler.plans = inPlans

	err := handler.updateCompaction(0)
	s.NoError(err)

	task := handler.plans[1]
	s.Equal(timeout, task.state)

	task = handler.plans[2]
	s.Equal(executing, task.state)

	task = handler.plans[3]
	s.Equal(timeout, task.state)

	task = handler.plans[4]
	s.Equal(failed, task.state)
}

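// getFieldBinlogPaths builds a FieldBinlog for the given field ID with one
// Binlog entry per provided log path.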
func getFieldBinlogPaths(id int64, paths ...string) *datapb.FieldBinlog {
	l := &datapb.FieldBinlog{
		FieldID: id,
		Binlogs: make([]*datapb.Binlog, 0, len(paths)),
	}
	for _, path := range paths {
		l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path})
	}
	return l
}

// getFieldBinlogPathsWithEntry is like getFieldBinlogPaths but also sets the
// same entry count on every Binlog.
func getFieldBinlogPathsWithEntry(id int64, entry int64, paths ...string) *datapb.FieldBinlog {
	l := &datapb.FieldBinlog{
		FieldID: id,
		Binlogs: make([]*datapb.Binlog, 0, len(paths)),
	}
	for _, path := range paths {
		l.Binlogs = append(l.Binlogs, &datapb.Binlog{LogPath: path, EntriesNum: entry})
	}
	return l
}

// getInsertLogPath builds an insert-binlog path for segmentID under rootPath
// using fixed IDs for the remaining path components.
func getInsertLogPath(rootPath string, segmentID typeutil.UniqueID) string {
	return metautil.BuildInsertLogPath(rootPath, 10, 100, segmentID, 1000, 10000)
}

// getStatsLogPath builds a stats-binlog path for segmentID under rootPath
// using fixed IDs for the remaining path components.
func getStatsLogPath(rootPath string, segmentID typeutil.UniqueID) string {
	return metautil.BuildStatsLogPath(rootPath, 10, 100, segmentID, 1000, 10000)
}

// getDeltaLogPath builds a delta-binlog path for segmentID under rootPath
// using fixed IDs for the remaining path components.
func getDeltaLogPath(rootPath string, segmentID typeutil.UniqueID) string {
	return metautil.BuildDeltaLogPath(rootPath, 10, 100, segmentID, 10000)
}