enhance: do check when add not empty logpath(#33640) (#34212)

pr: #33640

Signed-off-by: lixinguo <xinguo.li@zilliz.com>
Co-authored-by: lixinguo <xinguo.li@zilliz.com>
This commit is contained in:
smellthemoon 2024-07-01 18:10:08 +08:00 committed by GitHub
parent af442b936c
commit a3fdd02586
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
7 changed files with 58 additions and 40 deletions

View File

@ -690,7 +690,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
AddBinlogsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDsWithEntry(1, 10, 1)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 1)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog1", 1), LogID: 2}}}},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),
@ -831,7 +831,7 @@ func TestUpdateSegmentsInfo(t *testing.T) {
AddBinlogsOperator(1,
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{getFieldBinlogIDs(1, 2)},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: getDeltaLogPath("deltalog", 1), LogID: 2}}}},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: 1000, LogPath: "", LogID: 2}}}},
),
UpdateStartPosition([]*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &msgpb.MsgPosition{MsgID: []byte{1, 2, 3}}}}),
UpdateCheckPointOperator(1, []*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}),

View File

@ -172,7 +172,7 @@ func (s *SegmentsInfo) DropSegment(segmentID UniqueID) {
}
// SetSegment sets SegmentInfo with segmentID, perform overwrite if already exists
// set the logPath of segement in meta empty, to save space
// set the logPath of segment in meta empty, to save space
// if segment has logPath, make it empty
func (s *SegmentsInfo) SetSegment(segmentID UniqueID, segment *SegmentInfo) {
if segment, ok := s.segments[segmentID]; ok {

View File

@ -35,7 +35,6 @@ import (
"github.com/milvus-io/milvus/internal/metastore/mocks"
"github.com/milvus-io/milvus/internal/proto/datapb"
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)
@ -649,7 +648,6 @@ func TestTryToSealSegment(t *testing.T) {
{
EntriesNum: 10,
LogID: 3,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 2, 3),
},
},
},
@ -674,12 +672,10 @@ func TestTryToSealSegment(t *testing.T) {
{
EntriesNum: 10,
LogID: 1,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 3),
},
{
EntriesNum: 20,
LogID: 2,
LogPath: metautil.BuildInsertLogPath("", 1, 1, seg.ID, 1, 2),
},
},
},

View File

@ -57,7 +57,6 @@ import (
"github.com/milvus-io/milvus/pkg/util/etcd"
"github.com/milvus-io/milvus/pkg/util/funcutil"
"github.com/milvus-io/milvus/pkg/util/merr"
"github.com/milvus-io/milvus/pkg/util/metautil"
"github.com/milvus-io/milvus/pkg/util/metricsinfo"
"github.com/milvus-io/milvus/pkg/util/paramtable"
"github.com/milvus-io/milvus/pkg/util/tikv"
@ -319,17 +318,14 @@ func TestGetSegmentInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 801),
LogID: 801,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 802),
LogID: 802,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 803),
LogID: 803,
},
},
@ -343,10 +339,10 @@ func TestGetSegmentInfo(t *testing.T) {
SegmentIDs: []int64{0},
}
resp, err := svr.GetSegmentInfo(svr.ctx, req)
assert.NoError(t, err)
assert.Equal(t, 1, len(resp.GetInfos()))
// Check that # of rows is corrected from 100 to 60.
assert.EqualValues(t, 60, resp.GetInfos()[0].GetNumOfRows())
assert.NoError(t, err)
assert.EqualValues(t, commonpb.ErrorCode_Success, resp.GetStatus().GetErrorCode())
})
t.Run("with wrong segmentID", func(t *testing.T) {
@ -1823,17 +1819,14 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
LogID: 903,
},
},
@ -1846,12 +1839,10 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
LogID: 802,
},
},
@ -1925,17 +1916,14 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
LogID: 903,
},
},
@ -1948,12 +1936,10 @@ func TestGetRecoveryInfo(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
LogID: 802,
},
},

View File

@ -1138,17 +1138,14 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 0, 1, 903),
LogID: 903,
},
},
@ -1161,12 +1158,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 1, 1, 802),
LogID: 802,
},
},
@ -1243,17 +1238,14 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 901),
LogID: 901,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 902),
LogID: 902,
},
{
EntriesNum: 20,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 3, 1, 903),
LogID: 903,
},
},
@ -1266,12 +1258,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
Binlogs: []*datapb.Binlog{
{
EntriesNum: 30,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 801),
LogID: 801,
},
{
EntriesNum: 70,
LogPath: metautil.BuildInsertLogPath("a", 0, 0, 4, 1, 802),
LogID: 802,
},
},
@ -1318,12 +1308,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
LogID: 801,
LogID: 801,
},
{
LogPath: metautil.BuildInsertLogPath("a", 0, 100, 0, 1, 801),
LogID: 801,
LogID: 801,
},
},
},
@ -1333,12 +1321,10 @@ func TestGetRecoveryInfoV2(t *testing.T) {
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
LogID: 10000,
LogID: 10000,
},
{
LogPath: metautil.BuildStatsLogPath("a", 0, 100, 0, 1000, 10000),
LogID: 10000,
LogID: 10000,
},
},
},

View File

@ -313,6 +313,53 @@ func Test_AddSegments(t *testing.T) {
assert.Equal(t, 4, len(savedKvs))
verifySavedKvsForSegment(t, savedKvs)
})
t.Run("no need to store log path", func(t *testing.T) {
metakv := mocks.NewMetaKv(t)
catalog := NewCatalog(metakv, rootPath, "")
validFieldBinlog := []*datapb.FieldBinlog{{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogID: 1,
LogPath: "",
},
},
}}
invalidFieldBinlog := []*datapb.FieldBinlog{{
FieldID: 1,
Binlogs: []*datapb.Binlog{
{
LogID: 1,
LogPath: "no need to store",
},
},
}}
segment := &datapb.SegmentInfo{
ID: segmentID,
CollectionID: collectionID,
PartitionID: partitionID,
NumOfRows: 100,
State: commonpb.SegmentState_Flushed,
}
segment.Statslogs = invalidFieldBinlog
err := catalog.AddSegment(context.TODO(), segment)
assert.Error(t, err)
segment.Statslogs = validFieldBinlog
segment.Binlogs = invalidFieldBinlog
err = catalog.AddSegment(context.TODO(), segment)
assert.Error(t, err)
segment.Binlogs = validFieldBinlog
segment.Deltalogs = invalidFieldBinlog
err = catalog.AddSegment(context.TODO(), segment)
assert.Error(t, err)
})
}
func Test_AlterSegments(t *testing.T) {

View File

@ -169,6 +169,9 @@ func buildBinlogKvs(collectionID, partitionID, segmentID typeutil.UniqueID, binl
if binlog.GetLogID() == 0 {
return fmt.Errorf("invalid log id, binlog:%v", binlog)
}
if binlog.GetLogPath() != "" {
return fmt.Errorf("fieldBinlog no need to store logpath, binlog:%v", binlog)
}
}
return nil
}