Fix CI issue caused by compacting only flushed segments (#13317)

Signed-off-by: sunby <bingyi.sun@zilliz.com>

Co-authored-by: sunby <bingyi.sun@zilliz.com>
Bingyi Sun 2021-12-14 13:55:07 +08:00 committed by GitHub
parent 1950beba24
commit 3a2aace6f0
3 changed files with 106 additions and 7 deletions


@@ -304,7 +304,7 @@ func (t *compactionTrigger) globalMergeCompaction(signal *compactionSignal, isFo
_, has := colls[segment.GetCollectionID()]
return (has || len(collections) == 0) && // if filters collection
isSegmentHealthy(segment) &&
- segment.State == commonpb.SegmentState_Flushed && // flushed only
+ isFlush(segment) &&
!segment.isCompacting // not compacting now
}) // m is list of chanPartSegments, which is channel-partition organized segments
plans := make([]*datapb.CompactionPlan, 0)
@@ -354,7 +354,7 @@ func (t *compactionTrigger) getCandidateSegments(channel string, partitionID Uni
segments := t.meta.GetSegmentsByChannel(channel)
res := make([]*SegmentInfo, 0)
for _, s := range segments {
- if s.GetState() != commonpb.SegmentState_Flushed || s.GetInsertChannel() != channel ||
+ if !isFlush(s) || s.GetInsertChannel() != channel ||
s.GetPartitionID() != partitionID || s.isCompacting {
continue
}
@@ -375,11 +375,16 @@ func (t *compactionTrigger) shouldDoMergeCompaction(segments []*SegmentInfo) boo
func (t *compactionTrigger) fillOriginPlan(plan *datapb.CompactionPlan) error {
// TODO context
- id, err := t.allocator.allocID(context.Background())
+ id, err := t.allocator.allocID(context.TODO())
if err != nil {
return err
}
+ ts, err := t.allocator.allocTimestamp(context.TODO())
+ if err != nil {
+ return err
+ }
plan.PlanID = id
+ plan.StartTime = ts
plan.TimeoutInSeconds = maxCompactionTimeoutInSeconds
return nil
}
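
With this change every plan carries an allocated hybrid timestamp in StartTime next to its TimeoutInSeconds budget. One plausible consumer of that pair, shown purely as a sketch (not the code base's actual timeout logic; isTimedOut is a hypothetical helper), recovers wall-clock time from the timestamp, assuming the usual tsoutil layout of millisecond physical time above 18 logical bits:

    package main

    import (
    	"fmt"
    	"time"
    )

    const logicalBits = 18 // tsoutil packs ms-precision physical time above 18 logical bits

    // physicalTime recovers wall-clock time from a hybrid timestamp,
    // analogous to tsoutil.ParseTS in the real code base.
    func physicalTime(ts uint64) time.Time {
    	return time.UnixMilli(int64(ts >> logicalBits))
    }

    // isTimedOut is a hypothetical helper: a plan is stale once more than
    // timeoutInSeconds of wall-clock time have passed since StartTime.
    func isTimedOut(startTime uint64, timeoutInSeconds int32, now time.Time) bool {
    	return now.Sub(physicalTime(startTime)) > time.Duration(timeoutInSeconds)*time.Second
    }

    func main() {
    	started := uint64(time.Now().Add(-2*time.Second).UnixMilli()) << logicalBits
    	fmt.Println(isTimedOut(started, 1, time.Now()))  // true: ~2s elapsed > 1s budget
    	fmt.Println(isTimedOut(started, 60, time.Now())) // false: well within budget
    }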
@@ -444,3 +449,7 @@ func (t *compactionTrigger) singleCompaction(segment *SegmentInfo, isForce bool,
}
return plan, t.compactionHandler.execCompactionPlan(signal, plan)
}
+
+ func isFlush(segment *SegmentInfo) bool {
+ return segment.GetState() == commonpb.SegmentState_Flushed || segment.GetState() == commonpb.SegmentState_Flushing
+ }
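
This new predicate is the heart of the fix: candidate selection previously demanded SegmentState_Flushed, so segments still in the Flushing state were never compacted, the behavior the commit title calls out. A self-contained sketch of how the broadened predicate classifies states, using a simplified stand-in for the real commonpb enum:

    package main

    import "fmt"

    // SegmentState mirrors just the states needed for illustration; the real
    // enum is commonpb.SegmentState in the Milvus proto definitions.
    type SegmentState int

    const (
    	Growing SegmentState = iota
    	Sealed
    	Flushing
    	Flushed
    	Dropped
    )

    func (s SegmentState) String() string {
    	return [...]string{"Growing", "Sealed", "Flushing", "Flushed", "Dropped"}[s]
    }

    // isFlush mirrors the new predicate: a segment is a compaction candidate
    // once it is fully flushed or still in the process of flushing.
    func isFlush(state SegmentState) bool {
    	return state == Flushed || state == Flushing
    }

    func main() {
    	for _, s := range []SegmentState{Growing, Sealed, Flushing, Flushed, Dropped} {
    		fmt.Printf("%-8s candidate=%v\n", s, isFlush(s))
    	}
    }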


@@ -160,6 +160,7 @@ func Test_compactionTrigger_forceTriggerCompaction(t *testing.T) {
},
},
},
+ StartTime: 3,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_MergeCompaction,
Timetravel: 200,
@@ -219,6 +220,7 @@ func Test_compactionTrigger_forceTriggerCompaction(t *testing.T) {
},
},
},
+ StartTime: 3,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_InnerCompaction,
Timetravel: 200,
@@ -364,13 +366,14 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
},
},
},
+ StartTime: 3,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_InnerCompaction,
Timetravel: 200,
Channel: "ch1",
},
{
- PlanID: 3,
+ PlanID: 4,
SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
{
SegmentID: 2,
@@ -391,6 +394,7 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
},
},
},
+ StartTime: 5,
TimeoutInSeconds: maxCompactionTimeoutInSeconds,
Type: datapb.CompactionType_MergeCompaction,
Timetravel: 200,
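
The PlanID bump from 3 to 4 above is a knock-on effect of fillOriginPlan now drawing a timestamp: the simplified test allocator (last file below) hands out IDs and timestamps from a single shared counter, so each plan consumes two values instead of one. A sketch of the allocation order implied by the expected values; treating the trigger's signal ID as the first consumer is an inference from the numbers, not something this diff shows:

    package main

    import "fmt"

    // Simulate the shared mock counter consumed during one trigger run
    // that produces two plans.
    func main() {
    	cnt := 0
    	next := func() int { cnt++; return cnt }

    	signalID := next() // 1: trigger signal ID (inferred first consumer)
    	plan1ID := next()  // 2: first plan's PlanID
    	plan1TS := next()  // 3: first plan's StartTime
    	plan2ID := next()  // 4: second plan's PlanID (was 3 before StartTime existed)
    	plan2TS := next()  // 5: second plan's StartTime
    	fmt.Println(signalID, plan1ID, plan1TS, plan2ID, plan2TS) // 1 2 3 4 5
    }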
@@ -475,6 +479,94 @@ func Test_compactionTrigger_triggerCompaction(t *testing.T) {
false,
[]*datapb.CompactionPlan{},
},
+ {
+ "test merge flushing segment",
+ fields{
+ &meta{
+ segments: &SegmentsInfo{
+ map[int64]*SegmentInfo{
+ 2: {
+ SegmentInfo: &datapb.SegmentInfo{
+ ID: 2,
+ CollectionID: 2,
+ PartitionID: 1,
+ LastExpireTime: 100,
+ NumOfRows: 300,
+ MaxRowNum: 1000,
+ InsertChannel: "ch2",
+ State: commonpb.SegmentState_Flushing,
+ Binlogs: []*datapb.FieldBinlog{
+ {FieldID: 1, Binlogs: []string{"binlog2"}},
+ },
+ Deltalogs: []*datapb.DeltaLogInfo{
+ {RecordEntries: 5, DeltaLogPath: "deltalog2"},
+ },
+ },
+ },
+ 3: {
+ SegmentInfo: &datapb.SegmentInfo{
+ ID: 3,
+ CollectionID: 2,
+ PartitionID: 1,
+ LastExpireTime: 100,
+ NumOfRows: 300,
+ MaxRowNum: 1000,
+ InsertChannel: "ch2",
+ State: commonpb.SegmentState_Flushing,
+ Binlogs: []*datapb.FieldBinlog{
+ {FieldID: 1, Binlogs: []string{"binlog3"}},
+ },
+ Deltalogs: []*datapb.DeltaLogInfo{
+ {RecordEntries: 5, DeltaLogPath: "deltalog3"},
+ },
+ },
+ },
+ },
+ },
+ },
+ newMockAllocator(),
+ make(chan *compactionSignal, 1),
+ (singleCompactionFunc)(chooseAllBinlogs),
+ (mergeCompactionFunc)(greedyMergeCompaction),
+ &spyCompactionHandler{spyChan: make(chan *datapb.CompactionPlan, 2)},
+ 2,
+ true,
+ },
+ args{
+ &timetravel{200},
+ },
+ false,
+ []*datapb.CompactionPlan{
+ {
+ PlanID: 2,
+ SegmentBinlogs: []*datapb.CompactionSegmentBinlogs{
+ {
+ SegmentID: 2,
+ FieldBinlogs: []*datapb.FieldBinlog{
+ {FieldID: 1, Binlogs: []string{"binlog2"}},
+ },
+ Deltalogs: []*datapb.DeltaLogInfo{
+ {RecordEntries: 5, DeltaLogPath: "deltalog2"},
+ },
+ },
+ {
+ SegmentID: 3,
+ FieldBinlogs: []*datapb.FieldBinlog{
+ {FieldID: 1, Binlogs: []string{"binlog3"}},
+ },
+ Deltalogs: []*datapb.DeltaLogInfo{
+ {RecordEntries: 5, DeltaLogPath: "deltalog3"},
+ },
+ },
+ },
+ StartTime: 3,
+ TimeoutInSeconds: maxCompactionTimeoutInSeconds,
+ Type: datapb.CompactionType_MergeCompaction,
+ Timetravel: 200,
+ Channel: "ch2",
+ },
+ },
+ },
}
for _, tt := range tests {


@@ -51,9 +51,7 @@ type MockAllocator struct {
func (m *MockAllocator) allocTimestamp(ctx context.Context) (Timestamp, error) {
val := atomic.AddInt64(&m.cnt, 1)
- phy := time.Now().UnixNano() / int64(time.Millisecond)
- ts := tsoutil.ComposeTS(phy, val)
- return ts, nil
+ return Timestamp(val), nil
}
func (m *MockAllocator) allocID(ctx context.Context) (UniqueID, error) {
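
The mock no longer folds the current wall clock into a composed TSO value, which produced a different timestamp on every run; returning the bare counter makes the allocation sequence fully deterministic, and that is what lets the tests above pin StartTime to exact values like 3 and 5. A sketch contrasting the old and new behavior, assuming the standard ComposeTS layout of physical milliseconds shifted above 18 logical bits:

    package main

    import (
    	"fmt"
    	"sync/atomic"
    	"time"
    )

    type Timestamp = uint64

    var cnt int64

    // Old behavior: fold the current wall clock into the timestamp, as
    // tsoutil.ComposeTS does; the result changes on every test run.
    func allocTimestampOld() Timestamp {
    	val := atomic.AddInt64(&cnt, 1)
    	phy := time.Now().UnixNano() / int64(time.Millisecond)
    	return Timestamp(phy)<<18 | Timestamp(val) // assumed ComposeTS layout
    }

    // New behavior: hand back the counter itself, so expectations such as
    // StartTime: 3 and StartTime: 5 hold on every run.
    func allocTimestampNew() Timestamp {
    	return Timestamp(atomic.AddInt64(&cnt, 1))
    }

    func main() {
    	fmt.Println(allocTimestampOld()) // huge, time-dependent value
    	fmt.Println(allocTimestampNew()) // 2: deterministic given call order
    }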