mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 03:48:37 +08:00
fix: Return all compactTo segments after support split (#36361)
Related to #36360 Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
parent
3b10085f61
commit
d2c774fb6d
@ -301,7 +301,7 @@ func (t *clusteringCompactionTask) processStats() error {
|
||||
if !ok {
|
||||
return nil
|
||||
}
|
||||
resultSegments = append(resultSegments, to.GetID())
|
||||
resultSegments = append(resultSegments, lo.Map(to, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })...)
|
||||
}
|
||||
|
||||
log.Info("clustering compaction stats task finished",
|
||||
|
@ -556,7 +556,7 @@ func TestServer_AlterIndex(t *testing.T) {
|
||||
catalog: catalog,
|
||||
indexMeta: indexMeta,
|
||||
segments: &SegmentsInfo{
|
||||
compactionTo: make(map[int64]int64),
|
||||
compactionTo: make(map[int64][]int64),
|
||||
segments: map[UniqueID]*SegmentInfo{
|
||||
invalidSegID: {
|
||||
SegmentInfo: &datapb.SegmentInfo{
|
||||
|
@ -1651,7 +1651,7 @@ func (m *meta) HasSegments(segIDs []UniqueID) (bool, error) {
|
||||
}
|
||||
|
||||
// GetCompactionTo returns the segment info of the segment to be compacted to.
|
||||
func (m *meta) GetCompactionTo(segmentID int64) (*SegmentInfo, bool) {
|
||||
func (m *meta) GetCompactionTo(segmentID int64) ([]*SegmentInfo, bool) {
|
||||
m.RLock()
|
||||
defer m.RUnlock()
|
||||
|
||||
|
@ -861,7 +861,7 @@ func Test_meta_SetSegmentsCompacting(t *testing.T) {
|
||||
isCompacting: false,
|
||||
},
|
||||
},
|
||||
compactionTo: make(map[int64]UniqueID),
|
||||
compactionTo: make(map[int64][]UniqueID),
|
||||
},
|
||||
},
|
||||
args{
|
||||
|
@ -35,8 +35,9 @@ import (
|
||||
type SegmentsInfo struct {
|
||||
segments map[UniqueID]*SegmentInfo
|
||||
secondaryIndexes segmentInfoIndexes
|
||||
compactionTo map[UniqueID]UniqueID // map the compact relation, value is the segment which `CompactFrom` contains key.
|
||||
// A segment can be compacted to only one segment finally in meta.
|
||||
// map the compact relation, value is the segment which `CompactFrom` contains key.
|
||||
// now segment could be compacted to multiple segments
|
||||
compactionTo map[UniqueID][]UniqueID
|
||||
}
|
||||
|
||||
type segmentInfoIndexes struct {
|
||||
@ -87,7 +88,7 @@ func NewSegmentsInfo() *SegmentsInfo {
|
||||
coll2Segments: make(map[UniqueID]map[UniqueID]*SegmentInfo),
|
||||
channel2Segments: make(map[string]map[UniqueID]*SegmentInfo),
|
||||
},
|
||||
compactionTo: make(map[UniqueID]UniqueID),
|
||||
compactionTo: make(map[UniqueID][]UniqueID),
|
||||
}
|
||||
}
|
||||
|
||||
@ -167,15 +168,21 @@ func (s *SegmentsInfo) GetRealSegmentsForChannel(channel string) []*SegmentInfo
|
||||
// Return (nil, false) if given segmentID can not found in the meta.
|
||||
// Return (nil, true) if given segmentID can be found not no compaction to.
|
||||
// Return (notnil, true) if given segmentID can be found and has compaction to.
|
||||
func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) (*SegmentInfo, bool) {
|
||||
func (s *SegmentsInfo) GetCompactionTo(fromSegmentID int64) ([]*SegmentInfo, bool) {
|
||||
if _, ok := s.segments[fromSegmentID]; !ok {
|
||||
return nil, false
|
||||
}
|
||||
if toID, ok := s.compactionTo[fromSegmentID]; ok {
|
||||
if to, ok := s.segments[toID]; ok {
|
||||
return to, true
|
||||
if compactTos, ok := s.compactionTo[fromSegmentID]; ok {
|
||||
result := []*SegmentInfo{}
|
||||
for _, compactTo := range compactTos {
|
||||
to, ok := s.segments[compactTo]
|
||||
if !ok {
|
||||
log.Warn("compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", compactTo))
|
||||
return nil, true
|
||||
}
|
||||
log.Warn("unreachable code: compactionTo relation is broken", zap.Int64("from", fromSegmentID), zap.Int64("to", toID))
|
||||
result = append(result, to)
|
||||
}
|
||||
return result, true
|
||||
}
|
||||
return nil, true
|
||||
}
|
||||
@ -380,7 +387,7 @@ func (s *SegmentsInfo) removeSecondaryIndex(segment *SegmentInfo) {
|
||||
// addCompactTo adds the compact relation to the segment
|
||||
func (s *SegmentsInfo) addCompactTo(segment *SegmentInfo) {
|
||||
for _, from := range segment.GetCompactionFrom() {
|
||||
s.compactionTo[from] = segment.GetID()
|
||||
s.compactionTo[from] = append(s.compactionTo[from], segment.GetID())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -3,21 +3,23 @@ package datacoord
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/samber/lo"
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"github.com/milvus-io/milvus/internal/proto/datapb"
|
||||
)
|
||||
|
||||
func TestCompactionTo(t *testing.T) {
|
||||
t.Run("mix_2_to_1", func(t *testing.T) {
|
||||
segments := NewSegmentsInfo()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
})
|
||||
segments.SetSegment(segment.GetID(), segment)
|
||||
|
||||
s, ok := segments.GetCompactionTo(1)
|
||||
compactTos, ok := segments.GetCompactionTo(1)
|
||||
assert.True(t, ok)
|
||||
assert.Nil(t, s)
|
||||
assert.Nil(t, compactTos)
|
||||
|
||||
segment = NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
@ -29,72 +31,78 @@ func TestCompactionTo(t *testing.T) {
|
||||
})
|
||||
segments.SetSegment(segment.GetID(), segment)
|
||||
|
||||
s, ok = segments.GetCompactionTo(3)
|
||||
assert.Nil(t, s)
|
||||
assert.True(t, ok)
|
||||
s, ok = segments.GetCompactionTo(1)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, s)
|
||||
assert.Equal(t, int64(3), s.GetID())
|
||||
s, ok = segments.GetCompactionTo(2)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, s)
|
||||
assert.Equal(t, int64(3), s.GetID())
|
||||
getCompactToIDs := func(segments []*SegmentInfo) []int64 {
|
||||
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
|
||||
}
|
||||
|
||||
// should be overwrite.
|
||||
segment = NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 3,
|
||||
CompactionFrom: []int64{2},
|
||||
})
|
||||
segments.SetSegment(segment.GetID(), segment)
|
||||
|
||||
s, ok = segments.GetCompactionTo(3)
|
||||
compactTos, ok = segments.GetCompactionTo(3)
|
||||
assert.Nil(t, compactTos)
|
||||
assert.True(t, ok)
|
||||
assert.Nil(t, s)
|
||||
s, ok = segments.GetCompactionTo(1)
|
||||
compactTos, ok = segments.GetCompactionTo(1)
|
||||
assert.True(t, ok)
|
||||
assert.Nil(t, s)
|
||||
s, ok = segments.GetCompactionTo(2)
|
||||
assert.NotNil(t, compactTos)
|
||||
assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos))
|
||||
compactTos, ok = segments.GetCompactionTo(2)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, s)
|
||||
assert.Equal(t, int64(3), s.GetID())
|
||||
|
||||
// should be overwrite back.
|
||||
segment = NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 3,
|
||||
CompactionFrom: []int64{1, 2},
|
||||
})
|
||||
segments.SetSegment(segment.GetID(), segment)
|
||||
|
||||
s, ok = segments.GetCompactionTo(3)
|
||||
assert.Nil(t, s)
|
||||
assert.True(t, ok)
|
||||
s, ok = segments.GetCompactionTo(1)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, s)
|
||||
assert.Equal(t, int64(3), s.GetID())
|
||||
s, ok = segments.GetCompactionTo(2)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, s)
|
||||
assert.Equal(t, int64(3), s.GetID())
|
||||
assert.NotNil(t, compactTos)
|
||||
assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos))
|
||||
|
||||
// should be droped.
|
||||
segments.DropSegment(1)
|
||||
s, ok = segments.GetCompactionTo(1)
|
||||
compactTos, ok = segments.GetCompactionTo(1)
|
||||
assert.False(t, ok)
|
||||
assert.Nil(t, s)
|
||||
s, ok = segments.GetCompactionTo(2)
|
||||
assert.Nil(t, compactTos)
|
||||
compactTos, ok = segments.GetCompactionTo(2)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, s)
|
||||
assert.Equal(t, int64(3), s.GetID())
|
||||
s, ok = segments.GetCompactionTo(3)
|
||||
assert.Nil(t, s)
|
||||
assert.NotNil(t, compactTos)
|
||||
assert.ElementsMatch(t, []int64{3}, getCompactToIDs(compactTos))
|
||||
compactTos, ok = segments.GetCompactionTo(3)
|
||||
assert.Nil(t, compactTos)
|
||||
assert.True(t, ok)
|
||||
|
||||
segments.DropSegment(3)
|
||||
s, ok = segments.GetCompactionTo(2)
|
||||
compactTos, ok = segments.GetCompactionTo(2)
|
||||
assert.True(t, ok)
|
||||
assert.Nil(t, s)
|
||||
assert.Nil(t, compactTos)
|
||||
})
|
||||
|
||||
t.Run("split_1_to_2", func(t *testing.T) {
|
||||
segments := NewSegmentsInfo()
|
||||
segment := NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 1,
|
||||
})
|
||||
segments.SetSegment(segment.GetID(), segment)
|
||||
|
||||
compactTos, ok := segments.GetCompactionTo(1)
|
||||
assert.True(t, ok)
|
||||
assert.Nil(t, compactTos)
|
||||
|
||||
segment = NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 2,
|
||||
CompactionFrom: []int64{1},
|
||||
})
|
||||
segments.SetSegment(segment.GetID(), segment)
|
||||
segment = NewSegmentInfo(&datapb.SegmentInfo{
|
||||
ID: 3,
|
||||
CompactionFrom: []int64{1},
|
||||
})
|
||||
segments.SetSegment(segment.GetID(), segment)
|
||||
|
||||
getCompactToIDs := func(segments []*SegmentInfo) []int64 {
|
||||
return lo.Map(segments, func(segment *SegmentInfo, _ int) int64 { return segment.GetID() })
|
||||
}
|
||||
|
||||
compactTos, ok = segments.GetCompactionTo(2)
|
||||
assert.Nil(t, compactTos)
|
||||
assert.True(t, ok)
|
||||
compactTos, ok = segments.GetCompactionTo(3)
|
||||
assert.Nil(t, compactTos)
|
||||
assert.True(t, ok)
|
||||
compactTos, ok = segments.GetCompactionTo(1)
|
||||
assert.True(t, ok)
|
||||
assert.NotNil(t, compactTos)
|
||||
assert.ElementsMatch(t, []int64{2, 3}, getCompactToIDs(compactTos))
|
||||
})
|
||||
}
|
||||
|
||||
func TestGetSegmentSize(t *testing.T) {
|
||||
|
@ -428,7 +428,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
|
||||
info = s.meta.GetSegment(id)
|
||||
// TODO: GetCompactionTo should be removed and add into GetSegment method and protected by lock.
|
||||
// Too much modification need to be applied to SegmentInfo, a refactor is needed.
|
||||
child, ok := s.meta.GetCompactionTo(id)
|
||||
children, ok := s.meta.GetCompactionTo(id)
|
||||
|
||||
// info may be not-nil, but ok is false when the segment is being dropped concurrently.
|
||||
if info == nil || !ok {
|
||||
@ -439,7 +439,7 @@ func (s *Server) GetSegmentInfo(ctx context.Context, req *datapb.GetSegmentInfoR
|
||||
}
|
||||
|
||||
clonedInfo := info.Clone()
|
||||
if child != nil {
|
||||
for _, child := range children {
|
||||
clonedChild := child.Clone()
|
||||
// child segment should decompress binlog path
|
||||
binlog.DecompressBinLog(storage.DeleteBinlog, clonedChild.GetCollectionID(), clonedChild.GetPartitionID(), clonedChild.GetID(), clonedChild.GetDeltalogs())
|
||||
|
@ -102,7 +102,7 @@ func (s *statsTaskSuite) SetupSuite() {
|
||||
},
|
||||
},
|
||||
},
|
||||
compactionTo: map[UniqueID]UniqueID{},
|
||||
compactionTo: map[UniqueID][]UniqueID{},
|
||||
},
|
||||
|
||||
statsTaskMeta: &statsTaskMeta{
|
||||
|
Loading…
Reference in New Issue
Block a user