mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-02 11:59:00 +08:00
Log segment type as possible in QueryNode (#17846)
Signed-off-by: bigsheeper <yihao.dai@zilliz.com>
This commit is contained in:
parent
6b72f9013f
commit
6b79d4f7bc
@ -603,7 +603,7 @@ func (replica *metaReplica) removeSegmentPrivate(segmentID UniqueID, segType seg
|
||||
deleteSegment(segment)
|
||||
}
|
||||
default:
|
||||
panic("unsupported segment type")
|
||||
panic(fmt.Sprintf("unsupported segment type %s", segType.String()))
|
||||
}
|
||||
|
||||
metrics.QueryNodeNumSegments.WithLabelValues(fmt.Sprint(Params.QueryNodeCfg.GetNodeID())).Dec()
|
||||
|
@ -97,7 +97,11 @@ func (p *Partition) removeSegmentID(segmentID UniqueID, segType segmentType) {
|
||||
default:
|
||||
return
|
||||
}
|
||||
log.Info("remove a segment from replica", zap.Int64("collectionID", p.collectionID), zap.Int64("partitionID", p.partitionID), zap.Int64("segmentID", segmentID))
|
||||
log.Info("remove a segment from replica",
|
||||
zap.Int64("collectionID", p.collectionID),
|
||||
zap.Int64("partitionID", p.partitionID),
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.String("segmentType", segType.String()))
|
||||
}
|
||||
|
||||
// newPartition returns a new Partition
|
||||
|
@ -84,7 +84,7 @@ type Segment struct {
|
||||
rmMutex sync.RWMutex // guards recentlyModified
|
||||
recentlyModified bool
|
||||
|
||||
typeMu sync.Mutex // guards builtIndex
|
||||
typeMu sync.RWMutex // guards segmentType
|
||||
segmentType segmentType
|
||||
|
||||
idBinlogRowSizes []int64
|
||||
@ -127,8 +127,8 @@ func (s *Segment) setType(segType segmentType) {
|
||||
}
|
||||
|
||||
func (s *Segment) getType() segmentType {
|
||||
s.typeMu.Lock()
|
||||
defer s.typeMu.Unlock()
|
||||
s.typeMu.RLock()
|
||||
defer s.typeMu.RUnlock()
|
||||
return s.segmentType
|
||||
}
|
||||
|
||||
@ -178,7 +178,7 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Int32("segment type", int32(segType)),
|
||||
zap.String("segmentType", segType.String()),
|
||||
zap.Error(err))
|
||||
return nil, err
|
||||
}
|
||||
@ -187,7 +187,7 @@ func newSegment(collection *Collection, segmentID UniqueID, partitionID UniqueID
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Int32("segmentType", int32(segType)))
|
||||
zap.String("segmentType", segType.String()))
|
||||
|
||||
var segment = &Segment{
|
||||
segmentPtr: segmentPtr,
|
||||
@ -217,7 +217,11 @@ func deleteSegment(segment *Segment) {
|
||||
C.DeleteSegment(cPtr)
|
||||
segment.segmentPtr = nil
|
||||
|
||||
log.Info("delete segment from memory", zap.Int64("collectionID", segment.collectionID), zap.Int64("partitionID", segment.partitionID), zap.Int64("segmentID", segment.ID()))
|
||||
log.Info("delete segment from memory",
|
||||
zap.Int64("collectionID", segment.collectionID),
|
||||
zap.Int64("partitionID", segment.partitionID),
|
||||
zap.Int64("segmentID", segment.ID()),
|
||||
zap.String("segmentType", segment.getType().String()))
|
||||
}
|
||||
|
||||
func (s *Segment) getRowCount() int64 {
|
||||
@ -758,7 +762,8 @@ func (s *Segment) segmentLoadDeletedRecord(primaryKeys []primaryKey, timestamps
|
||||
|
||||
log.Info("load deleted record done",
|
||||
zap.Int64("row count", rowCount),
|
||||
zap.Int64("segmentID", s.ID()))
|
||||
zap.Int64("segmentID", s.ID()),
|
||||
zap.String("segmentType", s.getType().String()))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -86,7 +86,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme
|
||||
log.Info("segmentLoader start loading...",
|
||||
zap.Any("collectionID", req.CollectionID),
|
||||
zap.Any("segmentNum", segmentNum),
|
||||
zap.Any("loadType", segmentType))
|
||||
zap.Any("segmentType", segmentType.String()))
|
||||
|
||||
// check memory limit
|
||||
concurrencyLevel := loader.cpuPool.Cap()
|
||||
@ -134,7 +134,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Int32("segment type", int32(segmentType)),
|
||||
zap.String("segmentType", segmentType.String()),
|
||||
zap.Error(err))
|
||||
segmentGC()
|
||||
return err
|
||||
@ -157,7 +157,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.Int32("segment type", int32(segmentType)),
|
||||
zap.String("segmentType", segmentType.String()),
|
||||
zap.Error(err))
|
||||
return err
|
||||
}
|
||||
@ -206,7 +206,8 @@ func (loader *segmentLoader) loadSegmentInternal(segment *Segment,
|
||||
log.Info("start loading segment data into memory",
|
||||
zap.Int64("collectionID", collectionID),
|
||||
zap.Int64("partitionID", partitionID),
|
||||
zap.Int64("segmentID", segmentID))
|
||||
zap.Int64("segmentID", segmentID),
|
||||
zap.String("segmentType", segment.getType().String()))
|
||||
|
||||
pkFieldID, err := loader.metaReplica.getPKFieldIDByCollectionID(collectionID)
|
||||
if err != nil {
|
||||
@ -310,7 +311,8 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin
|
||||
log.Info("log field binlogs done",
|
||||
zap.Int64("collection", segment.collectionID),
|
||||
zap.Int64("segment", segment.segmentID),
|
||||
zap.Any("field", fieldBinlogs))
|
||||
zap.Any("field", fieldBinlogs),
|
||||
zap.String("segmentType", segmentType.String()))
|
||||
|
||||
_, _, insertData, err := iCodec.Deserialize(blobs)
|
||||
if err != nil {
|
||||
@ -337,7 +339,7 @@ func (loader *segmentLoader) loadGrowingSegmentFields(segment *Segment, fieldBin
|
||||
return loader.loadGrowingSegments(segment, rowIDData.(*storage.Int64FieldData).Data, utss, insertData)
|
||||
|
||||
default:
|
||||
err := fmt.Errorf("illegal segmentType=%v when load segment, collectionID=%v", segmentType, segment.collectionID)
|
||||
err := fmt.Errorf("illegal segmentType=%s when load segment, collectionID=%v", segmentType.String(), segment.collectionID)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -359,7 +361,8 @@ func (loader *segmentLoader) loadSealedSegmentFields(segment *Segment, fields []
|
||||
log.Info("log field binlogs done",
|
||||
zap.Int64("collection", segment.collectionID),
|
||||
zap.Int64("segment", segment.segmentID),
|
||||
zap.Any("fields", fields))
|
||||
zap.Any("fields", fields),
|
||||
zap.String("segmentType", segment.getType().String()))
|
||||
|
||||
return nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user