fix: [cherry-pick] Only load or release Flushed segment in datanode meta (#34393)

issue: #34376, #34375, #34379

master pr: #34390

---------

Signed-off-by: Cai Zhang <cai.zhang@zilliz.com>
cai.zhang 2024-07-03 17:44:11 +08:00 committed by GitHub
parent 945f0106f6
commit 0c01ace0d2
2 changed files with 92 additions and 18 deletions
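
For orientation before the diffs: the change gates the per-segment PK-stats loading in `SyncSegments` on the segment's level and state, so only Flushed segments (not L0, not Legacy) have their primary-key stats loaded into the datanode metacache, while Growing/Flushing segments reported by the coordinator are skipped. A minimal, self-contained sketch of that gate, using simplified stand-in types rather than the real `datapb`/`commonpb` enums:

```go
// Minimal sketch (not the Milvus API) of the decision this patch adds to
// DataNode.SyncSegments: load PK stats into datanode meta only for Flushed
// segments above level L0; warn and skip everything else.
package main

import "fmt"

type SegmentLevel int
type SegmentState int

const (
	LevelLegacy SegmentLevel = iota
	LevelL0
	LevelL1
)

const (
	StateGrowing SegmentState = iota
	StateFlushing
	StateFlushed
)

// shouldLoadPKStats mirrors the switch introduced by this commit.
func shouldLoadPKStats(level SegmentLevel, state SegmentState) bool {
	switch level {
	case LevelL0, LevelLegacy:
		return false // warned and skipped in the real code
	default:
		return state == StateFlushed
	}
}

func main() {
	fmt.Println(shouldLoadPKStats(LevelL1, StateFlushed)) // true: load into meta
	fmt.Println(shouldLoadPKStats(LevelL1, StateGrowing)) // false: skipped
	fmt.Println(shouldLoadPKStats(LevelL0, StateFlushed)) // false: channel may not be watched yet
}
```

This is what the new test in the second file asserts: segments synced as Growing or Flushing never end up in the datanode cache.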


@@ -312,26 +312,35 @@ func (node *DataNode) SyncSegments(ctx context.Context, req *datapb.SyncSegments
 	futures := make([]*conc.Future[any], 0, len(missingSegments))
 	for _, segID := range missingSegments {
-		segID := segID
 		newSeg := req.GetSegmentInfos()[segID]
-		newSegments = append(newSegments, newSeg)
-		future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
-			var val *metacache.BloomFilterSet
-			var err error
-			err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
-			if err != nil {
-				log.Warn("failed to DecompressBinLog", zap.Error(err))
-				return val, err
+		switch newSeg.GetLevel() {
+		case datapb.SegmentLevel_L0:
+			log.Warn("segment level is L0, may be the channel has not been successfully watched yet", zap.Int64("segmentID", segID))
+		case datapb.SegmentLevel_Legacy:
+			log.Warn("segment level is legacy, please check", zap.Int64("segmentID", segID))
+		default:
+			if newSeg.GetState() == commonpb.SegmentState_Flushed {
+				log.Info("segment loading PKs", zap.Int64("segmentID", segID))
+				newSegments = append(newSegments, newSeg)
+				future := io.GetOrCreateStatsPool().Submit(func() (any, error) {
+					var val *metacache.BloomFilterSet
+					var err error
+					err = binlog.DecompressBinLog(storage.StatsBinlog, req.GetCollectionId(), req.GetPartitionId(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
+					if err != nil {
+						log.Warn("failed to DecompressBinLog", zap.Error(err))
+						return val, err
+					}
+					pks, err := util.LoadStats(ctx, node.chunkManager, ds.metacache.Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
+					if err != nil {
+						log.Warn("failed to load segment stats log", zap.Error(err))
+						return val, err
+					}
+					val = metacache.NewBloomFilterSet(pks...)
+					return val, nil
+				})
+				futures = append(futures, future)
 			}
-			pks, err := util.LoadStats(ctx, node.chunkManager, ds.metacache.Schema(), newSeg.GetSegmentId(), []*datapb.FieldBinlog{newSeg.GetPkStatsLog()})
-			if err != nil {
-				log.Warn("failed to load segment stats log", zap.Error(err))
-				return val, err
-			}
-			val = metacache.NewBloomFilterSet(pks...)
-			return val, nil
-		})
-		futures = append(futures, future)
+		}
 	}

 	err := conc.AwaitAll(futures...)
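
The loading itself still fans out per segment to the shared stats pool and is awaited as a group (`io.GetOrCreateStatsPool().Submit` followed by `conc.AwaitAll` above). Below is a stripped-down, standard-library sketch of that fan-out/fan-in pattern; `loadPKStats` is a hypothetical stand-in for the DecompressBinLog + LoadStats pair, and the bounded semaphore stands in for the shared stats pool rather than the internal `conc` helpers:

```go
// Sketch of the fan-out/fan-in used above: one task per missing segment,
// bounded concurrency, and all results awaited before SyncSegments proceeds.
package main

import (
	"fmt"
	"sync"
)

// loadPKStats is a hypothetical stand-in for decompressing and loading one
// segment's PK stats log into a bloom-filter set.
func loadPKStats(segID int64) (string, error) {
	return fmt.Sprintf("bloom-filter-set(segment=%d)", segID), nil
}

func main() {
	missing := []int64{100, 101, 102}

	type result struct {
		segID int64
		val   string
		err   error
	}

	results := make(chan result, len(missing)) // buffered, so workers never block
	sem := make(chan struct{}, 2)              // bound concurrency like a small stats pool
	var wg sync.WaitGroup

	for _, segID := range missing {
		wg.Add(1)
		go func(segID int64) {
			defer wg.Done()
			sem <- struct{}{}
			defer func() { <-sem }()
			val, err := loadPKStats(segID)
			results <- result{segID: segID, val: val, err: err}
		}(segID)
	}
	wg.Wait()
	close(results)

	// Await all results; fail the whole batch on the first error, which is
	// the usual behavior of awaiting a group of futures.
	for r := range results {
		if r.err != nil {
			fmt.Println("sync failed:", r.err)
			return
		}
		fmt.Println("loaded", r.val)
	}
}
```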


@@ -1022,6 +1022,71 @@ func (s *DataNodeServicesSuite) TestSyncSegments() {
 		s.True(exist)
 		s.NotNil(info)
 	})
+
+	s.Run("dc growing/flushing dn dropped", func() {
+		s.SetupTest()
+		cache := metacache.NewMetaCache(&datapb.ChannelWatchInfo{
+			Schema: &schemapb.CollectionSchema{
+				Fields: []*schemapb.FieldSchema{
+					{
+						FieldID:      100,
+						Name:         "pk",
+						IsPrimaryKey: true,
+						Description:  "",
+						DataType:     schemapb.DataType_Int64,
+					},
+				},
+			},
+			Vchan: &datapb.VchannelInfo{},
+		}, func(*datapb.SegmentInfo) *metacache.BloomFilterSet {
+			return metacache.NewBloomFilterSet()
+		})
+		mockFlowgraphManager := NewMockFlowgraphManager(s.T())
+		mockFlowgraphManager.EXPECT().GetFlowgraphService(mock.Anything).Return(&dataSyncService{
+			metacache: cache,
+		}, true)
+		s.node.flowgraphManager = mockFlowgraphManager
+
+		ctx := context.Background()
+		req := &datapb.SyncSegmentsRequest{
+			ChannelName:  "channel1",
+			PartitionId:  2,
+			CollectionId: 1,
+			SegmentInfos: map[int64]*datapb.SyncSegmentInfo{
+				100: {
+					SegmentId: 100,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Growing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+				101: {
+					SegmentId: 101,
+					PkStatsLog: &datapb.FieldBinlog{
+						FieldID: 100,
+						Binlogs: nil,
+					},
+					State:     commonpb.SegmentState_Flushing,
+					Level:     datapb.SegmentLevel_L1,
+					NumOfRows: 1024,
+				},
+			},
+		}
+
+		status, err := s.node.SyncSegments(ctx, req)
+		s.NoError(err)
+		s.True(merr.Ok(status))
+
+		info, exist := cache.GetSegmentByID(100)
+		s.False(exist)
+		s.Nil(info)
+		info, exist = cache.GetSegmentByID(101)
+		s.False(exist)
+		s.Nil(info)
+	})
 }

 func (s *DataNodeServicesSuite) TestDropCompactionPlan() {