Revert DML quota check (#19790)

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>

Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
bigsheeper 2022-10-14 17:17:23 +08:00 committed by GitHub
parent 8b8df0a5e9
commit bdaddd7b09
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 0 additions and 34 deletions

View File

@ -477,18 +477,9 @@ func (m *meta) UpdateFlushSegmentsInfo(
s.NumOfRows = cp.GetNumOfRows()
modSegments[cp.GetSegmentID()] = s
}
var totalSize int64
segments := make([]*datapb.SegmentInfo, 0, len(modSegments))
for _, seg := range modSegments {
segments = append(segments, seg.SegmentInfo)
totalSize += seg.getSegmentSize()
}
// check disk quota
for _, seg := range m.segments.GetSegments() {
totalSize += seg.getSegmentSize()
}
if float64(totalSize) >= Params.QuotaConfig.DiskQuota {
return fmt.Errorf("UpdateFlushSegmentsInfo failed: disk quota exceeds if update, segID = %d", segmentID)
}
if err := m.catalog.AlterSegments(m.ctx, segments); err != nil {
log.Error("meta update: update flush segments info - failed to store flush segment info into Etcd",

View File

@ -514,7 +514,6 @@ func TestGetUnFlushedSegments(t *testing.T) {
}
func TestUpdateFlushSegmentsInfo(t *testing.T) {
Params.Init()
t.Run("normal", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "")
assert.Nil(t, err)
@ -591,30 +590,6 @@ func TestUpdateFlushSegmentsInfo(t *testing.T) {
assert.Nil(t, segmentInfo.Binlogs)
assert.Nil(t, segmentInfo.StartPosition)
})
t.Run("test exceed disk quota", func(t *testing.T) {
meta, err := newMeta(context.TODO(), memkv.NewMemoryKV(), "")
assert.Nil(t, err)
diskQuotaBackup := Params.QuotaConfig.DiskQuota
const (
diskQuota = 5 * 1024 * 1024 * 1024
segmentSize = 3 * 1024 * 1024 * 1024
)
Params.QuotaConfig.DiskQuota = diskQuota
segment1 := &SegmentInfo{SegmentInfo: &datapb.SegmentInfo{ID: 1, State: commonpb.SegmentState_Growing,
Binlogs: []*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: segmentSize}}}},
Statslogs: []*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog0")}}}
err = meta.AddSegment(segment1)
assert.Nil(t, err)
err = meta.UpdateFlushSegmentsInfo(1, true, false, true, []*datapb.FieldBinlog{getFieldBinlogPaths(1, "binlog1")},
[]*datapb.FieldBinlog{getFieldBinlogPaths(1, "statslog1")},
[]*datapb.FieldBinlog{{Binlogs: []*datapb.Binlog{{EntriesNum: 1, TimestampFrom: 100, TimestampTo: 200, LogSize: segmentSize}}}},
[]*datapb.CheckPoint{{SegmentID: 1, NumOfRows: 10}}, []*datapb.SegmentStartPosition{{SegmentID: 1, StartPosition: &internalpb.MsgPosition{MsgID: []byte{1, 2, 3}}}})
assert.Error(t, err)
Params.QuotaConfig.DiskQuota = diskQuotaBackup
})
}
func TestSaveHandoffMeta(t *testing.T) {