Make index log better (#20327)

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>

Signed-off-by: cai.zhang <cai.zhang@zilliz.com>
This commit is contained in:
cai.zhang 2022-11-07 14:23:02 +08:00 committed by GitHub
parent 0d18336b21
commit 8ea8f91334
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 100 additions and 199 deletions

View File

@ -174,7 +174,7 @@ func (i *IndexCoord) Init() error {
var initErr error
i.initOnce.Do(func() {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Debug("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
log.Info("IndexCoord init", zap.Any("stateCode", i.stateCode.Load().(commonpb.StateCode)))
i.factory.Init(Params)
@ -190,7 +190,7 @@ func (i *IndexCoord) Init() error {
i.metaTable, err = NewMetaTable(i.etcdKV)
return err
}
log.Debug("IndexCoord try to connect etcd")
log.Info("IndexCoord try to connect etcd")
err = retry.Do(i.loopCtx, connectEtcdFn, retry.Attempts(100))
if err != nil {
log.Error("IndexCoord try to connect etcd failed", zap.Error(err))
@ -198,11 +198,11 @@ func (i *IndexCoord) Init() error {
return
}
log.Debug("IndexCoord try to connect etcd success")
log.Info("IndexCoord try to connect etcd success")
i.nodeManager = NewNodeManager(i.loopCtx)
sessions, revision, err := i.session.GetSessions(typeutil.IndexNodeRole)
log.Debug("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
log.Info("IndexCoord", zap.Int("session number", len(sessions)), zap.Int64("revision", revision))
if err != nil {
log.Error("IndexCoord Get IndexNode Sessions error", zap.Error(err))
initErr = err
@ -216,7 +216,7 @@ func (i *IndexCoord) Init() error {
initErr = err
return
}
log.Debug("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
log.Info("IndexCoord add node success", zap.String("IndexNode address", Params.IndexCoordCfg.IndexNodeAddress),
zap.Int64("nodeID", Params.IndexCoordCfg.IndexNodeID))
aliveNodeID = append(aliveNodeID, Params.IndexCoordCfg.IndexNodeID)
metrics.IndexCoordIndexNodeNum.WithLabelValues().Inc()
@ -231,7 +231,7 @@ func (i *IndexCoord) Init() error {
aliveNodeID = append(aliveNodeID, session.ServerID)
}
}
log.Debug("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
log.Info("IndexCoord", zap.Int("IndexNode number", len(i.nodeManager.GetAllClients())))
i.indexBuilder = newIndexBuilder(i.loopCtx, i, i.metaTable, aliveNodeID)
// TODO silverxia add Rewatch logic
@ -243,7 +243,7 @@ func (i *IndexCoord) Init() error {
initErr = err
return
}
log.Debug("IndexCoord new minio chunkManager success")
log.Info("IndexCoord new minio chunkManager success")
i.chunkManager = chunkManager
i.garbageCollector = newGarbageCollector(i.loopCtx, i.metaTable, i.chunkManager, i)
@ -260,12 +260,12 @@ func (i *IndexCoord) Init() error {
initErr = err
return
}
log.Debug("IndexCoord new task scheduler success")
log.Info("IndexCoord new task scheduler success")
i.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
})
log.Debug("IndexCoord init finished", zap.Error(initErr))
log.Info("IndexCoord init finished", zap.Error(initErr))
return initErr
}
@ -390,7 +390,7 @@ func (i *IndexCoord) isHealthy() bool {
// GetComponentStates gets the component states of IndexCoord.
func (i *IndexCoord) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
log.Debug("get IndexCoord component states ...")
log.RatedInfo(10, "get IndexCoord component states ...")
nodeID := common.NotRegisteredID
if i.session != nil && i.session.Registered() {
@ -410,7 +410,7 @@ func (i *IndexCoord) GetComponentStates(ctx context.Context) (*milvuspb.Componen
ErrorCode: commonpb.ErrorCode_Success,
},
}
log.Debug("IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo))
log.RatedInfo(10, "IndexCoord GetComponentStates", zap.Any("IndexCoord component state", stateInfo))
return ret, nil
}
@ -438,7 +438,7 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
Reason: msgIndexCoordIsUnhealthy(i.serverID),
}, nil
}
log.Debug("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID),
log.Info("IndexCoord receive create index request", zap.Int64("CollectionID", req.CollectionID),
zap.String("IndexName", req.IndexName), zap.Int64("fieldID", req.FieldID),
zap.Any("TypeParams", req.TypeParams),
zap.Any("IndexParams", req.IndexParams))
@ -472,7 +472,6 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
ret.Reason = err.Error()
return ret, nil
}
log.Debug("IndexCoord create index enqueue successfully", zap.Int64("IndexID", t.indexID))
err = t.WaitToFinish()
if err != nil {
@ -483,13 +482,15 @@ func (i *IndexCoord) CreateIndex(ctx context.Context, req *indexpb.CreateIndexRe
return ret, nil
}
log.Info("IndexCoord CreateIndex successfully", zap.Int64("IndexID", t.indexID))
ret.ErrorCode = commonpb.ErrorCode_Success
return ret, nil
}
// GetIndexState gets the index state of the index name in the request from Proxy.
func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexStateRequest) (*indexpb.GetIndexStateResponse, error) {
log.Info("IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
log.RatedInfo(10, "IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName))
if !i.isHealthy() {
@ -527,21 +528,21 @@ func (i *IndexCoord) GetIndexState(ctx context.Context, req *indexpb.GetIndexSta
if state.state != commonpb.IndexState_Finished {
ret.State = state.state
ret.FailReason = state.failReason
log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
log.RatedInfo(10, "IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
return ret, nil
}
}
}
log.Info("IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
log.RatedInfo(10, "IndexCoord get index state success", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.String("state", ret.State.String()))
return ret, nil
}
func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetSegmentIndexStateRequest) (*indexpb.GetSegmentIndexStateResponse, error) {
log.Info("IndexCoord get index state", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName))
log.RatedInfo(5, "IndexCoord GetSegmentIndexState", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName), zap.Int64s("segIDs", req.GetSegmentIDs()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@ -579,6 +580,8 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS
FailReason: state.failReason,
})
}
log.RatedInfo(5, "IndexCoord GetSegmentIndexState successfully", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.IndexName))
return ret, nil
}
@ -586,7 +589,7 @@ func (i *IndexCoord) GetSegmentIndexState(ctx context.Context, req *indexpb.GetS
func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.IndexInfo) error {
collectionID := indexInfo.CollectionID
indexName := indexInfo.IndexName
log.Info("IndexCoord completeIndexInfo", zap.Int64("collID", collectionID),
log.RatedDebug(5, "IndexCoord completeIndexInfo", zap.Int64("collID", collectionID),
zap.String("indexName", indexName))
calculateTotalRow := func() (int64, error) {
@ -651,15 +654,15 @@ func (i *IndexCoord) completeIndexInfo(ctx context.Context, indexInfo *indexpb.I
}
log.Debug("IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID),
log.RatedDebug(5, "IndexCoord completeIndexInfo success", zap.Int64("collID", collectionID),
zap.Int64("totalRows", indexInfo.TotalRows), zap.Int64("indexRows", indexInfo.IndexedRows),
zap.Any("state", indexInfo.State), zap.String("failReason", indexInfo.IndexStateFailReason))
zap.String("state", indexInfo.State.String()), zap.String("failReason", indexInfo.IndexStateFailReason))
return nil
}
// GetIndexBuildProgress get the index building progress by num rows.
func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.GetIndexBuildProgressRequest) (*indexpb.GetIndexBuildProgressResponse, error) {
log.Info("IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID),
log.RatedInfo(5, "IndexCoord receive GetIndexBuildProgress request", zap.Int64("collID", req.CollectionID),
zap.String("indexName", req.IndexName))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@ -717,7 +720,7 @@ func (i *IndexCoord) GetIndexBuildProgress(ctx context.Context, req *indexpb.Get
break
}
log.Debug("IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
log.RatedInfo(5, "IndexCoord get index build progress success", zap.Int64("collID", req.CollectionID),
zap.Int64("totalRows", totalRows), zap.Int64("indexRows", indexRows), zap.Int("seg num", len(resp.Infos)))
return &indexpb.GetIndexBuildProgressResponse{
Status: &commonpb.Status{
@ -799,7 +802,7 @@ func (i *IndexCoord) DropIndex(ctx context.Context, req *indexpb.DropIndexReques
// GetIndexInfos gets the index file paths from IndexCoord.
func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInfoRequest) (*indexpb.GetIndexInfoResponse, error) {
log.Debug("IndexCoord GetIndexInfos", zap.Int64("collectionID", req.CollectionID),
log.RatedInfo(5, "IndexCoord GetIndexInfos", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.GetIndexName()), zap.Int64s("segIDs", req.GetSegmentIDs()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@ -851,12 +854,15 @@ func (i *IndexCoord) GetIndexInfos(ctx context.Context, req *indexpb.GetIndexInf
}
}
log.RatedInfo(5, "IndexCoord GetIndexInfos successfully", zap.Int64("collectionID", req.CollectionID),
zap.String("indexName", req.GetIndexName()))
return ret, nil
}
// DescribeIndex describe the index info of the collection.
func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeIndexRequest) (*indexpb.DescribeIndexResponse, error) {
log.Debug("IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName()))
log.RatedInfo(5, "IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID), zap.String("indexName", req.GetIndexName()))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
return &indexpb.DescribeIndexResponse{
@ -902,6 +908,8 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
indexInfos = append(indexInfos, indexInfo)
}
log.RatedInfo(5, "IndexCoord DescribeIndex", zap.Int64("collectionID", req.CollectionID),
zap.Any("indexInfos", indexInfos))
return &indexpb.DescribeIndexResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -912,7 +920,7 @@ func (i *IndexCoord) DescribeIndex(ctx context.Context, req *indexpb.DescribeInd
// ShowConfigurations returns the configurations of indexCoord matching req.Pattern
func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.ShowConfigurationsRequest) (*internalpb.ShowConfigurationsResponse, error) {
log.Debug("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
log.Info("IndexCoord.ShowConfigurations", zap.String("pattern", req.Pattern))
if !i.isHealthy() {
log.Warn("IndexCoord.ShowConfigurations failed",
zap.Int64("nodeId", i.serverID),
@ -933,7 +941,7 @@ func (i *IndexCoord) ShowConfigurations(ctx context.Context, req *internalpb.Sho
// GetMetrics gets the metrics info of IndexCoord.
func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
log.Debug("IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request))
log.RatedInfo(5, "IndexCoord.GetMetrics", zap.Int64("node id", i.serverID), zap.String("req", req.Request))
if !i.isHealthy() {
log.Warn(msgIndexCoordIsUnhealthy(i.serverID))
@ -988,7 +996,7 @@ func (i *IndexCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsReq
return metrics, nil
}
log.Debug("IndexCoord.GetMetrics failed, request metric type is not implemented yet",
log.Warn("IndexCoord.GetMetrics failed, request metric type is not implemented yet",
zap.Int64("node id", i.session.ServerID),
zap.String("req", req.Request),
zap.String("metric type", metricType))
@ -1029,6 +1037,7 @@ func (i *IndexCoord) CheckHealth(ctx context.Context, req *milvuspb.CheckHealthR
err := group.Wait()
if err != nil || len(errReasons) != 0 {
log.RatedInfo(5, "IndexCoord CheckHealth successfully", zap.Bool("isHealthy", false))
return &milvuspb.CheckHealthResponse{IsHealthy: false, Reasons: errReasons}, nil
}
@ -1046,7 +1055,7 @@ func (i *IndexCoord) watchNodeLoop() {
defer cancel()
defer i.loopWg.Done()
log.Debug("IndexCoord watchNodeLoop start")
log.Info("IndexCoord watchNodeLoop start")
for {
select {
@ -1067,11 +1076,10 @@ func (i *IndexCoord) watchNodeLoop() {
if Params.IndexCoordCfg.BindIndexNodeMode {
continue
}
log.Debug("IndexCoord watchNodeLoop event updated")
switch event.EventType {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID
log.Debug("IndexCoord watchNodeLoop SessionAddEvent", zap.Int64("serverID", serverID),
log.Info("IndexCoord watchNodeLoop SessionAddEvent", zap.Int64("serverID", serverID),
zap.String("address", event.Session.Address))
go func() {
err := i.nodeManager.AddNode(serverID, event.Session.Address)
@ -1082,7 +1090,7 @@ func (i *IndexCoord) watchNodeLoop() {
i.metricsCacheManager.InvalidateSystemInfoMetrics()
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
log.Debug("IndexCoord watchNodeLoop SessionDelEvent", zap.Int64("serverID", serverID))
log.Info("IndexCoord watchNodeLoop SessionDelEvent", zap.Int64("serverID", serverID))
i.nodeManager.RemoveNode(serverID)
// remove tasks on nodeID
i.indexBuilder.nodeDown(serverID)
@ -1095,7 +1103,7 @@ func (i *IndexCoord) watchNodeLoop() {
func (i *IndexCoord) tryAcquireSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID, segIDs []UniqueID) error {
// IndexCoord use buildID instead of taskID.
log.Info("try to acquire segment reference lock", zap.Int64("buildID", buildID),
zap.Int64("ndoeID", nodeID), zap.Int64s("segIDs", segIDs))
zap.Int64("nodeID", nodeID), zap.Int64s("segIDs", segIDs))
ctx1, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
status, err := i.dataCoordClient.AcquireSegmentLock(ctx1, &datapb.AcquireSegmentLockRequest{
@ -1119,6 +1127,8 @@ func (i *IndexCoord) tryAcquireSegmentReferLock(ctx context.Context, buildID Uni
}
func (i *IndexCoord) tryReleaseSegmentReferLock(ctx context.Context, buildID UniqueID, nodeID UniqueID) error {
log.Info("IndexCoord tryReleaseSegmentReferLock", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
releaseLock := func() error {
ctx1, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
@ -1140,12 +1150,15 @@ func (i *IndexCoord) tryReleaseSegmentReferLock(ctx context.Context, buildID Uni
zap.Int64("nodeID", nodeID), zap.Error(err))
return err
}
log.Info("IndexCoord tryReleaseSegmentReferLock successfully", zap.Int64("buildID", buildID),
zap.Int64("nodeID", nodeID))
return nil
}
// assignTask sends the index task to the IndexNode, it has a timeout interval, if the IndexNode doesn't respond within
// the interval, it is considered that the task sending failed.
func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.CreateJobRequest) error {
log.Info("IndexCoord assignTask", zap.Int64("buildID", req.GetBuildID()))
ctx, cancel := context.WithTimeout(i.loopCtx, i.reqTimeoutInterval)
defer cancel()
resp, err := builderClient.CreateJob(ctx, req)
@ -1158,6 +1171,7 @@ func (i *IndexCoord) assignTask(builderClient types.IndexNode, req *indexpb.Crea
log.Error("IndexCoord assignmentTasksLoop builderClient.CreateIndex failed", zap.String("Reason", resp.Reason))
return errors.New(resp.Reason)
}
log.Info("IndexCoord assignTask successfully", zap.Int64("buildID", req.GetBuildID()))
return nil
}
@ -1195,8 +1209,6 @@ func (i *IndexCoord) createIndexForSegment(segIdx *model.SegmentIndex) (bool, Un
zap.Int64("segID", segIdx.SegmentID), zap.Error(err))
return false, 0, err
}
log.Debug("IndexCoord createIndex Enqueue successfully", zap.Int64("collID", segIdx.CollectionID),
zap.Int64("segID", segIdx.SegmentID), zap.Int64("IndexBuildID", t.segmentIndex.BuildID))
err = t.WaitToFinish()
if err != nil {
@ -1205,12 +1217,13 @@ func (i *IndexCoord) createIndexForSegment(segIdx *model.SegmentIndex) (bool, Un
return false, 0, err
}
metrics.IndexCoordIndexRequestCounter.WithLabelValues(metrics.SuccessLabel).Inc()
log.Debug("IndexCoord create index for segment successfully", zap.Int64("collID", segIdx.CollectionID),
zap.Int64("segID", segIdx.SegmentID), zap.Int64("IndexBuildID", t.segmentIndex.BuildID))
return false, t.segmentIndex.BuildID, nil
}
func (i *IndexCoord) watchFlushedSegmentLoop() {
log.Info("IndexCoord start watching flushed segments...")
log.Info("IndexCoord watchFlushedSegmentLoop start...")
defer i.loopWg.Done()
watchChan := i.etcdKV.WatchWithRevision(util.FlushedSegmentPrefix, i.flushedSegmentWatcher.etcdRevision+1)
@ -1256,12 +1269,12 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
segmentInfo.ID = segID
}
log.Debug("watchFlushedSegmentLoop watch event",
log.Info("watchFlushedSegmentLoop watch event",
zap.Int64("segID", segmentInfo.GetID()),
zap.Any("isFake", segmentInfo.GetIsFake()))
i.flushedSegmentWatcher.enqueueInternalTask(segmentInfo)
case mvccpb.DELETE:
log.Debug("the segment info has been deleted", zap.String("key", string(event.Kv.Key)))
log.Info("the segment info has been deleted", zap.String("key", string(event.Kv.Key)))
}
}
}
@ -1269,6 +1282,7 @@ func (i *IndexCoord) watchFlushedSegmentLoop() {
}
func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*datapb.SegmentInfo, error) {
log.Debug("pullSegmentInfo", zap.Int64("segID", segmentID))
ctx1, cancel := context.WithTimeout(ctx, reqTimeoutInterval)
defer cancel()
resp, err := i.dataCoordClient.GetSegmentInfo(ctx1, &datapb.GetSegmentInfoRequest{
@ -1289,6 +1303,7 @@ func (i *IndexCoord) pullSegmentInfo(ctx context.Context, segmentID UniqueID) (*
}
for _, info := range resp.Infos {
if info.ID == segmentID {
log.Debug("pullSegmentInfo successfully", zap.Int64("segID", segmentID))
return info, nil
}
}

View File

@ -19,10 +19,11 @@ package indexcoord
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/milvus-io/milvus-proto/go-api/commonpb"
"github.com/milvus-io/milvus-proto/go-api/schemapb"
"github.com/milvus-io/milvus/internal/proto/indexpb"
"github.com/stretchr/testify/assert"
)
func Test_getDimension(t *testing.T) {

View File

@ -180,14 +180,14 @@ func (i *IndexNode) Init() error {
var initErr error
i.initOnce.Do(func() {
i.UpdateStateCode(commonpb.StateCode_Initializing)
log.Debug("IndexNode init", zap.Any("State", i.stateCode.Load().(commonpb.StateCode)))
log.Info("IndexNode init", zap.Any("State", i.stateCode.Load().(commonpb.StateCode)))
err := i.initSession()
if err != nil {
log.Error(err.Error())
initErr = err
return
}
log.Debug("IndexNode init session successful", zap.Int64("serverID", i.session.ServerID))
log.Info("IndexNode init session successful", zap.Int64("serverID", i.session.ServerID))
if err != nil {
log.Error("IndexNode NewMinIOKV failed", zap.Error(err))
@ -195,13 +195,13 @@ func (i *IndexNode) Init() error {
return
}
log.Debug("IndexNode NewMinIOKV succeeded")
log.Info("IndexNode NewMinIOKV succeeded")
i.closer = trace.InitTracing("index_node")
i.initKnowhere()
})
log.Debug("Init IndexNode finished", zap.Error(initErr))
log.Info("Init IndexNode finished", zap.Error(initErr))
return initErr
}
@ -216,10 +216,10 @@ func (i *IndexNode) Start() error {
Params.IndexNodeCfg.UpdatedTime = time.Now()
i.UpdateStateCode(commonpb.StateCode_Healthy)
log.Debug("IndexNode", zap.Any("State", i.stateCode.Load()))
log.Info("IndexNode", zap.Any("State", i.stateCode.Load()))
})
log.Debug("IndexNode start finished", zap.Error(startErr))
log.Info("IndexNode start finished", zap.Error(startErr))
return startErr
}
@ -240,7 +240,7 @@ func (i *IndexNode) Stop() error {
}
i.session.Revoke(time.Second)
log.Debug("Index node stopped.")
log.Info("Index node stopped.")
return nil
}
@ -259,84 +259,9 @@ func (i *IndexNode) isHealthy() bool {
return code == commonpb.StateCode_Healthy
}
//// BuildIndex receives request from IndexCoordinator to build an index.
//// Index building is asynchronous, so when an index building request comes, IndexNode records the task and returns.
//func (i *IndexNode) BuildIndex(ctx context.Context, request *indexpb.BuildIndexRequest) (*commonpb.Status, error) {
// if i.stateCode.Load().(commonpb.StateCode) != commonpb.StateCode_Healthy {
// return &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
// Reason: "state code is not healthy",
// }, nil
// }
// log.Info("IndexNode building index ...",
// zap.Int64("clusterID", request.ClusterID),
// zap.Int64("IndexBuildID", request.IndexBuildID),
// zap.Int64("Version", request.IndexVersion),
// zap.Int("binlog paths num", len(request.DataPaths)),
// zap.Any("TypeParams", request.TypeParams),
// zap.Any("IndexParams", request.IndexParams))
//
// sp, ctx2 := trace.StartSpanFromContextWithOperationName(i.loopCtx, "IndexNode-CreateIndex")
// defer sp.Finish()
// sp.SetTag("IndexBuildID", strconv.FormatInt(request.IndexBuildID, 10))
// metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.TotalLabel).Inc()
//
// t := &IndexBuildTask{
// BaseTask: BaseTask{
// ctx: ctx2,
// done: make(chan error),
// },
// req: request,
// cm: i.chunkManager,
// etcdKV: i.etcdKV,
// nodeID: paramtable.GetNodeID(),
// serializedSize: 0,
// }
//
// ret := &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_Success,
// }
//
// err := i.sched.IndexBuildQueue.Enqueue(t)
// if err != nil {
// log.Warn("IndexNode failed to schedule", zap.Int64("indexBuildID", request.IndexBuildID), zap.Error(err))
// ret.ErrorCode = commonpb.ErrorCode_UnexpectedError
// ret.Reason = err.Error()
// metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.FailLabel).Inc()
// return ret, nil
// }
// log.Info("IndexNode successfully scheduled", zap.Int64("indexBuildID", request.IndexBuildID))
//
// metrics.IndexNodeBuildIndexTaskCounter.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10), metrics.SuccessLabel).Inc()
// return ret, nil
//}
//
//// GetTaskSlots gets how many task the IndexNode can still perform.
//func (i *IndexNode) GetTaskSlots(ctx context.Context, req *indexpb.GetTaskSlotsRequest) (*indexpb.GetTaskSlotsResponse, error) {
// if i.stateCode.Load().(commonpb.StateCode) != commonpb.StateCode_Healthy {
// return &indexpb.GetTaskSlotsResponse{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
// Reason: "state code is not healthy",
// },
// }, nil
// }
//
// log.Info("IndexNode GetTaskSlots received")
// ret := &indexpb.GetTaskSlotsResponse{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_Success,
// },
// }
//
// ret.Slots = int64(i.sched.GetTaskSlots())
// log.Info("IndexNode GetTaskSlots success", zap.Int64("slots", ret.Slots))
// return ret, nil
//}
// GetComponentStates gets the component states of IndexNode.
func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.ComponentStates, error) {
log.Debug("get IndexNode components states ...")
log.RatedInfo(10, "get IndexNode components states ...")
nodeID := common.NotRegisteredID
if i.session != nil && i.session.Registered() {
nodeID = i.session.ServerID
@ -356,7 +281,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.Component
},
}
log.Debug("IndexNode Component states",
log.RatedInfo(10, "IndexNode Component states",
zap.Any("State", ret.State),
zap.Any("Status", ret.Status),
zap.Any("SubcomponentStates", ret.SubcomponentStates))
@ -365,7 +290,7 @@ func (i *IndexNode) GetComponentStates(ctx context.Context) (*milvuspb.Component
// GetTimeTickChannel gets the time tick channel of IndexNode.
func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
log.Debug("get IndexNode time tick channel ...")
log.RatedInfo(10, "get IndexNode time tick channel ...")
return &milvuspb.StringResponse{
Status: &commonpb.Status{
@ -376,7 +301,7 @@ func (i *IndexNode) GetTimeTickChannel(ctx context.Context) (*milvuspb.StringRes
// GetStatisticsChannel gets the statistics channel of IndexNode.
func (i *IndexNode) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringResponse, error) {
log.Debug("get IndexNode statistics channel ...")
log.RatedInfo(10, "get IndexNode statistics channel ...")
return &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,
@ -411,63 +336,3 @@ func (i *IndexNode) ShowConfigurations(ctx context.Context, req *internalpb.Show
func (i *IndexNode) SetAddress(address string) {
i.address = address
}
//// GetMetrics gets the metrics info of IndexNode.
//// TODO(dragondriver): cache the Metrics and set a retention to the cache
//func (i *IndexNode) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) {
// if !i.isHealthy() {
// log.Warn("IndexNode.GetMetrics failed",
// zap.Int64("node_id", paramtable.GetNodeID()),
// zap.String("req", req.Request),
// zap.Error(errIndexNodeIsUnhealthy(paramtable.GetNodeID())))
//
// return &milvuspb.GetMetricsResponse{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
// Reason: msgIndexNodeIsUnhealthy(paramtable.GetNodeID()),
// },
// Response: "",
// }, nil
// }
//
// metricType, err := metricsinfo.ParseMetricType(req.Request)
// if err != nil {
// log.Warn("IndexNode.GetMetrics failed to parse metric type",
// zap.Int64("node_id", paramtable.GetNodeID()),
// zap.String("req", req.Request),
// zap.Error(err))
//
// return &milvuspb.GetMetricsResponse{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
// Reason: err.Error(),
// },
// Response: "",
// }, nil
// }
//
// if metricType == metricsinfo.SystemInfoMetrics {
// metrics, err := getSystemInfoMetrics(ctx, req, i)
//
// log.Debug("IndexNode.GetMetrics",
// zap.Int64("node_id", paramtable.GetNodeID()),
// zap.String("req", req.Request),
// zap.String("metric_type", metricType),
// zap.Error(err))
//
// return metrics, nil
// }
//
// log.Warn("IndexNode.GetMetrics failed, request metric type is not implemented yet",
// zap.Int64("node_id", paramtable.GetNodeID()),
// zap.String("req", req.Request),
// zap.String("metric_type", metricType))
//
// return &milvuspb.GetMetricsResponse{
// Status: &commonpb.Status{
// ErrorCode: commonpb.ErrorCode_UnexpectedError,
// Reason: metricsinfo.MsgUnimplementedMetric,
// },
// Response: "",
// }, nil
//}

View File

@ -1,3 +1,19 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package indexnode
import (
@ -136,7 +152,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
ret.IndexInfos[i].IndexFileKeys = info.fileKeys
ret.IndexInfos[i].SerializedSize = info.serializedSize
ret.IndexInfos[i].FailReason = info.failReason
log.Ctx(ctx).Debug("querying index build task", zap.String("ClusterID", req.ClusterID),
log.RatedDebug(5, "querying index build task", zap.String("ClusterID", req.ClusterID),
zap.Int64("IndexBuildID", buildID), zap.String("state", info.state.String()),
zap.String("fail reason", info.failReason))
}
@ -145,7 +161,7 @@ func (i *IndexNode) QueryJobs(ctx context.Context, req *indexpb.QueryJobsRequest
}
func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest) (*commonpb.Status, error) {
log.Ctx(ctx).Debug("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
log.Ctx(ctx).Info("drop index build jobs", zap.String("ClusterID", req.ClusterID), zap.Int64s("IndexBuildIDs", req.BuildIDs))
stateCode := i.stateCode.Load().(commonpb.StateCode)
if stateCode != commonpb.StateCode_Healthy {
log.Ctx(ctx).Warn("index node not ready", zap.Int32("state", int32(stateCode)), zap.String("ClusterID", req.ClusterID))
@ -164,7 +180,7 @@ func (i *IndexNode) DropJobs(ctx context.Context, req *indexpb.DropJobsRequest)
info.cancel()
}
}
log.Ctx(ctx).Debug("drop index build jobs success", zap.String("ClusterID", req.ClusterID),
log.Ctx(ctx).Info("drop index build jobs success", zap.String("ClusterID", req.ClusterID),
zap.Int64s("IndexBuildIDs", req.BuildIDs))
return &commonpb.Status{
ErrorCode: commonpb.ErrorCode_Success,

View File

@ -38,7 +38,6 @@ import (
"github.com/milvus-io/milvus/internal/util/indexcgowrapper"
"github.com/milvus-io/milvus/internal/util/indexparamcheck"
"github.com/milvus-io/milvus/internal/util/indexparams"
"github.com/milvus-io/milvus/internal/util/logutil"
"github.com/milvus-io/milvus/internal/util/metautil"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/retry"
@ -141,12 +140,13 @@ func (it *indexBuildTask) GetState() commonpb.IndexState {
func (it *indexBuildTask) OnEnqueue(ctx context.Context) error {
it.statistic.StartTime = time.Now().UnixMicro()
it.statistic.PodID = it.node.GetNodeID()
log.Ctx(ctx).Debug("IndexNode IndexBuilderTask Enqueue")
log.Ctx(ctx).Info("IndexNode IndexBuilderTask Enqueue", zap.Int64("buildID", it.BuildID), zap.Int64("segID", it.segmentID))
return nil
}
func (it *indexBuildTask) Prepare(ctx context.Context) error {
logutil.Logger(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
log.Ctx(ctx).Info("Begin to prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
typeParams := make(map[string]string)
indexParams := make(map[string]string)
@ -173,7 +173,8 @@ func (it *indexBuildTask) Prepare(ctx context.Context) error {
// ignore error
}
}
logutil.Logger(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
log.Ctx(ctx).Info("Successfully prepare indexBuildTask", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
return nil
}
@ -226,9 +227,11 @@ func (it *indexBuildTask) LoadData(ctx context.Context) error {
err = it.decodeBlobs(ctx, blobs)
if err != nil {
logutil.Logger(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID), zap.Error(err))
log.Ctx(ctx).Info("failed to decode blobs", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID), zap.Error(err))
} else {
logutil.Logger(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentIf", it.segmentID))
log.Ctx(ctx).Info("Successfully load data", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
}
return err
}
@ -298,7 +301,8 @@ func (it *indexBuildTask) BuildIndex(ctx context.Context) error {
encodeIndexFileDur := it.tr.Record("index codec serialize done")
metrics.IndexNodeEncodeIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(encodeIndexFileDur.Milliseconds()))
it.indexBlobs = serializedIndexBlobs
logutil.Logger(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID), zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
log.Ctx(ctx).Info("Successfully build index", zap.Int64("buildID", it.BuildID),
zap.Int64("Collection", it.collectionID), zap.Int64("SegmentID", it.segmentID))
return nil
}
@ -437,7 +441,7 @@ func (it *indexBuildTask) SaveIndexFiles(ctx context.Context) error {
it.savePaths = savePaths
it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic)
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths))
saveIndexFileDur := it.tr.Record("index file save done")
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
it.tr.Elapse("index building all done")
@ -497,7 +501,7 @@ func (it *indexBuildTask) SaveDiskAnnIndexFiles(ctx context.Context) error {
it.statistic.EndTime = time.Now().UnixMicro()
it.node.storeIndexFilesAndStatistic(it.ClusterID, it.BuildID, saveFileKeys, it.serializedSize, &it.statistic)
log.Ctx(ctx).Debug("save index files done", zap.Strings("IndexFiles", savePaths))
log.Ctx(ctx).Info("save index files done", zap.Strings("IndexFiles", savePaths))
saveIndexFileDur := it.tr.Record("index file save done")
metrics.IndexNodeSaveIndexFileLatency.WithLabelValues(strconv.FormatInt(paramtable.GetNodeID(), 10)).Observe(float64(saveIndexFileDur.Milliseconds()))
it.tr.Elapse("index building all done")
@ -522,7 +526,7 @@ func (it *indexBuildTask) decodeBlobs(ctx context.Context, blobs []*storage.Blob
it.partitionID = partitionID
it.segmentID = segmentID
log.Ctx(ctx).Debug("indexnode deserialize data success",
log.Ctx(ctx).Info("indexnode deserialize data success",
zap.Int64("index id", it.req.IndexID),
zap.String("index name", it.req.IndexName),
zap.Int64("collectionID", it.collectionID),