enhance: [2.4] Exclude L0 segment from readable snapshot (#35510)

Cherry-pick from master
pr: #35507

L0 segments now do not contain insert data and may cause confusion for
query hook optimizer if counted as sealed segment number.

This PR add segment level flag in segment entry and exclude L0 segments
while get readable segment snapshot

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2024-08-16 15:26:54 +08:00 committed by GitHub
parent 555b7a6aa7
commit bd222e58eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 31 additions and 20 deletions

View File

@ -524,6 +524,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
PartitionID: info.GetPartitionID(),
NodeID: req.GetDstNodeID(),
Version: req.GetVersion(),
Level: info.GetLevel(),
}
})
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {

View File

@ -509,6 +509,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
Level: datapb.SegmentLevel_L1,
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
@ -524,6 +525,7 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L1,
},
}, sealed[0].Segments)
})
@ -599,20 +601,21 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
})
s.NoError(err)
err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
Base: commonpbutil.NewMsgBase(),
DstNodeID: 1,
CollectionID: s.collectionID,
Infos: []*querypb.SegmentLoadInfo{
{
SegmentID: 200,
PartitionID: 500,
StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
},
},
})
// err = s.delegator.LoadSegments(ctx, &querypb.LoadSegmentsRequest{
// Base: commonpbutil.NewMsgBase(),
// DstNodeID: 1,
// CollectionID: s.collectionID,
// Infos: []*querypb.SegmentLoadInfo{
// {
// SegmentID: 200,
// PartitionID: 500,
// StartPosition: &msgpb.MsgPosition{Timestamp: 20000},
// DeltaPosition: &msgpb.MsgPosition{Timestamp: 20000},
// Level: datapb.SegmentLevel_L1,
// InsertChannel: fmt.Sprintf("by-dev-rootcoord-dml_0_%dv0", s.collectionID),
// },
// },
// })
s.NoError(err)
sealed, _ := s.delegator.GetSegmentInfo(false)
@ -624,12 +627,14 @@ func (s *DelegatorDataSuite) TestLoadSegments() {
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L1,
},
{
SegmentID: 200,
NodeID: 1,
PartitionID: 500,
TargetVersion: unreadableTargetVersion,
Level: datapb.SegmentLevel_L0,
},
}, sealed[0].Segments)
})

View File

@ -23,6 +23,7 @@ import (
"go.uber.org/atomic"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/typeutil"
@ -84,6 +85,7 @@ type SegmentEntry struct {
PartitionID UniqueID
Version int64
TargetVersion int64
Level datapb.SegmentLevel
}
// NewDistribution creates a new distribution instance with all field initialized.
@ -114,9 +116,7 @@ func (d *distribution) PinReadableSegments(partitions ...int64) (sealed []Snapsh
sealed, growing = current.Get(partitions...)
version = current.version
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
filterReadable := d.readableFilter(targetVersion)
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
@ -157,9 +157,7 @@ func (d *distribution) PeekSegments(readable bool, partitions ...int64) (sealed
if readable {
targetVersion := current.GetTargetVersion()
filterReadable := func(entry SegmentEntry, _ int) bool {
return entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion
}
filterReadable := d.readableFilter(targetVersion)
sealed, growing = d.filterSegments(sealed, growing, filterReadable)
return
}
@ -382,6 +380,13 @@ func (d *distribution) genSnapshot() chan struct{} {
return last.cleared
}
func (d *distribution) readableFilter(targetVersion int64) func(entry SegmentEntry, _ int) bool {
return func(entry SegmentEntry, _ int) bool {
// segment L0 is not readable for now
return entry.Level != datapb.SegmentLevel_L0 && (entry.TargetVersion == targetVersion || entry.TargetVersion == initialTargetVersion)
}
}
// getCleanup returns cleanup snapshots function.
func (d *distribution) getCleanup(version int64) snapshotCleanup {
return func() {