enhance: Remove l0 delete cache (#33537)

Cherry-pick from master
PR: #32989
Remove the L0 delete cache and build the delete PKs and timestamps on the fly.
This reduces memory usage and also improves code readability.

Signed-off-by: xiaofanluan <xiaofan.luan@zilliz.com>
Xiaofan 2024-06-06 17:13:50 +08:00 committed by GitHub
parent a425a041a8
commit d331b403c3
3 changed files with 79 additions and 111 deletions
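
For orientation before the diff: the delegator previously kept a level0Deletions map (partitionID -> DeleteData) as a cache, rebuilt by GenerateLevel0DeletionCache and read by GetLevel0Deletions. After this change, GetLevel0Deletions walks the channel's live L0 segments on every call, keeps only the deleted primary keys that the target segment's candidate (e.g. its bloom filter) may contain, and returns them ordered by timestamp, while RefreshLevel0DeletionStats only updates the size metric. The sketch below models that new lookup flow with toy types; getLevel0Deletions, l0Segment, acceptAll, and the allPartitionsID value are stand-ins for this illustration, not the actual Milvus delegator code.

package main

import (
	"fmt"
	"sort"
)

// Toy stand-ins for the Milvus types involved (storage.PrimaryKey,
// storage.Timestamp, pkoracle.Candidate, segments.L0Segment).
type primaryKey = int64
type timestamp = uint64

// allPartitionsID marks delete records that apply to every partition
// (an assumed sentinel value for this sketch).
const allPartitionsID = int64(-1)

// candidate mimics the bloom-filter style check done via MayPkExist.
type candidate interface {
	MayPkExist(pk primaryKey) bool
}

// l0Segment holds the delete records carried by one L0 segment.
type l0Segment struct {
	partitionID int64
	pks         []primaryKey
	tss         []timestamp
}

// getLevel0Deletions rebuilds the deletions for a partition on every call
// instead of reading a cached map: walk the live L0 segments, keep only the
// PKs the candidate may contain, and return them ordered by timestamp.
func getLevel0Deletions(level0Segments []*l0Segment, partitionID int64, cand candidate) ([]primaryKey, []timestamp) {
	type deletePair struct {
		pk primaryKey
		ts timestamp
	}
	pairs := make([]deletePair, 0)
	for _, seg := range level0Segments {
		if seg.partitionID != partitionID && seg.partitionID != allPartitionsID {
			continue
		}
		for i, pk := range seg.pks {
			if cand.MayPkExist(pk) {
				pairs = append(pairs, deletePair{pk: pk, ts: seg.tss[i]})
			}
		}
	}
	// Sorting the pairs keeps each pk aligned with its timestamp.
	sort.Slice(pairs, func(i, j int) bool { return pairs[i].ts < pairs[j].ts })
	pks := make([]primaryKey, len(pairs))
	tss := make([]timestamp, len(pairs))
	for i, p := range pairs {
		pks[i], tss[i] = p.pk, p.ts
	}
	return pks, tss
}

// acceptAll is a candidate with no bloom filter: every pk may exist.
type acceptAll struct{}

func (acceptAll) MayPkExist(primaryKey) bool { return true }

func main() {
	segs := []*l0Segment{
		{partitionID: 10, pks: []primaryKey{1}, tss: []timestamp{100}},
		{partitionID: allPartitionsID, pks: []primaryKey{2}, tss: []timestamp{101}},
	}
	pks, tss := getLevel0Deletions(segs, 10, acceptAll{})
	fmt.Println(pks, tss) // [1 2] [100 101]
}

The trade-off is recomputing the merge on every lookup instead of holding a second copy of all L0 delete records in delegator memory, which is what the removed cache amounted to.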

@@ -106,12 +106,11 @@ type shardDelegator struct {
lifetime lifetime.Lifetime[lifetime.State]
distribution *distribution
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
level0Mut sync.RWMutex
level0Deletions map[int64]*storage.DeleteData // partitionID -> deletions
distribution *distribution
segmentManager segments.SegmentManager
tsafeManager tsafe.Manager
pkOracle pkoracle.PkOracle
level0Mut sync.RWMutex
// stream delete buffer
deleteMut sync.RWMutex
deleteBuffer deletebuffer.DeleteBuffer[*deletebuffer.Item]
@@ -876,7 +875,6 @@ func NewShardDelegator(ctx context.Context, collectionID UniqueID, replicaID Uni
workerManager: workerManager,
lifetime: lifetime.NewLifetime(lifetime.Initializing),
distribution: NewDistribution(),
level0Deletions: make(map[int64]*storage.DeleteData),
deleteBuffer: deletebuffer.NewListDeleteBuffer[*deletebuffer.Item](startTs, sizePerBlock),
pkOracle: pkoracle.NewPkOracle(),
tsafeManager: tsafeManager,

@@ -365,7 +365,7 @@ func (sd *shardDelegator) LoadGrowing(ctx context.Context, infos []*querypb.Segm
log := log.With(
zap.Int64("segmentID", segment.ID()),
)
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition())
deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
if len(deletedPks) == 0 {
continue
}
@@ -478,7 +478,7 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
}
})
if req.GetInfos()[0].GetLevel() == datapb.SegmentLevel_L0 {
sd.GenerateLevel0DeletionCache()
sd.RefreshLevel0DeletionStats()
} else {
// load bloom filter only when candidate not exists
infos := lo.Filter(req.GetInfos(), func(info *querypb.SegmentLoadInfo, _ int) bool {
@@ -512,94 +512,51 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
return nil
}
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64) ([]storage.PrimaryKey, []storage.Timestamp) {
sd.level0Mut.RLock()
deleteData, ok1 := sd.level0Deletions[partitionID]
allPartitionsDeleteData, ok2 := sd.level0Deletions[common.AllPartitionsID]
sd.level0Mut.RUnlock()
// we may need to merge the specified partition deletions and the all partitions deletions,
// so release the mutex as early as possible.
func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) {
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()
if ok1 && ok2 {
pks := make([]storage.PrimaryKey, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
tss := make([]storage.Timestamp, 0, deleteData.RowCount+allPartitionsDeleteData.RowCount)
// TODO: this could be large; hosting all L0 deletes on the delegator might be dangerous. Consider mmapping it on the local segment and processing it as a stream.
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
pks := make([]storage.PrimaryKey, 0)
tss := make([]storage.Timestamp, 0)
i := 0
j := 0
for i < int(deleteData.RowCount) || j < int(allPartitionsDeleteData.RowCount) {
if i == int(deleteData.RowCount) {
pks = append(pks, allPartitionsDeleteData.Pks[j])
tss = append(tss, allPartitionsDeleteData.Tss[j])
j++
} else if j == int(allPartitionsDeleteData.RowCount) {
pks = append(pks, deleteData.Pks[i])
tss = append(tss, deleteData.Tss[i])
i++
} else if deleteData.Tss[i] < allPartitionsDeleteData.Tss[j] {
pks = append(pks, deleteData.Pks[i])
tss = append(tss, deleteData.Tss[i])
i++
} else {
pks = append(pks, allPartitionsDeleteData.Pks[j])
tss = append(tss, allPartitionsDeleteData.Tss[j])
j++
for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
if segment.Partition() == partitionID || segment.Partition() == common.AllPartitionsID {
segmentPks, segmentTss := segment.DeleteRecords()
for i, pk := range segmentPks {
if candidate.MayPkExist(pk) {
pks = append(pks, pk)
tss = append(tss, segmentTss[i])
}
}
}
return pks, tss
} else if ok1 {
return deleteData.Pks, deleteData.Tss
} else if ok2 {
return allPartitionsDeleteData.Pks, allPartitionsDeleteData.Tss
}
return nil, nil
sort.Slice(pks, func(i, j int) bool {
return tss[i] < tss[j]
})
return pks, tss
}
func (sd *shardDelegator) GenerateLevel0DeletionCache() {
func (sd *shardDelegator) RefreshLevel0DeletionStats() {
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()
level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
deletions := make(map[int64]*storage.DeleteData)
totalSize := int64(0)
for _, segment := range level0Segments {
segment := segment.(*segments.L0Segment)
pks, tss := segment.DeleteRecords()
deleteData, ok := deletions[segment.Partition()]
if !ok {
deleteData = storage.NewDeleteData(pks, tss)
} else {
deleteData.AppendBatch(pks, tss)
}
deletions[segment.Partition()] = deleteData
totalSize += lo.SumBy(pks, func(pk storage.PrimaryKey) int64 { return pk.Size() }) + int64(len(tss)*8)
}
type DeletePair struct {
Pk storage.PrimaryKey
Ts storage.Timestamp
}
for _, deleteData := range deletions {
pairs := make([]DeletePair, deleteData.RowCount)
for i := range deleteData.Pks {
pairs[i] = DeletePair{deleteData.Pks[i], deleteData.Tss[i]}
}
sort.Slice(pairs, func(i, j int) bool {
return pairs[i].Ts < pairs[j].Ts
})
for i := range pairs {
deleteData.Pks[i], deleteData.Tss[i] = pairs[i].Pk, pairs[i].Ts
}
}
sd.level0Mut.Lock()
defer sd.level0Mut.Unlock()
totalSize := int64(0)
for _, delete := range deletions {
totalSize += delete.Size()
}
metrics.QueryNodeLevelZeroSize.WithLabelValues(
fmt.Sprint(paramtable.GetNodeID()),
fmt.Sprint(sd.collectionID),
sd.vchannelName,
).Set(float64(totalSize))
sd.level0Deletions = deletions
}
func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
@@ -635,14 +592,9 @@ func (sd *shardDelegator) loadStreamDelete(ctx context.Context,
position = deltaPositions[0]
}
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition())
deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
deleteData := &storage.DeleteData{}
for i, pk := range deletedPks {
if candidate.MayPkExist(pk) {
deleteData.Append(pk, deletedTss[i])
}
}
deleteData.AppendBatch(deletedPks, deletedTss)
if deleteData.RowCount > 0 {
log.Info("forward L0 delete to worker...",
zap.Int64("deleteRowNum", deleteData.RowCount),
@@ -900,7 +852,7 @@ func (sd *shardDelegator) ReleaseSegments(ctx context.Context, req *querypb.Rele
}
if hasLevel0 {
sd.GenerateLevel0DeletionCache()
sd.RefreshLevel0DeletionStats()
}
partitionsToReload := make([]UniqueID, 0)
lo.ForEach(req.GetSegmentIDs(), func(segmentID int64, _ int) {

@@ -1110,44 +1110,62 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
partitionID := int64(10)
partitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(1)}, []storage.Timestamp{100})
allPartitionDeleteData := storage.NewDeleteData([]storage.PrimaryKey{storage.NewInt64PrimaryKey(2)}, []storage.Timestamp{101})
delegator.level0Deletions[partitionID] = partitionDeleteData
pks, _ := delegator.GetLevel0Deletions(partitionID)
schema := segments.GenTestCollectionSchema("test_stop", schemapb.DataType_Int64, true)
collection := segments.NewCollection(1, schema, nil, &querypb.LoadMetaInfo{
LoadType: querypb.LoadType_LoadCollection,
})
l0, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 1, &querypb.SegmentLoadInfo{
CollectionID: 1,
SegmentID: 2,
PartitionID: partitionID,
InsertChannel: delegator.vchannelName,
Level: datapb.SegmentLevel_L0,
NumOfRows: 1,
})
l0.LoadDeltaData(context.TODO(), partitionDeleteData)
delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0)
l0Global, _ := segments.NewL0Segment(collection, segments.SegmentTypeSealed, 2, &querypb.SegmentLoadInfo{
CollectionID: 1,
SegmentID: 3,
PartitionID: common.AllPartitionsID,
InsertChannel: delegator.vchannelName,
Level: datapb.SegmentLevel_L0,
NumOfRows: int64(1),
})
l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)
pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)
delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.Len(pks, 2)
delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
s.True(pks[1].EQ(allPartitionDeleteData.Pks[0]))
delete(delegator.level0Deletions, partitionID)
pks, _ = delegator.GetLevel0Deletions(partitionID)
bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
bfs.UpdateBloomFilter(allPartitionDeleteData.Pks)
pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
// bf filtered segment
s.Equal(len(pks), 1)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
// exchange the order
delegator.level0Deletions = make(map[int64]*storage.DeleteData)
partitionDeleteData, allPartitionDeleteData = allPartitionDeleteData, partitionDeleteData
delegator.level0Deletions[partitionID] = partitionDeleteData
delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(partitionDeleteData.Pks[0]))
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
pks, _ = delegator.GetLevel0Deletions(partitionID + 1)
delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
s.Empty(pks)
delegator.level0Deletions[common.AllPartitionsID] = allPartitionDeleteData
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.Len(pks, 2)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
s.True(pks[1].EQ(partitionDeleteData.Pks[0]))
delete(delegator.level0Deletions, partitionID)
pks, _ = delegator.GetLevel0Deletions(partitionID)
s.True(pks[0].EQ(allPartitionDeleteData.Pks[0]))
}
func (s *DelegatorDataSuite) TestReadDeleteFromMsgstream() {