diff --git a/internal/indexcoord/index_coord.go b/internal/indexcoord/index_coord.go
index d5fecb7bbe..af9d87e866 100644
--- a/internal/indexcoord/index_coord.go
+++ b/internal/indexcoord/index_coord.go
@@ -174,7 +174,7 @@ func (i *IndexCoord) Init() error {
 	var initErr error
 	i.initOnce.Do(func() {
 		i.UpdateStateCode(commonpb.StateCode_Initializing)
-		log.Debug("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
+		log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
 
 		i.factory.Init(Params)
 
@@ -190,7 +190,7 @@ func (i *IndexCoord) Init() error {
 			i.metaTable, err = NewMetaTable(i.etcdKV)
 			return err
 		}
-		log.Debug("IndexCoord try to connect etcd")
+		log.Info("IndexCoord try to connect etcd")
 		err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100))
 		if err != nil {
 			log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
@@ -198,11 +198,11 @@ func (i *IndexCoord) Init() error {
 			return
 		}
-		log.Debug("IndexCoord try to connect etcd success")
+		log.Info("IndexCoord try to connect etcd success")
 		i.nodeManager = NewNodeManager(i.loopCtx)
 
 		sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
-		log.Debug("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
+		log.Info("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
 		if err != nil {
 			log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err))
 			initErr = err
@@ -216,7 +216,7 @@ func (i *IndexCoord) Init() error {
 				initErr = err
 				return
 			}
-			log.Debug("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
+			log.Info("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
 				zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID))
 			aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID)
 			metrics.IndexCoordIndexNodeNum.WithLabelValues().Inc()
@@ -231,7 +231,7 @@ func (i *IndexCoord) Init() error {
 				aliveNodeID = append(aliveNodeID, session.ServerID)
 			}
 		}
-		log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
+		log.Info("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
 		i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID)
 
 		// TODO silverxia add Rewatch logic
@@ -243,7 +243,7 @@ func (i *IndexCoord) Init() error {
 			initErr = err
 			return
 		}
-		log.Debug("IndexCoord new minio chunkManager success")
+		log.Info("IndexCoord new minio chunkManager success")
 		i.chunkManager = chunkManager
 
 		i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
@@ -260,12 +260,12 @@ func (i *IndexCoord) Init() error {
 			initErr = err
 			return
 		}
-		log.Debug("IndexCoord new task scheduler success")
+		log.Info("IndexCoord new task scheduler success")
 
 		i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
 	})
 
-	log.Debug("IndexCoord init finished", zap.Error(initErr))
+	log.Info("IndexCoord init finished", zap.Error(initErr))
 	return initErr
 }
 
@@ -390,7 +390,7 @@ func (i *IndexCoord) isHealthy() bool {
 
 // GetComponentStates gets the component states of IndexCoord.
 func (i *IndexCoord) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
-	log.Debug("get IndexCoord component states ...")
+	log.RatedInfo(10, "get IndexCoord component states ...")
 	nodeID := common.NotRegisteredID
 	if i.session != nil && i.session.Registered() {
@@ -410,7 +410,7 @@ func (i *IndexCoord) GetComponentStates(ctx context.Context) (*milvuspb.Componen
 			ErrorCode: commonpb.ErrorCode_Success,
 		},
 	}
-	log.Debug("IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo))
+	log.RatedInfo(10, "IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo))
 	return ret, nil
 }
 
@@ -438,7 +438,7 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
 			Reason:    msgIndexCoordIsUnhealthy(i.serverID),
 		}, nil
 	}
-	log.Debug("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID),
+	log.Info("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID),
 		zap.String("IndexName", req.IndexName), zap.Int64("fieldID", req.FieldID),
 		zap.Any("TypeParams", req.TypeParams),
 		zap.Any("IndexParams", req.IndexParams))
@@ -472,7 +472,6 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
 		ret.Reason = err.Error()
 		return ret, nil
 	}
