enhance: Use PrimaryKeys to replace interface slice for segment delete (#37880)

Related to #35303. This change reduces temporary memory usage on the segment delete path by passing primary keys as the typed storage.PrimaryKeys container instead of a []storage.PrimaryKey interface slice.

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in: parent 923a661dfe, commit 1ed686783f
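The gist of the change: a []storage.PrimaryKey holds every key behind an interface value, so each element costs an interface header plus its own boxed allocation, whereas the typed storage.PrimaryKeys container used here can keep the raw values in one contiguous backing slice. Below is a minimal, self-contained Go sketch of that contrast; the two types are illustrative stand-ins, not the Milvus implementation.

package main

import "fmt"

// Illustrative stand-ins only; the real types live in Milvus' internal/storage package.
type PrimaryKey interface{ GetValue() any }

type Int64PrimaryKey struct{ Value int64 }

func (pk *Int64PrimaryKey) GetValue() any { return pk.Value }

// Interface-slice style: one interface value per key, each pointing at a
// separately allocated Int64PrimaryKey.
func asInterfaceSlice(values []int64) []PrimaryKey {
	pks := make([]PrimaryKey, 0, len(values))
	for _, v := range values {
		pks = append(pks, &Int64PrimaryKey{Value: v})
	}
	return pks
}

// Typed-container style: all keys live in a single contiguous []int64 and are
// only boxed on demand when a caller asks for an individual PrimaryKey.
type Int64PrimaryKeys struct{ values []int64 }

func (p *Int64PrimaryKeys) AppendRaw(vs ...int64) { p.values = append(p.values, vs...) }
func (p *Int64PrimaryKeys) Len() int              { return len(p.values) }
func (p *Int64PrimaryKeys) Get(i int) PrimaryKey  { return &Int64PrimaryKey{Value: p.values[i]} }

func main() {
	raw := []int64{10, 20, 30}

	boxed := asInterfaceSlice(raw)
	fmt.Println(len(boxed)) // 3 interface headers plus 3 heap-allocated keys

	typed := &Int64PrimaryKeys{}
	typed.AppendRaw(raw...)
	fmt.Println(typed.Len(), typed.Get(0).GetValue()) // 3 values in one backing array
}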
@@ -522,14 +522,13 @@ func (sd *shardDelegator) LoadSegments(ctx context.Context, req *querypb.LoadSeg
 	return nil
 }
 
-func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) ([]storage.PrimaryKey, []storage.Timestamp) {
+func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkoracle.Candidate) (storage.PrimaryKeys, []storage.Timestamp) {
 	sd.level0Mut.Lock()
 	defer sd.level0Mut.Unlock()
 
 	// TODO: this could be large, host all L0 delete on delegator might be a dangerous, consider mmap it on local segment and stream processing it
 	level0Segments := sd.segmentManager.GetBy(segments.WithLevel(datapb.SegmentLevel_L0), segments.WithChannel(sd.vchannelName))
-	pks := make([]storage.PrimaryKey, 0)
-	tss := make([]storage.Timestamp, 0)
+	deltaData := storage.NewDeltaData(0)
 
 	for _, segment := range level0Segments {
 		segment := segment.(*segments.L0Segment)
@@ -546,15 +545,14 @@ func (sd *shardDelegator) GetLevel0Deletions(partitionID int64, candidate pkorac
 				hits := candidate.BatchPkExist(lc)
 				for i, hit := range hits {
 					if hit {
-						pks = append(pks, segmentPks[idx+i])
-						tss = append(tss, segmentTss[idx+i])
+						deltaData.Append(segmentPks[idx+i], segmentTss[idx+i])
 					}
 				}
 			}
 		}
 	}
 
-	return pks, tss
+	return deltaData.DeletePks(), deltaData.DeleteTimestamps()
 }
 
 func (sd *shardDelegator) RefreshLevel0DeletionStats() {
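Taken together, the two hunks above replace the two parallel slices (pks, tss) with a single storage.DeltaData accumulator. Below is a hedged sketch of that accumulation pattern in isolation, using only calls visible in this diff (NewDeltaData, Append, DeletePks, DeleteTimestamps); the helper name and its compilation inside the Milvus repo are assumptions.

package delegator

import (
	"github.com/milvus-io/milvus/internal/storage"
)

// collectHits is a hypothetical helper mirroring the new accumulation style:
// matched primary keys and timestamps go into one DeltaData instead of two
// independently grown slices.
func collectHits(segmentPks []storage.PrimaryKey, segmentTss []storage.Timestamp, hits []bool) (storage.PrimaryKeys, []storage.Timestamp) {
	deltaData := storage.NewDeltaData(0)
	for i, hit := range hits {
		if hit {
			// Same call shape as in GetLevel0Deletions above.
			deltaData.Append(segmentPks[i], segmentTss[i])
		}
	}
	return deltaData.DeletePks(), deltaData.DeleteTimestamps()
}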
@@ -1546,29 +1546,33 @@ func (s *DelegatorDataSuite) TestLevel0Deletions() {
 	l0Global.LoadDeltaData(context.TODO(), allPartitionDeleteData)
 
 	pks, _ := delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.True(pks[0].EQ(partitionDeleteData.DeletePks().Get(0)))
+	s.True(pks.Get(0).EQ(partitionDeleteData.DeletePks().Get(0)))
 
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
 	s.Empty(pks)
 
 	delegator.segmentManager.Put(context.TODO(), segments.SegmentTypeSealed, l0Global)
 	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.ElementsMatch(pks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)})
+	rawPks := make([]storage.PrimaryKey, 0, pks.Len())
+	for i := 0; i < pks.Len(); i++ {
+		rawPks = append(rawPks, pks.Get(i))
+	}
+	s.ElementsMatch(rawPks, []storage.PrimaryKey{partitionDeleteData.DeletePks().Get(0), allPartitionDeleteData.DeletePks().Get(0)})
 
 	bfs := pkoracle.NewBloomFilterSet(3, l0.Partition(), commonpb.SegmentState_Sealed)
 	bfs.UpdateBloomFilter([]storage.PrimaryKey{allPartitionDeleteData.DeletePks().Get(0)})
 
 	pks, _ = delegator.GetLevel0Deletions(partitionID, bfs)
 	// bf filtered segment
-	s.Equal(len(pks), 1)
-	s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))
+	s.Equal(pks.Len(), 1)
+	s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
 	delegator.segmentManager.Remove(context.TODO(), l0.ID(), querypb.DataScope_All)
 	pks, _ = delegator.GetLevel0Deletions(partitionID, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))
+	s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
-	s.True(pks[0].EQ(allPartitionDeleteData.DeletePks().Get(0)))
+	s.True(pks.Get(0).EQ(allPartitionDeleteData.DeletePks().Get(0)))
 
 	delegator.segmentManager.Remove(context.TODO(), l0Global.ID(), querypb.DataScope_All)
 	pks, _ = delegator.GetLevel0Deletions(partitionID+1, pkoracle.NewCandidateKey(l0.ID(), l0.Partition(), segments.SegmentTypeGrowing))
@@ -100,11 +100,11 @@ func (sd *shardDelegator) addL0ForGrowing(ctx context.Context, segment segments.
 
 func (sd *shardDelegator) addL0GrowingBF(ctx context.Context, segment segments.Segment) error {
 	deletedPks, deletedTss := sd.GetLevel0Deletions(segment.Partition(), pkoracle.NewCandidateKey(segment.ID(), segment.Partition(), segments.SegmentTypeGrowing))
-	if len(deletedPks) == 0 {
+	if deletedPks == nil || deletedPks.Len() == 0 {
 		return nil
 	}
 
-	log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", len(deletedPks)))
+	log.Info("forwarding L0 delete records...", zap.Int64("segmentID", segment.ID()), zap.Int("deletionCount", deletedPks.Len()))
 	return segment.Delete(ctx, deletedPks, deletedTss)
 }
 
@@ -131,19 +131,21 @@ func (sd *shardDelegator) forwardL0ByBF(ctx context.Context,
 	}
 
 	deletedPks, deletedTss := sd.GetLevel0Deletions(candidate.Partition(), candidate)
-	deleteData := &storage.DeleteData{}
-	deleteData.AppendBatch(deletedPks, deletedTss)
-	if deleteData.RowCount > 0 {
+	if deletedPks != nil && deletedPks.Len() > 0 {
 		log.Info("forward L0 delete to worker...",
-			zap.Int64("deleteRowNum", deleteData.RowCount),
+			zap.Int("deleteRowNum", deletedPks.Len()),
 		)
-		err := worker.Delete(ctx, &querypb.DeleteRequest{
+		pks, err := storage.ParsePrimaryKeysBatch2IDs(deletedPks)
+		if err != nil {
+			return err
+		}
+		err = worker.Delete(ctx, &querypb.DeleteRequest{
 			Base:         commonpbutil.NewMsgBase(commonpbutil.WithTargetID(targetNodeID)),
 			CollectionId: info.GetCollectionID(),
 			PartitionId:  info.GetPartitionID(),
 			SegmentId:    info.GetSegmentID(),
-			PrimaryKeys:  storage.ParsePrimaryKeys2IDs(deleteData.Pks),
-			Timestamps:   deleteData.Tss,
+			PrimaryKeys:  pks,
+			Timestamps:   deletedTss,
 			Scope:        deleteScope,
 		})
 		if err != nil {
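In the forwarding path above, the intermediate storage.DeleteData staging copy is gone: the typed storage.PrimaryKeys is converted straight into the wire format with ParsePrimaryKeysBatch2IDs. Below is a hedged sketch of just that conversion step; the helper name is hypothetical, unrelated request fields are omitted, and the import paths assume the repo layout used by these packages.

package delegator

import (
	"github.com/milvus-io/milvus/internal/proto/querypb"
	"github.com/milvus-io/milvus/internal/storage"
)

// buildDeleteRequest is a hypothetical helper showing the PrimaryKeys ->
// schemapb.IDs conversion used when forwarding L0 deletes to a worker.
// Fields such as Base, CollectionId, PartitionId, SegmentId and Scope are
// left out here; the real call above fills them in.
func buildDeleteRequest(deletedPks storage.PrimaryKeys, deletedTss []storage.Timestamp) (*querypb.DeleteRequest, error) {
	ids, err := storage.ParsePrimaryKeysBatch2IDs(deletedPks)
	if err != nil {
		return nil, err
	}
	return &querypb.DeleteRequest{
		PrimaryKeys: ids,
		Timestamps:  deletedTss,
	}, nil
}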
@@ -221,7 +221,7 @@ func (_c *MockSegment_DatabaseName_Call) RunAndReturn(run func() string) *MockSe
 }
 
 // Delete provides a mock function with given fields: ctx, primaryKeys, timestamps
-func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64) error {
+func (_m *MockSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []uint64) error {
 	ret := _m.Called(ctx, primaryKeys, timestamps)
 
 	if len(ret) == 0 {
@@ -229,7 +229,7 @@ func (_m *MockSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
 	}
 
 	var r0 error
-	if rf, ok := ret.Get(0).(func(context.Context, []storage.PrimaryKey, []uint64) error); ok {
+	if rf, ok := ret.Get(0).(func(context.Context, storage.PrimaryKeys, []uint64) error); ok {
 		r0 = rf(ctx, primaryKeys, timestamps)
 	} else {
 		r0 = ret.Error(0)
@@ -245,15 +245,15 @@ type MockSegment_Delete_Call struct {
 
 // Delete is a helper method to define mock.On call
 //   - ctx context.Context
-//   - primaryKeys []storage.PrimaryKey
+//   - primaryKeys storage.PrimaryKeys
 //   - timestamps []uint64
 func (_e *MockSegment_Expecter) Delete(ctx interface{}, primaryKeys interface{}, timestamps interface{}) *MockSegment_Delete_Call {
 	return &MockSegment_Delete_Call{Call: _e.mock.On("Delete", ctx, primaryKeys, timestamps)}
 }
 
-func (_c *MockSegment_Delete_Call) Run(run func(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []uint64)) *MockSegment_Delete_Call {
+func (_c *MockSegment_Delete_Call) Run(run func(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []uint64)) *MockSegment_Delete_Call {
 	_c.Call.Run(func(args mock.Arguments) {
-		run(args[0].(context.Context), args[1].([]storage.PrimaryKey), args[2].([]uint64))
+		run(args[0].(context.Context), args[1].(storage.PrimaryKeys), args[2].([]uint64))
 	})
 	return _c
 }
@@ -263,7 +263,7 @@ func (_c *MockSegment_Delete_Call) Return(_a0 error) *MockSegment_Delete_Call {
 	return _c
 }
 
-func (_c *MockSegment_Delete_Call) RunAndReturn(run func(context.Context, []storage.PrimaryKey, []uint64) error) *MockSegment_Delete_Call {
+func (_c *MockSegment_Delete_Call) RunAndReturn(run func(context.Context, storage.PrimaryKeys, []uint64) error) *MockSegment_Delete_Call {
 	_c.Call.Return(run)
 	return _c
 }
@@ -776,7 +776,7 @@ func (s *LocalSegment) Insert(ctx context.Context, rowIDs []int64, timestamps []
 	return nil
 }
 
-func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
+func (s *LocalSegment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error {
 	/*
 		CStatus
 		Delete(CSegmentInterface c_segment,
@@ -786,7 +786,7 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
 		       const unsigned long* timestamps);
 	*/
 
-	if len(primaryKeys) == 0 {
+	if primaryKeys.Len() == 0 {
 		return nil
 	}
 	if !s.ptrLock.RLockIf(state.IsNotReleased) {
@@ -795,34 +795,12 @@ func (s *LocalSegment) Delete(ctx context.Context, primaryKeys []storage.Primary
 	defer s.ptrLock.RUnlock()
 
 	cOffset := C.int64_t(0) // depre
-	cSize := C.int64_t(len(primaryKeys))
+	cSize := C.int64_t(primaryKeys.Len())
 	cTimestampsPtr := (*C.uint64_t)(&(timestamps)[0])
 
-	ids := &schemapb.IDs{}
-	pkType := primaryKeys[0].Type()
-	switch pkType {
-	case schemapb.DataType_Int64:
-		int64Pks := make([]int64, len(primaryKeys))
-		for index, pk := range primaryKeys {
-			int64Pks[index] = pk.(*storage.Int64PrimaryKey).Value
-		}
-		ids.IdField = &schemapb.IDs_IntId{
-			IntId: &schemapb.LongArray{
-				Data: int64Pks,
-			},
-		}
-	case schemapb.DataType_VarChar:
-		varCharPks := make([]string, len(primaryKeys))
-		for index, entity := range primaryKeys {
-			varCharPks[index] = entity.(*storage.VarCharPrimaryKey).Value
-		}
-		ids.IdField = &schemapb.IDs_StrId{
-			StrId: &schemapb.StringArray{
-				Data: varCharPks,
-			},
-		}
-	default:
-		return fmt.Errorf("invalid data type of primary keys")
+	ids, err := storage.ParsePrimaryKeysBatch2IDs(primaryKeys)
+	if err != nil {
+		return err
 	}
 
 	dataBlob, err := proto.Marshal(ids)
@@ -77,7 +77,7 @@ type Segment interface {
 
 	// Modification related
 	Insert(ctx context.Context, rowIDs []int64, timestamps []typeutil.Timestamp, record *segcorepb.InsertRecord) error
-	Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error
+	Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error
 	LoadDeltaData(ctx context.Context, deltaData *storage.DeltaData) error
 	LastDeltaTimestamp() uint64
 	Release(ctx context.Context, opts ...releaseOption)
@@ -147,7 +147,7 @@ func (s *L0Segment) Insert(ctx context.Context, rowIDs []int64, timestamps []typ
 	return merr.WrapErrIoFailedReason("insert not supported for L0 segment")
 }
 
-func (s *L0Segment) Delete(ctx context.Context, primaryKeys []storage.PrimaryKey, timestamps []typeutil.Timestamp) error {
+func (s *L0Segment) Delete(ctx context.Context, primaryKeys storage.PrimaryKeys, timestamps []typeutil.Timestamp) error {
 	return merr.WrapErrIoFailedReason("delete not supported for L0 segment")
 }
 
@@ -166,15 +166,15 @@ func (suite *SegmentSuite) TestResourceUsageEstimate() {
 func (suite *SegmentSuite) TestDelete() {
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-	pks, err := storage.GenInt64PrimaryKeys(0, 1)
-	suite.NoError(err)
+	pks := storage.NewInt64PrimaryKeys(2)
+	pks.AppendRaw(0, 1)
 
 	// Test for sealed
 	rowNum := suite.sealed.RowNum()
-	err = suite.sealed.Delete(ctx, pks, []uint64{1000, 1000})
+	err := suite.sealed.Delete(ctx, pks, []uint64{1000, 1000})
 	suite.NoError(err)
 
-	suite.Equal(rowNum-int64(len(pks)), suite.sealed.RowNum())
+	suite.Equal(rowNum-int64(pks.Len()), suite.sealed.RowNum())
 	suite.Equal(rowNum, suite.sealed.InsertCount())
 
 	// Test for growing
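The updated test constructs keys with the typed constructor instead of a helper that returned an interface slice. Below is a small hedged sketch of constructing and reading both concrete containers, using only names that appear in this diff (NewInt64PrimaryKeys, NewVarcharPrimaryKeys, AppendRaw, Len, Get); running it inside the Milvus repo is assumed.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus/internal/storage"
)

func main() {
	// Int64 keys: the capacity argument mirrors its use in the test above.
	intPks := storage.NewInt64PrimaryKeys(2)
	intPks.AppendRaw(0, 1)

	// Varchar keys: same constructor pattern as ParseIDs2PrimaryKeysBatch below.
	strPks := storage.NewVarcharPrimaryKeys(2)
	strPks.AppendRaw("a", "b")

	// Callers read through Len/Get instead of ranging over a slice.
	for i := 0; i < intPks.Len(); i++ {
		fmt.Println(intPks.Get(i)) // each Get returns a storage.PrimaryKey
	}
	fmt.Println(strPks.Len())
}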
@@ -182,7 +182,7 @@ func (suite *SegmentSuite) TestDelete() {
 	err = suite.growing.Delete(ctx, pks, []uint64{1000, 1000})
 	suite.NoError(err)
 
-	suite.Equal(rowNum-int64(len(pks)), suite.growing.RowNum())
+	suite.Equal(rowNum-int64(pks.Len()), suite.growing.RowNum())
 	suite.Equal(rowNum, suite.growing.InsertCount())
 }
 
@@ -1372,7 +1372,7 @@ func (node *QueryNode) Delete(ctx context.Context, req *querypb.DeleteRequest) (
 		return merr.Status(err), nil
 	}
 
-	pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
+	pks := storage.ParseIDs2PrimaryKeysBatch(req.GetPrimaryKeys())
 	for _, segment := range segments {
 		err := segment.Delete(ctx, pks, req.GetTimestamps())
 		if err != nil {
@@ -1427,7 +1427,7 @@ func (node *QueryNode) DeleteBatch(ctx context.Context, req *querypb.DeleteBatch
 		log.Warn("Delete batch find missing ids", zap.Int64s("missing_ids", missingIDs.Collect()))
 	}
 
-	pks := storage.ParseIDs2PrimaryKeys(req.GetPrimaryKeys())
+	pks := storage.ParseIDs2PrimaryKeysBatch(req.GetPrimaryKeys())
 
 	// control the execution batch parallel with P number
 	// maybe it shall be lower in case of heavy CPU usage may impacting search/query
@@ -350,6 +350,25 @@ func ParseIDs2PrimaryKeys(ids *schemapb.IDs) []PrimaryKey {
 	return ret
 }
 
+func ParseIDs2PrimaryKeysBatch(ids *schemapb.IDs) PrimaryKeys {
+	var result PrimaryKeys
+	switch ids.IdField.(type) {
+	case *schemapb.IDs_IntId:
+		int64Pks := ids.GetIntId().GetData()
+		pks := NewInt64PrimaryKeys(int64(len(int64Pks)))
+		pks.AppendRaw(int64Pks...)
+		result = pks
+	case *schemapb.IDs_StrId:
+		stringPks := ids.GetStrId().GetData()
+		pks := NewVarcharPrimaryKeys(int64(len(stringPks)))
+		pks.AppendRaw(stringPks...)
+		result = pks
+	default:
+		panic(fmt.Sprintf("unexpected schema id field type %T", ids.IdField))
+	}
+	return result
+}
+
 func ParsePrimaryKeysBatch2IDs(pks PrimaryKeys) (*schemapb.IDs, error) {
 	ret := &schemapb.IDs{}
 	if pks.Len() == 0 {
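ParseIDs2PrimaryKeysBatch and its counterpart ParsePrimaryKeysBatch2IDs give the wire format (schemapb.IDs) a direct mapping to and from the typed container. Below is a hedged round-trip sketch using only the two helpers and accessors shown in this diff; compilation inside the Milvus repo and the go-api schemapb import path are assumptions.

package main

import (
	"fmt"

	"github.com/milvus-io/milvus-proto/go-api/v2/schemapb"
	"github.com/milvus-io/milvus/internal/storage"
)

func main() {
	ids := &schemapb.IDs{
		IdField: &schemapb.IDs_IntId{
			IntId: &schemapb.LongArray{Data: []int64{1, 2, 3}},
		},
	}

	// Wire format -> typed container; the switch shown above panics on an
	// unknown IdField type.
	pks := storage.ParseIDs2PrimaryKeysBatch(ids)
	fmt.Println(pks.Len()) // 3

	// Typed container -> wire format, with the error return visible in the
	// (*schemapb.IDs, error) signature above.
	back, err := storage.ParsePrimaryKeysBatch2IDs(pks)
	if err != nil {
		panic(err)
	}
	fmt.Println(back.GetIntId().GetData()) // [1 2 3]
}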