From fc8061bb86de96ff93702a10434fba5fd4ba22f8 Mon Sep 17 00:00:00 2001 From: Enwei Jiao Date: Mon, 26 Sep 2022 17:36:54 +0800 Subject: [PATCH] improve logs to debug growing segment load failed (#19419) Signed-off-by: Enwei Jiao Signed-off-by: Enwei Jiao --- internal/config/file_source.go | 1 + internal/querynode/flow_graph_filter_dm_node.go | 10 ++++++---- internal/querynode/flow_graph_insert_node.go | 13 +++++++------ internal/querynode/segment_loader.go | 11 ++--------- internal/querynode/task.go | 2 ++ internal/util/paramtable/base_table.go | 2 +- 6 files changed, 19 insertions(+), 20 deletions(-) diff --git a/internal/config/file_source.go b/internal/config/file_source.go index 141afce957..ff393231e8 100644 --- a/internal/config/file_source.go +++ b/internal/config/file_source.go @@ -82,6 +82,7 @@ func (fs *FileSource) loadFromFile() error { yamlReader.SetConfigFile(configFile) if err := yamlReader.ReadInConfig(); err != nil { + log.Warn("Read config failed", zap.Error(err)) return err } diff --git a/internal/querynode/flow_graph_filter_dm_node.go b/internal/querynode/flow_graph_filter_dm_node.go index d207591170..9824d9cd70 100644 --- a/internal/querynode/flow_graph_filter_dm_node.go +++ b/internal/querynode/flow_graph_filter_dm_node.go @@ -216,15 +216,17 @@ func (fdmNode *filterDmNode) filterInvalidInsertMessage(msg *msgstream.InsertMsg if segmentInfo.DmlPosition == nil { log.Warn("filter unFlushed segment without checkPoint", zap.String("channel", fdmNode.channel), - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) + zap.Int64("collectionID", msg.CollectionID), + zap.Int64("partitionID", msg.PartitionID), + zap.Int64("segmentID", msg.SegmentID)) continue } if msg.SegmentID == segmentInfo.ID && msg.EndTs() < segmentInfo.DmlPosition.Timestamp { log.Debug("filter invalid insert message, segments are excluded segments", zap.String("channel", fdmNode.channel), - zap.Any("collectionID", msg.CollectionID), - zap.Any("partitionID", msg.PartitionID)) + zap.Int64("collectionID", msg.CollectionID), + zap.Int64("partitionID", msg.PartitionID), + zap.Int64("segmentID", msg.SegmentID)) return nil, nil } } diff --git a/internal/querynode/flow_graph_insert_node.go b/internal/querynode/flow_graph_insert_node.go index b087f11970..84679ab48e 100644 --- a/internal/querynode/flow_graph_insert_node.go +++ b/internal/querynode/flow_graph_insert_node.go @@ -137,11 +137,12 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { panic(err) } if !has { + log.Info("Add growing segment", zap.Int64("collectionID", insertMsg.CollectionID), zap.Int64("segmentID", insertMsg.SegmentID)) err = iNode.metaReplica.addSegment(insertMsg.SegmentID, insertMsg.PartitionID, insertMsg.CollectionID, insertMsg.ShardName, 0, segmentTypeGrowing) if err != nil { // error occurs when collection or partition cannot be found, collection and partition should be created before err = fmt.Errorf("insertNode addSegment failed, err = %s", err) - log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel)) + log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel)) panic(err) } } @@ -150,7 +151,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if err != nil { // occurs only when schema doesn't have dim param, this should not happen err = fmt.Errorf("failed to transfer msgStream.insertMsg to storage.InsertRecord, err = %s", err) - log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel)) + log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel)) panic(err) } @@ -165,7 +166,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if err != nil { // error occurs when cannot find collection or data is misaligned, should not happen err = fmt.Errorf("failed to get primary keys, err = %d", err) - log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel)) + log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel)) panic(err) } iData.insertPKs[insertMsg.SegmentID] = append(iData.insertPKs[insertMsg.SegmentID], pks...) @@ -177,7 +178,7 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if err != nil { // should not happen, segment should be created before err = fmt.Errorf("insertNode getSegmentByID failed, err = %s", err) - log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel)) + log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel)) panic(err) } @@ -187,11 +188,11 @@ func (iNode *insertNode) Operate(in []flowgraph.Msg) []flowgraph.Msg { if err != nil { // error occurs when cgo function `PreInsert` failed err = fmt.Errorf("segmentPreInsert failed, segmentID = %d, err = %s", segmentID, err) - log.Error(err.Error(), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel)) + log.Error(err.Error(), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel)) panic(err) } iData.insertOffset[segmentID] = offset - log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segment id", segmentID), zap.Int64("collection", iNode.collectionID), zap.String("channel", iNode.channel)) + log.Debug("insertNode operator", zap.Int("insert size", numOfRecords), zap.Int64("insert offset", offset), zap.Int64("segmentID", segmentID), zap.Int64("collectionID", iNode.collectionID), zap.String("channel", iNode.channel)) targetSegment.updateBloomFilter(iData.insertPKs[segmentID]) } } diff --git a/internal/querynode/segment_loader.go b/internal/querynode/segment_loader.go index b1ef77abd5..4c265e4981 100644 --- a/internal/querynode/segment_loader.go +++ b/internal/querynode/segment_loader.go @@ -87,6 +87,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme return fmt.Errorf("nil base message when load segment, collectionID = %d", req.CollectionID) } + log := log.With(zap.Int64("collectionID", req.CollectionID), zap.String("segmentType", segmentType.String())) // no segment needs to load, return segmentNum := len(req.Infos) @@ -95,10 +96,7 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme return nil } - log.Info("segmentLoader start loading...", - zap.Any("collectionID", req.CollectionID), - zap.Any("segmentNum", segmentNum), - zap.Any("segmentType", segmentType.String())) + log.Info("segmentLoader start loading...", zap.Any("segmentNum", segmentNum)) // check memory limit min := func(first int, values ...int) int { @@ -152,10 +150,8 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme segment, err := newSegment(collection, segmentID, partitionID, collectionID, vChannelID, segmentType, req.GetVersion(), loader.cgoPool) if err != nil { log.Error("load segment failed when create new segment", - zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), - zap.String("segmentType", segmentType.String()), zap.Error(err)) segmentGC() return err @@ -166,7 +162,6 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme loadFileFunc := func(idx int) error { loadInfo := req.Infos[idx] - collectionID := loadInfo.CollectionID partitionID := loadInfo.PartitionID segmentID := loadInfo.SegmentID segment := newSegments[segmentID] @@ -175,10 +170,8 @@ func (loader *segmentLoader) LoadSegment(req *querypb.LoadSegmentsRequest, segme err := loader.loadFiles(segment, loadInfo) if err != nil { log.Error("load segment failed when load data into memory", - zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID), - zap.String("segmentType", segmentType.String()), zap.Error(err)) return err } diff --git a/internal/querynode/task.go b/internal/querynode/task.go index f09d045470..e9838202ee 100644 --- a/internal/querynode/task.go +++ b/internal/querynode/task.go @@ -206,6 +206,8 @@ func (w *watchDmChannelsTask) Execute(ctx context.Context) (err error) { InsertChannel: ufInfo.InsertChannel, }) unFlushedSegmentIDs = append(unFlushedSegmentIDs, ufInfo.GetID()) + } else { + log.Info("skip segment which binlog is empty", zap.Int64("segmentID", ufInfo.ID)) } } } diff --git a/internal/util/paramtable/base_table.go b/internal/util/paramtable/base_table.go index 7ac9725226..e297f4b351 100644 --- a/internal/util/paramtable/base_table.go +++ b/internal/util/paramtable/base_table.go @@ -167,7 +167,7 @@ func (gp *BaseTable) initConfPath() string { if err != nil { panic(err) } - configDir = runPath + "/configs/" + configDir = runPath + "/configs" if _, err := os.Stat(configDir); err != nil { _, fpath, _, _ := runtime.Caller(0) configDir = path.Dir(fpath) + "/../../../configs"