-	log.Debug("IndexCoord create index enqueue successfully", zap.Int64("IndexID", t.indexID))
 
 	err = t.WaitToFinish()
 	if err != nil {
@@ -483,13 +482,15 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
 		return ret, nil
 	}
 
+	log.Info("IndexCoord CreateIndex successfully", zap.Int64("IndexID", t.indexID))
+
 	ret.ErrorCode = commonpb.ErrorCode_Success
 	return ret, nil
 }
 
 // GetIndexState gets the index state of the index name in the request from Proxy.
 func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
-	log.Info("IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
+	log.RatedInfo(10, "IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
 		zap.String("indexName", req.IndexName))
 
 	if !i.isHealthy() {
@@ -527,21 +528,21 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta
 			if state.state != commonpb.IndexState_Finished {
 				ret.State = state.state
 				ret.FailReason = state.failReason
-				log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
+				log.RatedInfo(10, "IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
 					zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
 				return ret, nil
 			}
 		}
 	}
 
-	log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
+	log.RatedInfo(10, "IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
 		zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
 	return ret, nil
 }
 
 func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
-	log.Info("IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
-		zap.String("indexName", req.IndexName))
+	log.RatedInfo(5, "IndexCoord GetSegmentIndexState", zap.Int64("collectionID", req.CollectionID),
+		zap.String("indexName", req.IndexName), zap.Int64s("segIDs", req.GetSegmentIDs()))
 
 	if !i.isHealthy() {
 		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@@ -579,6 +580,8 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS
 			FailReason: state.failReason,
 		})
 	}
+	log.RatedInfo(5, "IndexCoord GetSegmentIndexState successfully", zap.Int64("collectionID", req.CollectionID),
+		zap.String("indexName", req.IndexName))
 	return ret, nil
 }
 
@@ -586,7 +589,7 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS
 func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.IndexInfo) error {
 	collectionID := indexInfo.CollectionID
 	indexName := indexInfo.IndexName
-	log.Info("IndexCoord completeIndexInfo", zap.Int64("collID", collectionID),
+	log.RatedDebug(5, "IndexCoord completeIndexInfo", zap.Int64("collID", collectionID),
 		zap.String("indexName", indexName))
 
 	calculateTotalRow := func() (int64, error) {
@@ -651,15 +654,15 @@ func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.I
 	}
 
-	log.Debug("IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID),
+	log.RatedDebug(5, "IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID),
 		zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows),
-		zap.Any("state", indexInfo.State), zap.String("failReason", indexInfo.IndexStateFailReason))
+		zap.String("state", indexInfo.State.String()), zap.String("failReason", indexInfo.IndexStateFailReason))
 	return nil
 }
 
 // GetIndexBuildProgress get the index building progress by num rows.
 func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
