Ignore log number check for compaction (#24835)

Signed-off-by: Congqi Xia <congqi.xia@zilliz.com>
This commit is contained in:
congqixia 2023-06-13 20:04:38 +08:00 committed by GitHub
parent 57fa11dfab
commit 5659a3b728
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -371,7 +371,7 @@ func (kc *Catalog) AlterSegmentsAndAddNewSegment(ctx context.Context, segments [
// convert to new format that include segment key and three binlog keys,
// or GC can not find data path on the storage.
if !hasBinlogkeys {
binlogsKvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs)
binlogsKvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs, true)
if err != nil {
return err
}
@ -837,14 +837,14 @@ func hasSepcialStatslog(logs *datapb.FieldBinlog) bool {
}
func buildBinlogKvsWithLogID(collectionID, partitionID, segmentID typeutil.UniqueID,
binlogs, deltalogs, statslogs []*datapb.FieldBinlog) (map[string]string, error) {
binlogs, deltalogs, statslogs []*datapb.FieldBinlog, ignoreNumberCheck bool) (map[string]string, error) {
checkBinlogs(storage.InsertBinlog, segmentID, binlogs)
checkBinlogs(storage.DeleteBinlog, segmentID, deltalogs)
checkBinlogs(storage.StatsBinlog, segmentID, statslogs)
// check stats log and bin log size match
// num of stats log may one more than num of binlogs if segment flushed and merged stats log
if len(binlogs) != 0 && len(statslogs) != 0 && !hasSepcialStatslog(statslogs[0]) {
if !ignoreNumberCheck && len(binlogs) != 0 && len(statslogs) != 0 && !hasSepcialStatslog(statslogs[0]) {
if len(binlogs[0].GetBinlogs()) != len(statslogs[0].GetBinlogs()) {
log.Warn("find invalid segment while bin log size didn't match stat log size",
zap.Int64("collection", collectionID),
@ -873,8 +873,10 @@ func buildSegmentAndBinlogsKvs(segment *datapb.SegmentInfo) (map[string]string,
// `segment` is not mutated above. Also, `noBinlogsSegment` is a cloned version of `segment`.
segmentutil.ReCalcRowCount(segment, noBinlogsSegment)
// compacted segment has only one statslog
ignore := (len(segment.GetCompactionFrom()) > 0)
// save binlogs separately
kvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs)
kvs, err := buildBinlogKvsWithLogID(noBinlogsSegment.CollectionID, noBinlogsSegment.PartitionID, noBinlogsSegment.ID, binlogs, deltalogs, statslogs, ignore)
if err != nil {
return nil, err
}