-	log.Info("IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID),
+	log.RatedInfo(5, "IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID),
 		zap.String("indexName", req.IndexName))
 	if !i.isHealthy() {
 		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@@ -717,7 +720,7 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get
 		break
 	}
 
-	log.Debug("IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
+	log.RatedInfo(5, "IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
 		zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows), zap.Int("seg num", len(resp.Infos)))
 	return &indexpb.GetIndexBuildProgressResponse{
 		Status: &commonpb.Status{
@@ -799,7 +802,7 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques
 
 // GetIndexInfos gets the index file paths from IndexCoord.
 func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
-	log.Debug("IndexCoord GetIndexInfos", zap.Int64("collectionID", req.CollectionID),
+	log.RatedInfo(5, "IndexCoord GetIndexInfos", zap.Int64("collectionID", req.CollectionID),
 		zap.String("indexName", req.GetIndexName()), zap.Int64s("segIDs", req.GetSegmentIDs()))
 	if !i.isHealthy() {
 		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@@ -851,12 +854,15 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf
 		}
 	}
 
+	log.RatedInfo(5, "IndexCoord GetIndexInfos successfully", zap.Int64("collectionID", req.CollectionID),
+		zap.String("indexName", req.GetIndexName()))
+
 	return ret, nil
 }
 
 // DescribeIndex describe the index info of the collection.
 func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
-	log.Debug("IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName()))
+	log.RatedInfo(5, "IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName()))
 	if !i.isHealthy() {
 		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
 		return &indexpb.DescribeIndexResponse{
@@ -902,6 +908,8 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
 		indexInfos = append(indexInfos, indexInfo)
 	}
 
+	log.RatedInfo(5, "IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID),
+		zap.Any("indexInfos", indexInfos))
 	return &indexpb.DescribeIndexResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_Success,
@@ -912,7 +920,7 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
 
 // ShowConfigurations returns the configurations of indexCoord matching req.Pattern
 func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
-	log.Debug("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
+	log.Info("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
 	if !i.isHealthy() {
 		log.Warn("IndexCoord.ShowConfigurations failed",
 			zap.Int64("nodeId", i.serverID),
@@ -933,7 +941,7 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho
 
 // GetMetrics gets the metrics info of IndexCoord.
 func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
-	log.Debug("IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request))
+	log.RatedInfo(5, "IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request))
 
 	if !i.isHealthy() {
 		log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@@ -988,7 +996,7 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq
 		return metrics, nil
 	}
 
-	log.Debug("IndexCoord.GetMetrics failed, request metric type is not implemented yet",
+	log.Warn("IndexCoord.GetMetrics failed, request metric type is not implemented yet",
 		zap.Int64("node id", i.session.ServerID),
 		zap.String("req", req.Request),
 		zap.String("metric type", metricType))
@@ -1029,6 +1037,7 @@ func (i *IndexCoord) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthR
 	err := group.Wait()
 	if err != nil || len(errReasons) != 0 {
+		log.RatedInfo(5, "IndexCoord CheckHealth successfully", zap.Bool("isHealthy", false))
 		return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: errReasons}, nil
 	}
 
@@ -1046,7 +1055,7 @@ func (i *IndexCoord) watchNodeLoop() {
 	defer cancel()
 	defer i.loopWg.Done()
-	log.Debug("IndexCoord watchNodeLoop start")
+	log.Info("IndexCoord watchNodeLoop start")
 
 	for {
 		select {
@@ -1067,11 +1076,10 @@ func (i *IndexCoord) watchNodeLoop() {
 			if Params.IndexCoordCfg.BindIndexNodeMode {
 				continue
 			}
-			log.Debug("IndexCoord watchNodeLoop event updated")
 			switch event.EventType {
 			case sessionutil.SessionAddEvent:
 				serverID := event.Session.ServerID
-				log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Int64("serverID", serverID),
+				log.Info("IndexCoord watchNodeLoop SessionAddEvent", zap.Int64("serverID", serverID),
 					zap.String("address", event.Session.Address))
 				go func() {
 					err := i.nodeManager.AddNode(serverID, event.Session.Address)
@@ -1082,7 +1090,7 @@ func (i *IndexCoord) watchNodeLoop() {
 				i.metricsCacheManager.InvalidateSystemInfoMetrics()
 			case sessionutil.SessionDelEvent:
 				serverID := event.Session.ServerID
-				log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Int64("serverID", serverID))
+				log.Info("IndexCoord watchNodeLoop SessionDelEvent", zap.Int64("serverID", serverID))
 				i.nodeManager.RemoveNode(serverID)
 				// remove tasks on nodeID
 				i.indexBuilder.nodeDown(serverID)
@@ -1095,7 +1103,7 @@ func (i *IndexCoord) watchNodeLoop() {
 func (i *IndexCoord) tryAcquireSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID, segIDs []UniqueID) error {
 	// IndexCoord use buildID instead of taskID.
 	log.Info("try to acquire segment reference lock", zap.Int64("buildID", buildID),
-		zap.Int64("ndoeID", nodeID), zap.Int64s("segIDs", segIDs))
+		zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs))
 	ctx1, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
 	status, err := i.dataCoordClient.AcquireSegmentLock(ctx1, &datapb.AcquireSegmentLockRequest{
@@ -1119,6 +1127,8 @@ func (i *IndexCoord) tryAcquireSegmentReferLock(ctx context.Context, buildID Uni
 }
 
 func (i *IndexCoord) tryReleaseSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID) error {
+	log.Info("IndexCoord tryReleaseSegmentReferLock", zap.Int64("buildID", buildID),
+		zap.Int64("nodeID", nodeID))
 	releaseLock := func() error {
 		ctx1, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 		defer cancel()
@@ -1140,12 +1150,15 @@ func (i *IndexCoord) tryReleaseSegmentReferLock(ctx context.Context, buildID Uni
 			zap.Int64("nodeID", nodeID), zap.Error(err))
 		return err
 	}
+	log.Info("IndexCoord tryReleaseSegmentReferLock successfully", zap.Int64("buildID", buildID),
+		zap.Int64("nodeID", nodeID))
 	return nil
 }
 
 // assignTask sends the index task to the IndexNode, it has a timeout interval, if the IndexNode doesn't respond within
 // the interval, it is considered that the task sending failed.
 func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.CreateJobRequest) error {
+	log.Info("IndexCoord assignTask", zap.Int64("buildID", req.GetBuildID()))
 	ctx, cancel := context.WithTimeout(i.loopCtx, i.reqTimeoutInterval)
 	defer cancel()
 	resp, err := builderClient.CreateJob(ctx, req)
@@ -1158,6 +1171,7 @@ func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.Crea
 		log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
 		return errors.New(resp.Reason)
 	}
+	log.Info("IndexCoord assignTask successfully", zap.Int64("buildID", req.GetBuildID()))
 	return nil
 }
 
@@ -1195,8 +1209,6 @@ func (i *IndexCoord) createIndexForSegment(segIdx *model.SegmentIndex) (bool, Un
 			zap.Int64("segID", segIdx.SegmentID), zap.Error(err))
 		return false, 0, err
 	}
-	log.Debug("IndexCoord createIndex Enqueue successfully", zap.Int64("collID", segIdx.CollectionID),
-		zap.Int64("segID", segIdx.SegmentID), zap.Int64("IndexBuildID", t.segmentIndex.BuildID))
 
 	err = t.WaitToFinish()
 	if err != nil {
@@ -1205,12 +1217,13 @@ func (i *IndexCoord) createIndexForSegment(segIdx *model.SegmentIndex) (bool, Un
 		return false, 0, err
 	}
 	metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc()
-
+	log.Debug("IndexCoord create index for segment successfully", zap.Int64("collID", segIdx.CollectionID),
+		zap.Int64("segID", segIdx.SegmentID), zap.Int64("IndexBuildID", t.segmentIndex.BuildID))
 	return false, t.segmentIndex.BuildID, nil
 }
 
 func (i *IndexCoord) watchFlushedSegmentLoop() {
-	log.Info("IndexCoord start watching flushed segments...")
+	log.Info("IndexCoord watchFlushedSegmentLoop start...")
 	defer i.loopWg.Done()
 
 	watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision+1)
@@ -1256,12 +1269,12 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
 					segmentInfo.ID = segID
 				}
-				log.Debug("watchFlushedSegmentLoop watch event",
+				log.Info("watchFlushedSegmentLoop watch event",
 					zap.Int64("segID", segmentInfo.GetID()), zap.Any("isFake", segmentInfo.GetIsFake()))
 				i.flushedSegmentWatcher.enqueueInternalTask(segmentInfo)
 			case mvccpb.DELETE:
-				log.Debug("the segment info has been deleted", zap.String("key", string(event.Kv.Key)))
+				log.Info("the segment info has been deleted", zap.String("key", string(event.Kv.Key)))
 			}
 		}
 	}
@@ -1269,6 +1282,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
 }
 
 func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*datapb.SegmentInfo, error) {
+	log.Debug("pullSegmentInfo", zap.Int64("segID", segmentID))
 	ctx1, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
 	defer cancel()
 	resp, err := i.dataCoordClient.GetSegmentInfo(ctx1, &datapb.GetSegmentInfoRequest{
@@ -1289,6 +1303,7 @@ func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*
 	}
 	for _, info := range resp.Infos {
 		if info.ID == segmentID {
+			log.Debug("pullSegmentInfo successfully", zap.Int64("segID", segmentID))
 			return info, nil
 		}
 	}
diff --git a/internal/indexcoord/util_test.go b/internal/indexcoord/util_test.go
index b0ad70d659..bb23ae5bd5 100644
--- a/internal/indexcoord/util_test.go
+++ b/internal/indexcoord/util_test.go
@@ -19,10 +19,11 @@ package indexcoord
 import (
 	"testing"
 
+	"github.com/stretchr/testify/assert"
+
 	"github.com/milvus-io/milvus-proto/go-api/commonpb"
 	"github.com/milvus-io/milvus-proto/go-api/schemapb"
 	"github.com/milvus-io/milvus/internal/proto/indexpb"
-	"github.com/stretchr/testify/assert"
 )
 
 func Test_getDimension(t *testing.T) {
diff --git a/internal/indexnode/indexnode.go b/internal/indexnode/indexnode.go
index 44aba3c42e..ce0397d756 100644
--- a/internal/indexnode/indexnode.go
+++ b/internal/indexnode/indexnode.go
@@ -180,14 +180,14 @@ func (i *IndexNode) Init() error {
 	var initErr error
 	i.initOnce.Do(func() {
 		i.UpdateStateCode(commonpb.StateCode_Initializing)
-		log.Debug("IndexNode init", zap.Any("State", i.stateCode.Load().(commonpb.StateCode)))
+		log.Info("IndexNode init", zap.Any("State", i.stateCode.Load().(commonpb.StateCode)))
 		err := i.initSession()
 		if err != nil {
 			log.Error(err.Error())
 			initErr = err
 			return
 		}
-		log.Debug("IndexNode init session successful", zap.Int64("serverID", i.session.ServerID))
+		log.Info("IndexNode init session successful", zap.Int64("serverID", i.session.ServerID))
 
 		if err != nil {
 			log.Error("IndexNode NewMinIOKV failed", zap.Error(err))
@@ -195,13 +195,13 @@ func (i *IndexNode) Init() error {
 			return
 		}
-		log.Debug("IndexNode NewMinIOKV succeeded")
+		log.Info("IndexNode NewMinIOKV succeeded")
 		i.closer = trace.InitTracing("index_node")
 
 		i.initKnowhere()
 	})
 
-	log.Debug("Init IndexNode finished", zap.Error(initErr))
+	log.Info("Init IndexNode finished", zap.Error(initErr))
 	return initErr
 }
 
@@ -216,10 +216,10 @@ func (i *IndexNode) Start() error {
 		Params.IndexNodeCfg.UpdatedTime = time.Now()
 		i.UpdateStateCode(commonpb.StateCode_Healthy)
-		log.Debug("IndexNode", zap.Any("State", i.stateCode.Load()))
+		log.Info("IndexNode", zap.Any("State", i.stateCode.Load()))
 	})
 
-	log.Debug("IndexNode start finished", zap.Error(startErr))
+	log.Info("IndexNode start finished", zap.Error(startErr))
 	return startErr
 }
 
@@ -240,7 +240,7 @@ func (i *IndexNode) Stop() error {
 	}
 	i.session.Revoke(time.Second)
 
-	log.Debug("Index node stopped.")
+	log.Info("Index node stopped.")
 	return nil
 }
 
@@ -259,84 +259,9 @@ func (i *IndexNode) isHealthy() bool {
 	return code == commonpb.StateCode_Healthy
 }
 
-//// BuildIndex receives request from IndexCoordinator to build an index.
-//// Index building is asynchronous, so when an index building request comes, IndexNode records the task and returns.
-//func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
-//	if i.stateCode.Load().(commonpb.StateCode) != commonpb.StateCode_Healthy {
-//		return &commonpb.Status{
-//			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-//			Reason:    "state code is not healthy",
-//		}, nil
-//	}
-//	log.Info("IndexNode building index ...",
-//		zap.Int64("clusterID", request.ClusterID),
-//		zap.Int64("IndexBuildID", request.IndexBuildID),
-//		zap.Int64("Version", request.IndexVersion),
-//		zap.Int("binlog paths num", len(request.DataPaths)),
-//		zap.Any("TypeParams", request.TypeParams),
-//		zap.Any("IndexParams", request.IndexParams))
-//
-//	sp, ctx2 := trace.StartSpanFromContextWithOperationName(i.loopCtx, "IndexNode-CreateIndex")
-//	defer sp.Finish()
-//	sp.SetTag("IndexBuildID", strconv.FormatInt(request.IndexBuildID, 10))
-//	metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.TotalLabel).Inc()
-//
-//	t := &IndexBuildTask{
-//		BaseTask: BaseTask{
-//			ctx:  ctx2,
-//			done: make(chan error),
-//		},
-//		req:            request,
-//		cm:             i.chunkManager,
-//		etcdKV:         i.etcdKV,
-//		nodeID:         paramtable.GetNodeID(),
-//		serializedSize: 0,
-//	}
-//
-//	ret := &commonpb.Status{
-//		ErrorCode: commonpb.ErrorCode_Success,
-//	}
-//
-//	err := i.sched.IndexBuildQueue.Enqueue(t)
-//	if err != nil {
-//		log.Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", request.IndexBuildID), zap.Error(err))
-//		ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
-//		ret.Reason = err.Error()
-//		metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc()
-//		return ret, nil
-//	}
-//	log.Info("IndexNode successfully scheduled", zap.Int64("indexBuildID", request.IndexBuildID))
-//
-//	metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SuccessLabel).Inc()
-//	return ret, nil
-//}
-//
-//// GetTaskSlots gets how many task the IndexNode can still perform.
-//func (i *IndexNode) GetTaskSlots(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) {
-//	if i.stateCode.Load().(commonpb.StateCode) != commonpb.StateCode_Healthy {
-//		return &indexpb.GetTaskSlotsResponse{
-//			Status: &commonpb.Status{
-//				ErrorCode: commonpb.ErrorCode_UnexpectedError,
-//				Reason:    "state code is not healthy",
-//			},
-//		}, nil
-//	}
-//
-//	log.Info("IndexNode GetTaskSlots received")
-//	ret := &indexpb.GetTaskSlotsResponse{
-//		Status: &commonpb.Status{
-//			ErrorCode: commonpb.ErrorCode_Success,
-//		},
-//	}
-//
-//	ret.Slots = int64(i.sched.GetTaskSlots())
-//	log.Info("IndexNode GetTaskSlots success", zap.Int64("slots", ret.Slots))
-//	return ret, nil
-//}
-
 // GetComponentStates gets the component states of IndexNode.
 func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
-	log.Debug("get IndexNode components states ...")
+	log.RatedInfo(10, "get IndexNode components states ...")
 	nodeID := common.NotRegisteredID
 	if i.session != nil && i.session.Registered() {
 		nodeID = i.session.ServerID
@@ -356,7 +281,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.Component
 		},
 	}
 
-	log.Debug("IndexNode Component states",
+	log.RatedInfo(10, "IndexNode Component states",
 		zap.Any("State", ret.State),
 		zap.Any("Status", ret.Status),
 		zap.Any("SubcomponentStates", ret.SubcomponentStates))
@@ -365,7 +290,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.Component
 
 // GetTimeTickChannel gets the time tick channel of IndexNode.
 func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
-	log.Debug("get IndexNode time tick channel ...")
+	log.RatedInfo(10, "get IndexNode time tick channel ...")
 
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
@@ -376,7 +301,7 @@
 
 // GetStatisticsChannel gets the statistics channel of IndexNode.
 func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
-	log.Debug("get IndexNode statistics channel ...")
+	log.RatedInfo(10, "get IndexNode statistics channel ...")
 	return &milvuspb.StringResponse{
 		Status: &commonpb.Status{
 			ErrorCode: commonpb.ErrorCode_Success,
@@ -411,63 +336,3 @@ func (i *IndexNode) ShowConfigurations(ctx context.Context, req *internalpb.Show
 func (i *IndexNode) SetAddress(address string) {
 	i.address = address
 }
-
-//// GetMetrics gets the metrics info of IndexNode.
-//// TODO(dragondriver): cache the Metrics and set a retention to the cache
-//func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
-//	if !i.isHealthy() {
-//		log.Warn("IndexNode.GetMetrics failed",
-//			zap.Int64("node_id", paramtable.GetNodeID()),
-//			zap.String("req", req.Request),
-//			zap.Error(errIndexNodeIsUnhealthy(paramtable.GetNodeID())))
-//
-//		return &milvuspb.GetMetricsResponse{
-//			Status: &commonpb.Status{
-//				ErrorCode: commonpb.ErrorCode_UnexpectedError,
-//				Reason:    msgIndexNodeIsUnhealthy(paramtable.GetNodeID()),
-//			},
-//			Response: "",
-//		}, nil
-//	}
-//
-//	metricType, err := metricsinfo.ParseMetricType(req.Request)
-//	if err != nil {
-//		log.Warn("IndexNode.GetMetrics failed to parse metric type",
-//			zap.Int64("node_id", paramtable.GetNodeID()),
-//			zap.String("req", req.Request),
-//			zap.Error(err))
-//
-//		return &milvuspb.GetMetricsResponse{
-//			Status: &commonpb.Status{
-//				ErrorCode: commonpb.ErrorCode_UnexpectedError,
-//				Reason:    err.Error(),
-//			},
-//			Response: "",
-//		}, nil
-//	}
-//
-//	if metricType == metricsinfo.SystemInfoMetrics {
-//		metrics, err := getSystemInfoMetrics(ctx, req, i)
-//
-//		log.Debug("IndexNode.GetMetrics",
-//			zap.Int64("node_id", paramtable.GetNodeID()),
-//			zap.String("req", req.Request),
-//			zap.String("metric_type", metricType),
-//			zap.Error(err))
-//
-//		return metrics, nil
-//	}
-//
-//	log.Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet",
-//		zap.Int64("node_id", paramtable.GetNodeID()),
-//		zap.String("req", req.Request),
-//		zap.String("metric_type", metricType))
-//
-//	return &milvuspb.GetMetricsResponse{
-//		Status: &commonpb.Status{
-//			ErrorCode: commonpb.ErrorCode_UnexpectedError,
-//			Reason:    metricsinfo.MsgUnimplementedMetric,
-//		},
-//		Response: "",
-//	}, nil
-//}
diff --git a/internal/indexnode/indexnode_service.go b/internal/indexnode/indexnode_service.go
index 94c010367d..e0cbcb835b 100644
--- a/internal/indexnode/indexnode_service.go
+++ b/internal/indexnode/indexnode_service.go
@@ -1,3 +1,19 @@
+// Licensed to the LF AI & Data foundation under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
 package indexnode
 
 import (
@@ -136,7 +152,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
 			ret.IndexInfos[i].IndexFileKeys = info.fileKeys
 			ret.IndexInfos[i].SerializedSize = info.serializedSize
 			ret.IndexInfos[i].FailReason = info.failReason
-			log.Ctx(ctx).Debug("querying index build task", zap.String("ClusterID", req.ClusterID),
+			log.RatedDebug(5, "querying index build task", zap.String("ClusterID", req.ClusterID),
 				zap.Int64("IndexBuildID", buildID), zap.String("state", info.state.String()), zap.String("fail reason", info.failReason))
 		}
@@ -145,7 +161,7 @@
 }
 
 func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) (*commonpb.Status, error) {
-	log.Ctx(ctx).Debug("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
+	log.Ctx(ctx).Info("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
 	stateCode := i.stateCode.Load().(commonpb.StateCode)
 	if stateCode != commonpb.StateCode_Healthy {
 		log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
@@ -164,7 +180,7 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
 			info.cancel()
 		}
 	}
-	log.Ctx(ctx).Debug("drop index build jobs success", zap.String("ClusterID", req.ClusterID),
+	log.Ctx(ctx).Info("drop index build jobs success", zap.String("ClusterID", req.ClusterID),
 		zap.Int64s("IndexBuildIDs", req.BuildIDs))
 	return &commonpb.Status{
 		ErrorCode: commonpb.ErrorCode_Success,
diff --git a/internal/indexnode/task.go b/internal/indexnode/task.go
index 8fa03e4baf..0894cb930f 100644
--- a/internal/indexnode/task.go
+++ b/internal/indexnode/task.go
@@ -38,7 +38,6 @@ import (
 	"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
 	"github.com/milvus-io/milvus/internal/util/indexparamcheck"
 	"github.com/milvus-io/milvus/internal/util/indexparams"
-	"github.com/milvus-io/milvus/internal/util/logutil"
 	"github.com/milvus-io/milvus/internal/util/metautil"
 	"github.com/milvus-io/milvus/internal/util/paramtable"
 	"github.com/milvus-io/milvus/internal/util/retry"
@@ -141,12 +140,13 @@ func (it *indexBuildTask) GetState() commonpb.IndexState {
 
 func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
 	it.statistic.StartTime = time.Now().UnixMicro()
 	it.statistic.PodID = it.node.GetNodeID()
-	log.Ctx(ctx).Debug("IndexNode IndexBuilderTask Enqueue")
+	log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.BuildID), zap.Int64("segID", it.segmentID))
 	return nil
 }
 
 func (it *indexBuildTask) Prepare(ctx context.Context) error {
-	logutil.Logger(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
+	log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
+		zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
 	typeParams := make(map[string]string)
 	indexParams := make(map[string]string)
@@ -173,7 +173,8 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
 			// ignore error
 		}
 	}
-	logutil.Logger(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
+	log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
+		zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
 	return nil
 }
 
@@ -226,9 +227,11 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error {
 
 	err = it.decodeBlobs(ctx, blobs)
 	if err != nil {
-		logutil.Logger(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID), zap.Error(err))
+		log.Ctx(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID),
+			zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Error(err))
 	} else {
-		logutil.Logger(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
+		log.Ctx(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID),
+			zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
 	}
 	return err
 }
@@ -298,7 +301,8 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
 	encodeIndexFileDur := it.tr.Record("index codec serialize done")
 	metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds()))
 	it.indexBlobs = serializedIndexBlobs
-	logutil.Logger(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
+	log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID),
+		zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
 	return nil
 }
 
@@ -437,7 +441,7 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
 	it.savePaths = savePaths
 	it.statistic.EndTime = time.Now().UnixMicro()
 	it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic)
-	log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
+	log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths))
 	saveIndexFileDur := it.tr.Record("index file save done")
 	metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
 	it.tr.Elapse("index building all done")
@@ -497,7 +501,7 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error {
 	it.statistic.EndTime = time.Now().UnixMicro()
 	it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic)
-	log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
+	log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths))
 	saveIndexFileDur := it.tr.Record("index file save done")
 	metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
 	it.tr.Elapse("index building all done")
@@ -522,7 +526,7 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
 	it.partitionID = partitionID
 	it.segmentID = segmentID
-	log.Ctx(ctx).Debug("indexnode deserialize data success",
+	log.Ctx(ctx).Info("indexnode deserialize data success",
 		zap.Int64("index id", it.req.IndexID),
 		zap.String("index name", it.req.IndexName),
 		zap.Int64("collectionID", it.collectionID),