Change QueryCoord Log level (#16590)

Signed-off-by: xiaofan-luan <xiaofan.luan@zilliz.com>
Xiaofan 2022-04-26 11:29:54 +08:00 committed by GitHub
parent fbc7fe1cdc
commit c1ff0cec8f
11 changed files with 205 additions and 203 deletions
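The hunks below apply one convention throughout QueryCoord: routine lifecycle and progress messages move from Debug to Info so they appear under the default production log level, while failures that are already handled (returned to the caller or retried) move from Error to Warn, leaving Error for unrecoverable paths. A minimal sketch of that convention, using plain go.uber.org/zap rather than the Milvus log wrapper and illustrative values only:

```go
package main

import "go.uber.org/zap"

func main() {
	// zap.NewProduction logs at Info level by default, so Debug lines are dropped.
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	nodeID := int64(7) // illustrative value only

	// Before: routine lifecycle events at Debug were invisible in production.
	logger.Debug("registerNode: create a new QueryNode", zap.Int64("nodeID", nodeID))

	// After: the same events at Info let operators follow QueryCoord state changes.
	logger.Info("registerNode: create a new QueryNode", zap.Int64("nodeID", nodeID))

	// Failures that are propagated to the caller are downgraded to Warn,
	// keeping Error for conditions QueryCoord cannot recover from.
	logger.Warn("loadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID))
}
```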


@ -75,7 +75,7 @@ func newChannelUnsubscribeHandler(ctx context.Context, kv *etcdkv.EtcdKV, factor
// reloadFromKV reload unsolved channels to unsubscribe
func (csh *channelUnsubscribeHandler) reloadFromKV() error {
log.Debug("start reload unsubscribe channelInfo from kv")
log.Info("start reload unsubscribe channelInfo from kv")
_, channelInfoValues, err := csh.kvClient.LoadWithPrefix(unsubscribeChannelInfoPrefix)
if err != nil {
return err
@ -116,7 +116,7 @@ func (csh *channelUnsubscribeHandler) addUnsubscribeChannelInfo(info *querypb.Un
}
csh.channelInfos.PushBack(info)
csh.downNodeChan <- info.NodeID
log.Debug("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID))
log.Info("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", info.NodeID))
}
}
@ -126,7 +126,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
for {
select {
case <-csh.ctx.Done():
log.Debug("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end")
log.Info("channelUnsubscribeHandler ctx done, handleChannelUnsubscribeLoop end")
return
case <-csh.downNodeChan:
channelInfo := csh.channelInfos.Front().Value.(*querypb.UnsubscribeChannelInfo)
@ -136,7 +136,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
subName := funcutil.GenChannelSubName(Params.CommonCfg.QueryNodeSubName, collectionID, nodeID)
err := unsubscribeChannels(csh.ctx, csh.factory, subName, collectionChannels.Channels)
if err != nil {
log.Debug("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
log.Error("unsubscribe channels failed", zap.Int64("nodeID", nodeID))
panic(err)
}
}
@ -147,7 +147,7 @@ func (csh *channelUnsubscribeHandler) handleChannelUnsubscribeLoop() {
log.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID))
panic(err)
}
log.Debug("unsubscribe channels success", zap.Int64("nodeID", nodeID))
log.Info("unsubscribe channels success", zap.Int64("nodeID", nodeID))
}
}
}
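Note that not every Error in this handler becomes Warn: etcd failures the loop cannot recover from keep Error and are followed by a panic, while the surrounding progress messages move to Info. A small sketch of that split, assuming a hypothetical removeChannelInfo helper and plain zap:

```go
package main

import "go.uber.org/zap"

// removeChannelInfo is a hypothetical stand-in for deleting the unsubscribe
// record from etcd; it succeeds here so the sketch runs to completion.
func removeChannelInfo(nodeID int64) error { return nil }

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	nodeID := int64(3) // illustrative value only
	logger.Info("add unsubscribeChannelInfo to handler", zap.Int64("nodeID", nodeID))

	if err := removeChannelInfo(nodeID); err != nil {
		// Unrecoverable for the loop: keep Error (not Warn) and abort.
		logger.Error("remove unsubscribe channelInfo from etcd failed", zap.Int64("nodeID", nodeID))
		panic(err)
	}
	logger.Info("unsubscribe channels success", zap.Int64("nodeID", nodeID))
}
```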


@ -146,10 +146,10 @@ func (c *queryNodeCluster) reloadFromKV() error {
onlineSessionMap[nodeID] = session
}
for nodeID, session := range onlineSessionMap {
log.Debug("reloadFromKV: register a queryNode to cluster", zap.Any("nodeID", nodeID))
log.Info("reloadFromKV: register a queryNode to cluster", zap.Any("nodeID", nodeID))
err := c.registerNode(c.ctx, session, nodeID, disConnect)
if err != nil {
log.Error("QueryNode failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
log.Warn("QueryNode failed to register", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
@ -159,25 +159,25 @@ func (c *queryNodeCluster) reloadFromKV() error {
// load node information before power off from etcd
oldStringNodeIDs, oldNodeSessions, err := c.client.LoadWithPrefix(queryNodeInfoPrefix)
if err != nil {
log.Error("reloadFromKV: get previous node info from etcd error", zap.Error(err))
log.Warn("reloadFromKV: get previous node info from etcd error", zap.Error(err))
return err
}
for index := range oldStringNodeIDs {
nodeID, err := strconv.ParseInt(filepath.Base(oldStringNodeIDs[index]), 10, 64)
if err != nil {
log.Error("watchNodeLoop: parse nodeID error", zap.Error(err))
log.Warn("watchNodeLoop: parse nodeID error", zap.Error(err))
return err
}
if _, ok := onlineSessionMap[nodeID]; !ok {
session := &sessionutil.Session{}
err = json.Unmarshal([]byte(oldNodeSessions[index]), session)
if err != nil {
log.Error("watchNodeLoop: unmarshal session error", zap.Error(err))
log.Warn("watchNodeLoop: unmarshal session error", zap.Error(err))
return err
}
err = c.registerNode(context.Background(), session, nodeID, offline)
if err != nil {
log.Debug("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
log.Warn("reloadFromKV: failed to add queryNode to cluster", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
toLoadMetaNodeIDs = append(toLoadMetaNodeIDs, nodeID)
@ -214,7 +214,7 @@ func (c *queryNodeCluster) loadSegments(ctx context.Context, nodeID int64, in *q
if targetNode != nil {
err := targetNode.loadSegments(ctx, in)
if err != nil {
log.Debug("loadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
log.Warn("loadSegments: queryNode load segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
@ -238,7 +238,7 @@ func (c *queryNodeCluster) releaseSegments(ctx context.Context, nodeID int64, in
err := targetNode.releaseSegments(ctx, in)
if err != nil {
log.Debug("releaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
log.Warn("releaseSegments: queryNode release segments error", zap.Int64("nodeID", nodeID), zap.String("error info", err.Error()))
return err
}
@ -259,7 +259,6 @@ func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in
if targetNode != nil {
err := targetNode.watchDmChannels(ctx, in)
if err != nil {
log.Debug("watchDmChannels: queryNode watch dm channel error", zap.String("error", err.Error()))
return err
}
dmChannelWatchInfo := make([]*querypb.DmChannelWatchInfo, len(in.Infos))
@ -274,7 +273,7 @@ func (c *queryNodeCluster) watchDmChannels(ctx context.Context, nodeID int64, in
err = c.clusterMeta.setDmChannelInfos(dmChannelWatchInfo)
if err != nil {
log.Debug("watchDmChannels: update dmChannelWatchInfos to meta failed", zap.String("error", err.Error()))
// TODO DML channel maybe leaked, need to release dml if no related segment
return err
}
@ -294,7 +293,6 @@ func (c *queryNodeCluster) watchDeltaChannels(ctx context.Context, nodeID int64,
if targetNode != nil {
err := targetNode.watchDeltaChannels(ctx, in)
if err != nil {
log.Debug("watchDeltaChannels: queryNode watch delta channel error", zap.String("error", err.Error()))
return err
}
@ -334,7 +332,6 @@ func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in
}
msgPosition, err := c.clusterMeta.sendSealedSegmentChangeInfos(in.CollectionID, in.QueryChannel, emptyChangeInfo)
if err != nil {
log.Error("addQueryChannel: get latest messageID of query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err))
return err
}
@ -342,7 +339,6 @@ func (c *queryNodeCluster) addQueryChannel(ctx context.Context, nodeID int64, in
in.SeekPosition = msgPosition
err = targetNode.addQueryChannel(ctx, in)
if err != nil {
log.Error("addQueryChannel: queryNode add query channel error", zap.String("queryChannel", in.QueryChannel), zap.Error(err))
return err
}
return nil
@ -361,7 +357,7 @@ func (c *queryNodeCluster) removeQueryChannel(ctx context.Context, nodeID int64,
if targetNode != nil {
err := targetNode.removeQueryChannel(ctx, in)
if err != nil {
log.Debug("removeQueryChannel: queryNode remove query channel error", zap.String("error", err.Error()))
log.Warn("removeQueryChannel: queryNode remove query channel error", zap.String("error", err.Error()))
return err
}
@ -382,7 +378,6 @@ func (c *queryNodeCluster) releaseCollection(ctx context.Context, nodeID int64,
if targetNode != nil {
err := targetNode.releaseCollection(ctx, in)
if err != nil {
log.Debug("releaseCollection: queryNode release collection error", zap.String("error", err.Error()))
return err
}
@ -403,7 +398,6 @@ func (c *queryNodeCluster) releasePartitions(ctx context.Context, nodeID int64,
if targetNode != nil {
err := targetNode.releasePartitions(ctx, in)
if err != nil {
log.Debug("releasePartitions: queryNode release partitions error", zap.String("error", err.Error()))
return err
}
@ -561,7 +555,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
if _, ok := c.nodes[id]; !ok {
sessionJSON, err := json.Marshal(session)
if err != nil {
log.Debug("registerNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session))
log.Warn("registerNode: marshal session error", zap.Int64("nodeID", id), zap.Any("address", session))
return err
}
key := fmt.Sprintf("%s/%d", queryNodeInfoPrefix, id)
@ -571,7 +565,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
}
node, err := c.newNodeFn(ctx, session.Address, id, c.client)
if err != nil {
log.Debug("registerNode: create a new QueryNode failed", zap.Int64("nodeID", id), zap.Error(err))
log.Warn("registerNode: create a new QueryNode failed", zap.Int64("nodeID", id), zap.Error(err))
return err
}
c.setNodeState(id, node, state)
@ -580,7 +574,7 @@ func (c *queryNodeCluster) registerNode(ctx context.Context, session *sessionuti
}
c.nodes[id] = node
metrics.QueryCoordNumQueryNodes.WithLabelValues().Inc()
log.Debug("registerNode: create a new QueryNode", zap.Int64("nodeID", id), zap.String("address", session.Address), zap.Any("state", state))
log.Info("registerNode: create a new QueryNode", zap.Int64("nodeID", id), zap.String("address", session.Address), zap.Any("state", state))
return nil
}
return fmt.Errorf("registerNode: QueryNode %d alredy exists in cluster", id)
@ -613,7 +607,7 @@ func (c *queryNodeCluster) removeNodeInfo(nodeID int64) error {
delete(c.nodes, nodeID)
metrics.QueryCoordNumQueryNodes.WithLabelValues().Dec()
log.Debug("removeNodeInfo: delete nodeInfo in cluster MetaReplica", zap.Int64("nodeID", nodeID))
log.Info("removeNodeInfo: delete nodeInfo in cluster MetaReplica", zap.Int64("nodeID", nodeID))
return nil
}
@ -625,7 +619,7 @@ func (c *queryNodeCluster) stopNode(nodeID int64) {
if node, ok := c.nodes[nodeID]; ok {
node.stop()
c.setNodeState(nodeID, node, offline)
log.Debug("stopNode: queryNode offline", zap.Int64("nodeID", nodeID))
log.Info("stopNode: queryNode offline", zap.Int64("nodeID", nodeID))
}
}


@ -68,7 +68,7 @@ func (broker *globalMetaBroker) releaseDQLMessageStream(ctx context.Context, col
log.Error("releaseDQLMessageStream occur error", zap.Int64("collectionID", collectionID), zap.Error(err))
return err
}
log.Debug("releaseDQLMessageStream successfully", zap.Int64("collectionID", collectionID))
log.Info("releaseDQLMessageStream successfully", zap.Int64("collectionID", collectionID))
return nil
}
@ -93,7 +93,7 @@ func (broker *globalMetaBroker) showPartitionIDs(ctx context.Context, collection
log.Error("showPartition failed", zap.Int64("collectionID", collectionID), zap.Error(err))
return nil, err
}
log.Debug("show partition successfully", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs))
log.Info("show partition successfully", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", showPartitionResponse.PartitionIDs))
return showPartitionResponse.PartitionIDs, nil
}
@ -119,7 +119,7 @@ func (broker *globalMetaBroker) getRecoveryInfo(ctx context.Context, collectionI
log.Error("get recovery info failed", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Error(err))
return nil, nil, err
}
log.Debug("get recovery info successfully",
log.Info("get recovery info successfully",
zap.Int64("collectionID", collectionID),
zap.Int64("partitionID", partitionID),
zap.Int("num channels", len(recoveryInfo.Channels)),
@ -156,14 +156,14 @@ func (broker *globalMetaBroker) getIndexBuildID(ctx context.Context, collectionI
}
if !response.EnableIndex {
log.Debug("describe segment from rootCoord successfully",
log.Info("describe segment from rootCoord successfully",
zap.Int64("collectionID", collectionID),
zap.Int64("segmentID", segmentID),
zap.Bool("enableIndex", false))
return false, 0, nil
}
log.Debug("describe segment from rootCoord successfully",
log.Info("describe segment from rootCoord successfully",
zap.Int64("collectionID", collectionID),
zap.Int64("segmentID", segmentID),
zap.Bool("enableIndex", true),
@ -190,7 +190,7 @@ func (broker *globalMetaBroker) getIndexFilePaths(ctx context.Context, buildID i
log.Error(err.Error())
return nil, err
}
log.Debug("get index info from indexCoord successfully", zap.Int64("buildID", buildID))
log.Info("get index info from indexCoord successfully", zap.Int64("buildID", buildID))
return pathResponse.FilePaths, nil
}
@ -266,7 +266,7 @@ func (broker *globalMetaBroker) parseIndexInfo(ctx context.Context, segmentID Un
return err
}
log.Debug("set index info success", zap.Int64("segmentID", segmentID), zap.Int64("fieldID", indexInfo.FieldID), zap.Int64("buildID", buildID))
log.Info("set index info success", zap.Int64("segmentID", segmentID), zap.Int64("fieldID", indexInfo.FieldID), zap.Int64("buildID", buildID))
return nil
}
@ -317,7 +317,7 @@ func (broker *globalMetaBroker) describeSegments(ctx context.Context, collection
return nil, err
}
log.Debug("describe segments successfully",
log.Info("describe segments successfully",
zap.Int64("collection", collectionID),
zap.Int64s("segments", segmentIDs))


@ -90,7 +90,7 @@ func (qc *QueryCoord) GetStatisticsChannel(ctx context.Context) (*milvuspb.Strin
// ShowCollections return all the collections that have been loaded
func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowCollectionsRequest) (*querypb.ShowCollectionsResponse, error) {
log.Debug("show collection start",
log.Info("show collection start",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("collectionIDs", req.CollectionIDs),
zap.Int64("msgID", req.Base.MsgID))
@ -118,7 +118,7 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
for _, id := range inMemoryCollectionIDs {
inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
}
log.Debug("show collection end",
log.Info("show collection end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("collections", inMemoryCollectionIDs),
zap.Int64s("inMemoryPercentage", inMemoryPercentages),
@ -145,7 +145,7 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
}
inMemoryPercentages = append(inMemoryPercentages, ID2collectionInfo[id].InMemoryPercentage)
}
log.Debug("show collection end",
log.Info("show collection end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("collections", req.CollectionIDs),
zap.Int64("msgID", req.Base.MsgID),
@ -163,7 +163,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
collectionID := req.CollectionID
//schema := req.Schema
log.Debug("loadCollectionRequest received",
log.Info("loadCollectionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -184,7 +184,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
if collectionInfo, err := qc.meta.getCollectionInfoByID(collectionID); err == nil {
// if collection has been loaded by load collection request, return success
if collectionInfo.LoadType == querypb.LoadType_LoadCollection {
log.Debug("collection has already been loaded, return load success directly",
log.Info("collection has already been loaded, return load success directly",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -247,7 +247,7 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
return status, nil
}
log.Debug("loadCollectionRequest completed",
log.Info("loadCollectionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -260,7 +260,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.TotalLabel).Inc()
//dbID := req.DbID
collectionID := req.CollectionID
log.Debug("releaseCollectionRequest received",
log.Info("releaseCollectionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -281,7 +281,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
// if collection has not been loaded into memory, return release collection successfully
hasCollection := qc.meta.hasCollection(collectionID)
if !hasCollection {
log.Debug("release collection end, the collection has not been loaded into QueryNode",
log.Info("release collection end, the collection has not been loaded into QueryNode",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -326,7 +326,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
return status, nil
}
log.Debug("releaseCollectionRequest completed",
log.Info("releaseCollectionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -341,7 +341,7 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
// ShowPartitions return all the partitions that have been loaded
func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowPartitionsRequest) (*querypb.ShowPartitionsResponse, error) {
collectionID := req.CollectionID
log.Debug("show partitions start",
log.Info("show partitions start",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
@ -383,7 +383,7 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
for _, id := range inMemoryPartitionIDs {
inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
}
log.Debug("show partitions end",
log.Info("show partitions end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64("msgID", req.Base.MsgID),
@ -413,7 +413,7 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
inMemoryPercentages = append(inMemoryPercentages, ID2PartitionState[id].InMemoryPercentage)
}
log.Debug("show partitions end",
log.Info("show partitions end",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
@ -433,7 +433,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
log.Debug("loadPartitionRequest received",
log.Info("loadPartitionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -509,7 +509,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
return status, nil
}
log.Debug("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
log.Info("loadPartitionRequest completed, all partitions to load have already been loaded into memory",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -557,7 +557,7 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
return status, nil
}
log.Debug("loadPartitionRequest completed",
log.Info("loadPartitionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -573,7 +573,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
//dbID := req.DbID
collectionID := req.CollectionID
partitionIDs := req.PartitionIDs
log.Debug("releasePartitionRequest received",
log.Info("releasePartitionRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -586,7 +586,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("release partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("release partition failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
metrics.QueryCoordReleaseCount.WithLabelValues(metrics.FailLabel).Inc()
return status, nil
@ -639,7 +639,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
}
}
} else {
log.Debug("release partitions end, the collection has not been loaded into QueryNode",
log.Info("release partitions end, the collection has not been loaded into QueryNode",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -649,7 +649,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
}
if len(toReleasedPartitions) == 0 {
log.Debug("release partitions end, the partitions has not been loaded into QueryNode",
log.Info("release partitions end, the partitions has not been loaded into QueryNode",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -663,7 +663,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
baseTask := newBaseTask(qc.loopCtx, querypb.TriggerCondition_GrpcRequest)
if releaseCollection {
// if all loaded partitions will be released from memory, then upgrade release partitions request to release collection request
log.Debug(fmt.Sprintf("all partitions of collection %d will released from QueryNode, so release the collection directly", collectionID),
log.Info(fmt.Sprintf("all partitions of collection %d will released from QueryNode, so release the collection directly", collectionID),
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID))
msgBase := req.Base
@ -690,7 +690,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
}
err := qc.scheduler.Enqueue(releaseTask)
if err != nil {
log.Error("releasePartitionRequest failed to add execute task to scheduler",
log.Warn("releasePartitionRequest failed to add execute task to scheduler",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -705,7 +705,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
err = releaseTask.waitToFinish()
if err != nil {
log.Error("releasePartitionRequest failed",
log.Warn("releasePartitionRequest failed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -718,7 +718,7 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
return status, nil
}
log.Debug("releasePartitionRequest completed",
log.Info("releasePartitionRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -767,7 +767,7 @@ func (qc *QueryCoord) CreateQueryChannel(ctx context.Context, req *querypb.Creat
// GetPartitionStates returns state of the partition, including notExist, notPresent, onDisk, partitionInMemory, inMemory, partitionInGPU, InGPU
func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPartitionStatesRequest) (*querypb.GetPartitionStatesResponse, error) {
log.Debug("getPartitionStatesRequest received",
log.Info("getPartitionStatesRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
@ -780,7 +780,7 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("getPartitionStates failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("getPartitionStates failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.GetPartitionStatesResponse{
Status: status,
}, nil
@ -810,7 +810,7 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa
}
partitionStates = append(partitionStates, partitionState)
}
log.Debug("getPartitionStatesRequest completed",
log.Info("getPartitionStatesRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("partitionIDs", req.PartitionIDs),
@ -823,7 +823,7 @@ func (qc *QueryCoord) GetPartitionStates(ctx context.Context, req *querypb.GetPa
// GetSegmentInfo returns information of all the segments on queryNodes, and the information includes memSize, numRow, indexName, indexID ...
func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmentInfoRequest) (*querypb.GetSegmentInfoResponse, error) {
log.Debug("getSegmentInfoRequest received",
log.Info("getSegmentInfoRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64s("segmentIDs", req.SegmentIDs),
@ -836,7 +836,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("getSegmentInfo failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("getSegmentInfo failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.GetSegmentInfoResponse{
Status: status,
}, nil
@ -865,7 +865,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
totalNumRows += info.NumRows
totalMemSize += info.MemSize
}
log.Debug("getSegmentInfoRequest completed",
log.Info("getSegmentInfoRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID),
@ -879,7 +879,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
// LoadBalance would do a load balancing operation between query nodes
func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
log.Debug("loadBalanceRequest received",
log.Info("loadBalanceRequest received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("source nodeIDs", req.SourceNodeIDs),
zap.Int64s("dst nodeIDs", req.DstNodeIDs),
@ -894,7 +894,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("loadBalance failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("loadBalance failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return status, nil
}
@ -909,7 +909,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
}
err := qc.scheduler.Enqueue(loadBalanceTask)
if err != nil {
log.Error("loadBalanceRequest failed to add execute task to scheduler",
log.Warn("loadBalanceRequest failed to add execute task to scheduler",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
@ -920,13 +920,13 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
err = loadBalanceTask.waitToFinish()
if err != nil {
log.Error("loadBalanceRequest failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("loadBalanceRequest failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
status.Reason = err.Error()
return status, nil
}
log.Debug("loadBalanceRequest completed",
log.Info("loadBalanceRequest completed",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64s("source nodeIDs", req.SourceNodeIDs),
zap.Int64s("dst nodeIDs", req.DstNodeIDs),
@ -954,14 +954,14 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
if qc.stateCode.Load() != internalpb.StateCode_Healthy {
err := errors.New("QueryCoord is not healthy")
getMetricsResponse.Status.Reason = err.Error()
log.Error("getMetrics failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("getMetrics failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return getMetricsResponse, nil
}
metricType, err := metricsinfo.ParseMetricType(req.Request)
if err != nil {
getMetricsResponse.Status.Reason = err.Error()
log.Error("getMetrics failed to parse metric type",
log.Warn("getMetrics failed to parse metric type",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("msgID", req.Base.MsgID),
zap.Error(err))
@ -1009,7 +1009,7 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
err = errors.New(metricsinfo.MsgUnimplementedMetric)
getMetricsResponse.Status.Reason = err.Error()
log.Error("getMetrics failed",
log.Warn("getMetrics failed",
zap.String("role", typeutil.QueryCoordRole),
zap.String("req", req.Request),
zap.Int64("msgID", req.Base.MsgID),
@ -1020,7 +1020,7 @@ func (qc *QueryCoord) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRe
// GetReplicas gets replicas of a certain collection
func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicasRequest) (*milvuspb.GetReplicasResponse, error) {
log.Debug("GetReplicas received",
log.Info("GetReplicas received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -1033,7 +1033,7 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("GetReplicasResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("GetReplicasResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &milvuspb.GetReplicasResponse{
Status: status,
}, nil
@ -1043,7 +1043,7 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas
if err != nil {
status.ErrorCode = commonpb.ErrorCode_MetaFailed
status.Reason = err.Error()
log.Error("GetReplicasResponse failed to get replicas",
log.Warn("GetReplicasResponse failed to get replicas",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID),
@ -1083,6 +1083,11 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas
}
}
log.Info("GetReplicas finished",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Any("replicas", replicas))
return &milvuspb.GetReplicasResponse{
Status: status,
Replicas: replicas,
@ -1091,7 +1096,7 @@ func (qc *QueryCoord) GetReplicas(ctx context.Context, req *milvuspb.GetReplicas
// GetShardLeaders gets shard leaders of a certain collection
func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShardLeadersRequest) (*querypb.GetShardLeadersResponse, error) {
log.Debug("GetShardLeaders received",
log.Info("GetShardLeaders received",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID))
@ -1104,7 +1109,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
err := errors.New("QueryCoord is not healthy")
status.Reason = err.Error()
log.Error("GetShardLeadersResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
log.Warn("GetShardLeadersResponse failed", zap.String("role", typeutil.QueryCoordRole), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
return &querypb.GetShardLeadersResponse{
Status: status,
}, nil
@ -1114,7 +1119,7 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
if err != nil {
status.ErrorCode = commonpb.ErrorCode_MetaFailed
status.Reason = err.Error()
log.Error("GetShardLeadersResponse failed to get replicas",
log.Warn("GetShardLeadersResponse failed to get replicas",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Int64("msgID", req.Base.MsgID),
@ -1148,6 +1153,11 @@ func (qc *QueryCoord) GetShardLeaders(ctx context.Context, req *querypb.GetShard
shardLeaderLists = append(shardLeaderLists, shard)
}
log.Info("GetShardLeaders finished",
zap.String("role", typeutil.QueryCoordRole),
zap.Int64("collectionID", req.CollectionID),
zap.Any("replicas", shardLeaderLists))
return &querypb.GetShardLeadersResponse{
Status: status,
Shards: shardLeaderLists,
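The gRPC-facing handlers above share one shape: when QueryCoord is unhealthy or a lookup fails, the error is written into the returned status for the client, so the local log entry is a Warn rather than an Error, and successful completions now log at Info. A condensed sketch of that shape, with simplified stand-in types (status, getReplicas) instead of the real querypb/milvuspb messages:

```go
package main

import (
	"errors"
	"fmt"

	"go.uber.org/zap"
)

// status is a simplified stand-in for the commonpb.Status carried in responses.
type status struct {
	ErrorCode string
	Reason    string
}

// getReplicas sketches the handler shape: report the failure to the caller via
// the status, log it locally at Warn, and log normal completion at Info.
func getReplicas(logger *zap.Logger, healthy bool, collectionID, msgID int64) *status {
	if !healthy {
		err := errors.New("QueryCoord is not healthy")
		logger.Warn("GetReplicasResponse failed",
			zap.Int64("collectionID", collectionID),
			zap.Int64("msgID", msgID),
			zap.Error(err))
		return &status{ErrorCode: "UnexpectedError", Reason: err.Error()}
	}
	logger.Info("GetReplicas finished", zap.Int64("collectionID", collectionID))
	return &status{ErrorCode: "Success"}
}

func main() {
	logger, _ := zap.NewProduction()
	defer logger.Sync()

	fmt.Println(getReplicas(logger, false, 100, 1).Reason)
}
```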


@ -122,7 +122,7 @@ func (ic *IndexChecker) reloadFromKV() error {
// in case handoffReqChan is full, and block start process
go ic.enqueueHandoffReq(segmentInfo)
} else {
log.Debug("reloadFromKV: collection/partition has not been loaded, remove req from etcd", zap.Any("segmentInfo", segmentInfo))
log.Info("reloadFromKV: collection/partition has not been loaded, remove req from etcd", zap.Any("segmentInfo", segmentInfo))
buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
err = ic.client.Remove(buildQuerySegmentPath)
if err != nil {
@ -130,7 +130,7 @@ func (ic *IndexChecker) reloadFromKV() error {
return err
}
}
log.Debug("reloadFromKV: process handoff request done", zap.Any("segmentInfo", segmentInfo))
log.Info("reloadFromKV: process handoff request done", zap.Any("segmentInfo", segmentInfo))
}
return nil
@ -207,7 +207,7 @@ func (ic *IndexChecker) checkIndexLoop() {
continue
}
log.Debug("checkIndexLoop: segment has been compacted and dropped before handoff", zap.Int64("segmentID", segmentInfo.SegmentID))
log.Info("checkIndexLoop: segment has been compacted and dropped before handoff", zap.Int64("segmentID", segmentInfo.SegmentID))
}
buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
@ -220,8 +220,7 @@ func (ic *IndexChecker) checkIndexLoop() {
}
case segmentInfo := <-ic.unIndexedSegmentsChan:
//TODO:: check index after load collection/partition, some segments may don't has index when loading
log.Debug("checkIndexLoop: start check index for segment which has not loaded index", zap.Int64("segmentID", segmentInfo.SegmentID))
log.Warn("checkIndexLoop: start check index for segment which has not loaded index", zap.Int64("segmentID", segmentInfo.SegmentID))
}
}
}
@ -237,7 +236,7 @@ func (ic *IndexChecker) processHandoffAfterIndexDone() {
collectionID := segmentInfo.CollectionID
partitionID := segmentInfo.PartitionID
segmentID := segmentInfo.SegmentID
log.Debug("processHandoffAfterIndexDone: handoff segment start", zap.Any("segmentInfo", segmentInfo))
log.Info("processHandoffAfterIndexDone: handoff segment start", zap.Any("segmentInfo", segmentInfo))
baseTask := newBaseTask(ic.ctx, querypb.TriggerCondition_Handoff)
handoffReq := &querypb.HandoffSegmentsRequest{
Base: &commonpb.MsgBase{
@ -265,7 +264,7 @@ func (ic *IndexChecker) processHandoffAfterIndexDone() {
log.Warn("processHandoffAfterIndexDone: handoffTask failed", zap.Error(err))
}
log.Debug("processHandoffAfterIndexDone: handoffTask completed", zap.Any("segment infos", handoffTask.SegmentInfos))
log.Info("processHandoffAfterIndexDone: handoffTask completed", zap.Any("segment infos", handoffTask.SegmentInfos))
}()
// once task enqueue, etcd data can be cleaned, handoffTask will recover from taskScheduler's reloadFromKV()


@ -166,8 +166,6 @@ func newMeta(ctx context.Context, kv kv.MetaKv, factory dependency.Factory, idAl
}
func (m *MetaReplica) reloadFromKV() error {
log.Debug("start reload from kv")
log.Info("recovery collections...")
collectionKeys, collectionValues, err := m.getKvClient().LoadWithPrefix(collectionMetaPrefix)
if err != nil {
@ -288,7 +286,7 @@ func (m *MetaReplica) reloadFromKV() error {
}
//TODO::update partition states
log.Debug("reload from kv finished")
log.Info("reload from kv finished")
return nil
}
@ -484,7 +482,7 @@ func (m *MetaReplica) addPartitions(collectionID UniqueID, partitionIDs []Unique
collectionInfo.PartitionStates = newPartitionStates
collectionInfo.ReleasedPartitionIDs = newReleasedPartitionIDs
log.Debug("add a partition to MetaReplica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs))
log.Info("add a partition to MetaReplica", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs))
err := saveGlobalCollectionInfo(collectionID, collectionInfo, m.getKvClient())
if err != nil {
log.Error("save collectionInfo error", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", collectionInfo.PartitionIDs), zap.Any("error", err.Error()))
@ -824,7 +822,7 @@ func (m *MetaReplica) sendSealedSegmentChangeInfos(collectionID UniqueID, queryC
return nil, fmt.Errorf("sendSealedSegmentChangeInfos: length of the positions in stream is not correct, collectionID = %d, query channel = %s, len = %d", collectionID, queryChannel, len(messageIDs))
}
log.Debug("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack))
log.Info("updateGlobalSealedSegmentInfos: send sealed segment change info to queryChannel", zap.Any("msgPack", msgPack))
return &internalpb.MsgPosition{
ChannelName: queryChannel,
MsgID: messageIDs[0].Serialize(),
@ -926,7 +924,6 @@ func (m *MetaReplica) setDmChannelInfos(dmChannelWatchInfos []*querypb.DmChannel
err := saveDmChannelWatchInfos(dmChannelWatchInfos, m.getKvClient())
if err != nil {
log.Error("save dmChannelWatchInfo error", zap.Any("error", err.Error()))
return err
}
for _, channelInfo := range dmChannelWatchInfos {
@ -943,7 +940,7 @@ func (m *MetaReplica) createQueryChannel(collectionID UniqueID) *querypb.QueryCh
allocatedQueryChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearch)
allocatedQueryResultChannel := fmt.Sprintf("%s-0", Params.CommonCfg.QueryCoordSearchResult)
log.Debug("query coordinator is creating query channel",
log.Info("query coordinator is creating query channel",
zap.String("query channel name", allocatedQueryChannel),
zap.String("query result channel name", allocatedQueryResultChannel))
@ -987,7 +984,7 @@ func (m *MetaReplica) setDeltaChannel(collectionID UniqueID, infos []*datapb.Vch
log.Error("save delta channel info error", zap.Int64("collectionID", collectionID), zap.Error(err))
return err
}
log.Debug("save delta channel infos to meta", zap.Any("collectionID", collectionID))
log.Info("save delta channel infos to meta", zap.Any("collectionID", collectionID))
m.deltaChannelInfos[collectionID] = infos
return nil
}
@ -1025,7 +1022,7 @@ func (m *MetaReplica) getQueryStreamByID(collectionID UniqueID, queryChannel str
queryStream.AsProducer([]string{queryChannel})
m.queryStreams[collectionID] = queryStream
log.Debug("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID))
log.Info("getQueryStreamByID: create query msgStream for collection", zap.Int64("collectionID", collectionID))
}
return queryStream, nil


@ -136,7 +136,7 @@ func (qc *QueryCoord) initSession() error {
// Init function initializes the queryCoord's meta, cluster, etcdKV and task scheduler
func (qc *QueryCoord) Init() error {
log.Debug("query coordinator start init, session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath), zap.String("address", Params.QueryCoordCfg.Address))
log.Info("query coordinator start init, session info", zap.String("metaPath", Params.EtcdCfg.MetaRootPath), zap.String("address", Params.QueryCoordCfg.Address))
var initError error
qc.initOnce.Do(func() {
err := qc.initSession()
@ -145,7 +145,6 @@ func (qc *QueryCoord) Init() error {
initError = err
return
}
log.Debug("queryCoord try to connect etcd")
etcdKV := etcdkv.NewEtcdKV(qc.etcdCli, Params.EtcdCfg.MetaRootPath)
qc.kvClient = etcdKV
log.Debug("query coordinator try to connect etcd success")
@ -155,7 +154,7 @@ func (qc *QueryCoord) Init() error {
idAllocator := allocator.NewGlobalIDAllocator("idTimestamp", idAllocatorKV)
initError = idAllocator.Initialize()
if initError != nil {
log.Debug("query coordinator idAllocator initialize failed", zap.Error(initError))
log.Error("query coordinator idAllocator initialize failed", zap.Error(initError))
return
}
qc.idAllocator = func() (UniqueID, error) {
@ -220,20 +219,20 @@ func (qc *QueryCoord) Init() error {
qc.metricsCacheManager = metricsinfo.NewMetricsCacheManager()
})
log.Debug("QueryCoord init success")
log.Info("QueryCoord init success")
return initError
}
// Start function starts the goroutines to watch the meta and node updates
func (qc *QueryCoord) Start() error {
qc.scheduler.Start()
log.Debug("start scheduler ...")
log.Info("start scheduler ...")
qc.indexChecker.start()
log.Debug("start index checker ...")
log.Info("start index checker ...")
qc.handler.start()
log.Debug("start channel unsubscribe loop ...")
log.Info("start channel unsubscribe loop ...")
Params.QueryCoordCfg.CreatedTime = time.Now()
Params.QueryCoordCfg.UpdatedTime = time.Now()
@ -260,17 +259,17 @@ func (qc *QueryCoord) Stop() error {
if qc.scheduler != nil {
qc.scheduler.Close()
log.Debug("close scheduler ...")
log.Info("close scheduler ...")
}
if qc.indexChecker != nil {
qc.indexChecker.close()
log.Debug("close index checker ...")
log.Info("close index checker ...")
}
if qc.handler != nil {
qc.handler.close()
log.Debug("close channel unsubscribe loop ...")
log.Info("close channel unsubscribe loop ...")
}
if qc.loopCancel != nil {
@ -278,6 +277,7 @@ func (qc *QueryCoord) Stop() error {
log.Info("cancel the loop of QueryCoord")
}
log.Warn("Query Coord stopped successfully...")
qc.loopWg.Wait()
qc.session.Revoke(time.Second)
return nil
@ -342,7 +342,7 @@ func (qc *QueryCoord) watchNodeLoop() {
ctx, cancel := context.WithCancel(qc.loopCtx)
defer cancel()
defer qc.loopWg.Done()
log.Debug("QueryCoord start watch node loop")
log.Info("QueryCoord start watch node loop")
unallocatedNodes := qc.getUnallocatedNodes()
for _, n := range unallocatedNodes {
@ -372,7 +372,7 @@ func (qc *QueryCoord) watchNodeLoop() {
}
//TODO::deal enqueue error
qc.scheduler.Enqueue(loadBalanceTask)
log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
log.Info("start a loadBalance task", zap.Any("task", loadBalanceTask))
}
// TODO silverxia add Rewatch logic
@ -464,7 +464,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
switch event.EventType {
case sessionutil.SessionAddEvent:
serverID := event.Session.ServerID
log.Debug("start add a QueryNode to cluster", zap.Any("nodeID", serverID))
log.Info("start add a QueryNode to cluster", zap.Any("nodeID", serverID))
err := qc.cluster.registerNode(ctx, event.Session, serverID, disConnect)
if err != nil {
log.Error("QueryCoord failed to register a QueryNode", zap.Int64("nodeID", serverID), zap.String("error info", err.Error()))
@ -476,7 +476,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
qc.metricsCacheManager.InvalidateSystemInfoMetrics()
case sessionutil.SessionDelEvent:
serverID := event.Session.ServerID
log.Debug("get a del event after QueryNode down", zap.Int64("nodeID", serverID))
log.Info("get a del event after QueryNode down", zap.Int64("nodeID", serverID))
nodeExist := qc.cluster.hasNode(serverID)
if !nodeExist {
log.Error("QueryNode not exist", zap.Int64("nodeID", serverID))
@ -504,7 +504,7 @@ func (qc *QueryCoord) handleNodeEvent(ctx context.Context) {
qc.metricsCacheManager.InvalidateSystemInfoMetrics()
//TODO:: deal enqueue error
qc.scheduler.Enqueue(loadBalanceTask)
log.Debug("start a loadBalance task", zap.Any("task", loadBalanceTask))
log.Info("start a loadBalance task", zap.Any("task", loadBalanceTask))
}
}
}
@ -515,7 +515,7 @@ func (qc *QueryCoord) watchHandoffSegmentLoop() {
defer cancel()
defer qc.loopWg.Done()
log.Debug("QueryCoord start watch segment loop")
log.Info("QueryCoord start watch segment loop")
watchChan := qc.kvClient.WatchWithRevision(handoffSegmentPrefix, qc.indexChecker.revision+1)
@ -536,9 +536,9 @@ func (qc *QueryCoord) watchHandoffSegmentLoop() {
validHandoffReq, _ := qc.indexChecker.verifyHandoffReqValid(segmentInfo)
if Params.QueryCoordCfg.AutoHandoff && validHandoffReq {
qc.indexChecker.enqueueHandoffReq(segmentInfo)
log.Debug("watchHandoffSegmentLoop: enqueue a handoff request to index checker", zap.Any("segment info", segmentInfo))
log.Info("watchHandoffSegmentLoop: enqueue a handoff request to index checker", zap.Any("segment info", segmentInfo))
} else {
log.Debug("watchHandoffSegmentLoop: collection/partition has not been loaded or autoHandoff equal to false, remove req from etcd", zap.Any("segmentInfo", segmentInfo))
log.Info("watchHandoffSegmentLoop: collection/partition has not been loaded or autoHandoff equal to false, remove req from etcd", zap.Any("segmentInfo", segmentInfo))
buildQuerySegmentPath := fmt.Sprintf("%s/%d/%d/%d", handoffSegmentPrefix, segmentInfo.CollectionID, segmentInfo.PartitionID, segmentInfo.SegmentID)
err = qc.kvClient.Remove(buildQuerySegmentPath)
if err != nil {
@ -558,7 +558,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
ctx, cancel := context.WithCancel(qc.loopCtx)
defer cancel()
defer qc.loopWg.Done()
log.Debug("QueryCoord start load balance segment loop")
log.Info("QueryCoord start load balance segment loop")
timer := time.NewTicker(time.Duration(Params.QueryCoordCfg.BalanceIntervalSeconds) * time.Second)
@ -625,7 +625,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
nodeID2SegmentInfos[nodeID] = leastSegmentInfos
}
}
log.Debug("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Any("mem rate", nodeID2MemUsageRate))
log.Info("loadBalanceSegmentLoop: memory usage rate of all online QueryNode", zap.Any("mem rate", nodeID2MemUsageRate))
if len(availableNodeIDs) <= 1 {
log.Warn("loadBalanceSegmentLoop: there are too few available query nodes to balance", zap.Int64s("onlineNodeIDs", onlineNodeIDs), zap.Int64s("availableNodeIDs", availableNodeIDs))
continue
@ -695,7 +695,7 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
}
for _, t := range loadBalanceTasks {
qc.scheduler.Enqueue(t)
log.Debug("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t))
log.Info("loadBalanceSegmentLoop: enqueue a loadBalance task", zap.Any("task", t))
err := t.waitToFinish()
if err != nil {
// if failed, wait for next balance loop
@ -703,10 +703,10 @@ func (qc *QueryCoord) loadBalanceSegmentLoop() {
// it also may be other abnormal errors
log.Error("loadBalanceSegmentLoop: balance task execute failed", zap.Any("task", t))
} else {
log.Debug("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t))
log.Info("loadBalanceSegmentLoop: balance task execute success", zap.Any("task", t))
}
}
log.Debug("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks))
log.Info("loadBalanceSegmentLoop: load balance Done in this loop", zap.Any("tasks", loadBalanceTasks))
}
}
}


@ -127,7 +127,7 @@ func (qn *queryNode) start() error {
qn.state = online
}
qn.stateLock.Unlock()
log.Debug("start: queryNode client start success", zap.Int64("nodeID", qn.id), zap.String("address", qn.address))
log.Info("start: queryNode client start success", zap.Int64("nodeID", qn.id), zap.String("address", qn.address))
return nil
}
@ -295,7 +295,7 @@ func (qn *queryNode) removeQueryChannel(ctx context.Context, in *querypb.RemoveQ
func (qn *queryNode) releaseCollection(ctx context.Context, in *querypb.ReleaseCollectionRequest) error {
if !qn.isOnline() {
log.Debug("ReleaseCollection: the QueryNode has been offline, the release request is no longer needed", zap.Int64("nodeID", qn.id))
log.Warn("ReleaseCollection: the QueryNode has been offline, the release request is no longer needed", zap.Int64("nodeID", qn.id))
return nil
}


@ -74,7 +74,7 @@ func shuffleSegmentsToQueryNode(ctx context.Context, reqs []*querypb.LoadSegment
}
if len(availableNodeIDs) > 0 {
log.Debug("shuffleSegmentsToQueryNode: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs))
log.Info("shuffleSegmentsToQueryNode: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs))
for _, req := range reqs {
sort.Slice(availableNodeIDs, func(i, j int) bool {
return nodeID2NumSegment[availableNodeIDs[i]] < nodeID2NumSegment[availableNodeIDs[j]]
@ -109,7 +109,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
dataSizePerReq[offset] = reqSize
}
log.Debug("shuffleSegmentsToQueryNodeV2: get the segment size of loadReqs end", zap.Int64s("segment size of reqs", dataSizePerReq))
log.Info("shuffleSegmentsToQueryNodeV2: get the segment size of loadReqs end", zap.Int64s("segment size of reqs", dataSizePerReq))
for {
// online nodes map and totalMem, usedMem, memUsage of every node
totalMem := make(map[int64]uint64)
@ -151,7 +151,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
queryNodeInfo := nodeInfo.(*queryNode)
// avoid allocate segment to node which memUsageRate is high
if queryNodeInfo.memUsageRate >= Params.QueryCoordCfg.OverloadedMemoryThresholdPercentage {
log.Debug("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode", zap.Int64("nodeID", nodeID), zap.Float64("current rate", queryNodeInfo.memUsageRate))
log.Info("shuffleSegmentsToQueryNodeV2: queryNode memUsageRate large than MaxMemUsagePerNode", zap.Int64("nodeID", nodeID), zap.Float64("current rate", queryNodeInfo.memUsageRate))
continue
}
@ -160,7 +160,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
availableNodeIDs = append(availableNodeIDs, nodeID)
}
if len(availableNodeIDs) > 0 {
log.Debug("shuffleSegmentsToQueryNodeV2: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs))
log.Info("shuffleSegmentsToQueryNodeV2: shuffle segment to available QueryNode", zap.Int64s("available nodeIDs", availableNodeIDs))
memoryInsufficient := false
for offset, sizeOfReq := range dataSizePerReq {
// sort nodes by memUsageRate, low to high
@ -190,7 +190,7 @@ func shuffleSegmentsToQueryNodeV2(ctx context.Context, reqs []*querypb.LoadSegme
// shuffle segment success
if !memoryInsufficient {
log.Debug("shuffleSegmentsToQueryNodeV2: shuffle segment to query node success")
log.Info("shuffleSegmentsToQueryNodeV2: shuffle segment to query node success")
return nil
}


@ -363,7 +363,7 @@ func (lct *loadCollectionTask) preExecute(ctx context.Context) error {
collectionID := lct.CollectionID
schema := lct.Schema
lct.setResultInfo(nil)
log.Debug("start do loadCollectionTask",
log.Info("start do loadCollectionTask",
zap.Int64("msgID", lct.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Stringer("schema", schema),
@ -381,7 +381,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
lct.setResultInfo(err)
return err
}
log.Debug("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIds), zap.Int64("msgID", lct.Base.MsgID))
log.Info("loadCollectionTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIds), zap.Int64("msgID", lct.Base.MsgID))
var (
replicas = make([]*milvuspb.ReplicaInfo, lct.ReplicaNumber)
@ -524,10 +524,10 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
DmChannelName: task.WatchDmChannelsRequest.Infos[0].ChannelName,
})
}
log.Debug("loadCollectionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("msgID", lct.Base.MsgID))
log.Info("loadCollectionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("msgID", lct.Base.MsgID))
}
metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks)))
log.Debug("loadCollectionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID))
log.Info("loadCollectionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lct.Base.MsgID))
}
err = lct.meta.addCollection(collectionID, querypb.LoadType_LoadCollection, lct.Schema)
@ -553,7 +553,7 @@ func (lct *loadCollectionTask) execute(ctx context.Context) error {
}
}
log.Debug("LoadCollection execute done",
log.Info("LoadCollection execute done",
zap.Int64("msgID", lct.getTaskID()),
zap.Int64("collectionID", collectionID))
return nil
@ -570,7 +570,7 @@ func (lct *loadCollectionTask) postExecute(ctx context.Context) error {
}
}
log.Debug("loadCollectionTask postExecute done",
log.Info("loadCollectionTask postExecute done",
zap.Int64("msgID", lct.getTaskID()),
zap.Int64("collectionID", collectionID))
return nil
@ -605,7 +605,7 @@ func (lct *loadCollectionTask) rollBack(ctx context.Context) []task {
panic(err)
}
log.Debug("loadCollectionTask: generate rollBack task for loadCollectionTask", zap.Int64("collectionID", lct.CollectionID), zap.Int64("msgID", lct.Base.MsgID))
log.Info("loadCollectionTask: generate rollBack task for loadCollectionTask", zap.Int64("collectionID", lct.CollectionID), zap.Int64("msgID", lct.Base.MsgID))
return resultTasks
}
@ -650,7 +650,7 @@ func (rct *releaseCollectionTask) updateTaskProcess() {
func (rct *releaseCollectionTask) preExecute(context.Context) error {
collectionID := rct.CollectionID
rct.setResultInfo(nil)
log.Debug("start do releaseCollectionTask",
log.Info("start do releaseCollectionTask",
zap.Int64("msgID", rct.getTaskID()),
zap.Int64("collectionID", collectionID))
return nil
@ -686,7 +686,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
}
rct.addChildTask(releaseCollectionTask)
log.Debug("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask))
log.Info("releaseCollectionTask: add a releaseCollectionTask to releaseCollectionTask's childTask", zap.Any("task", releaseCollectionTask))
}
} else {
err := rct.cluster.releaseCollection(ctx, rct.NodeID, rct.ReleaseCollectionRequest)
@ -698,7 +698,7 @@ func (rct *releaseCollectionTask) execute(ctx context.Context) error {
}
}
log.Debug("releaseCollectionTask Execute done",
log.Info("releaseCollectionTask Execute done",
zap.Int64("msgID", rct.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Int64("nodeID", rct.NodeID))
@ -711,7 +711,7 @@ func (rct *releaseCollectionTask) postExecute(context.Context) error {
rct.clearChildTasks()
}
log.Debug("releaseCollectionTask postExecute done",
log.Info("releaseCollectionTask postExecute done",
zap.Int64("msgID", rct.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Int64("nodeID", rct.NodeID))
@ -805,7 +805,7 @@ func (lpt *loadPartitionTask) preExecute(context.Context) error {
collectionID := lpt.CollectionID
lpt.setResultInfo(nil)
log.Debug("start do loadPartitionTask",
log.Info("start do loadPartitionTask",
zap.Int64("msgID", lpt.getTaskID()),
zap.Int64("collectionID", collectionID))
return nil
@ -954,10 +954,10 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
DmChannelName: task.WatchDmChannelsRequest.Infos[0].ChannelName,
})
}
log.Debug("loadPartitionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())))
log.Info("loadPartitionTask: add a childTask", zap.Int64("collectionID", collectionID), zap.Int32("task type", int32(internalTask.msgType())))
}
metrics.QueryCoordNumChildTasks.WithLabelValues().Add(float64(len(internalTasks)))
log.Debug("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("msgID", lpt.Base.MsgID))
log.Info("loadPartitionTask: assign child task done", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs), zap.Int64("msgID", lpt.Base.MsgID))
}
err = lpt.meta.addCollection(collectionID, querypb.LoadType_LoadPartition, lpt.Schema)
@ -983,7 +983,7 @@ func (lpt *loadPartitionTask) execute(ctx context.Context) error {
}
}
log.Debug("loadPartitionTask Execute done",
log.Info("loadPartitionTask Execute done",
zap.Int64("msgID", lpt.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -1003,7 +1003,7 @@ func (lpt *loadPartitionTask) postExecute(ctx context.Context) error {
}
}
log.Debug("loadPartitionTask postExecute done",
log.Info("loadPartitionTask postExecute done",
zap.Int64("msgID", lpt.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs))
@ -1042,7 +1042,7 @@ func (lpt *loadPartitionTask) rollBack(ctx context.Context) []task {
log.Error("loadPartitionTask: release collection info from meta failed", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID), zap.Error(err))
panic(err)
}
log.Debug("loadPartitionTask: generate rollBack task for loadPartitionTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID))
log.Info("loadPartitionTask: generate rollBack task for loadPartitionTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", lpt.Base.MsgID))
return resultTasks
}
@ -1089,7 +1089,7 @@ func (rpt *releasePartitionTask) preExecute(context.Context) error {
collectionID := rpt.CollectionID
partitionIDs := rpt.PartitionIDs
rpt.setResultInfo(nil)
log.Debug("start do releasePartitionTask",
log.Info("start do releasePartitionTask",
zap.Int64("msgID", rpt.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs))
@ -1117,7 +1117,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
meta: rpt.meta,
}
rpt.addChildTask(releasePartitionTask)
log.Debug("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID))
log.Info("releasePartitionTask: add a releasePartitionTask to releasePartitionTask's childTask", zap.Int64("collectionID", collectionID), zap.Int64("msgID", rpt.Base.MsgID))
}
} else {
err := rpt.cluster.releasePartitions(ctx, rpt.NodeID, rpt.ReleasePartitionsRequest)
@ -1129,7 +1129,7 @@ func (rpt *releasePartitionTask) execute(ctx context.Context) error {
}
}
log.Debug("releasePartitionTask Execute done",
log.Info("releasePartitionTask Execute done",
zap.Int64("msgID", rpt.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -1144,7 +1144,7 @@ func (rpt *releasePartitionTask) postExecute(context.Context) error {
rpt.clearChildTasks()
}
log.Debug("releasePartitionTask postExecute done",
log.Info("releasePartitionTask postExecute done",
zap.Int64("msgID", rpt.getTaskID()),
zap.Int64("collectionID", collectionID),
zap.Int64s("partitionIDs", partitionIDs),
@ -1207,7 +1207,7 @@ func (lst *loadSegmentTask) preExecute(context.Context) error {
segmentIDs = append(segmentIDs, info.SegmentID)
}
lst.setResultInfo(nil)
log.Debug("start do loadSegmentTask",
log.Info("start do loadSegmentTask",
zap.Int64s("segmentIDs", segmentIDs),
zap.Int64("loaded nodeID", lst.DstNodeID),
zap.Int64("taskID", lst.getTaskID()))
@ -1224,13 +1224,13 @@ func (lst *loadSegmentTask) execute(ctx context.Context) error {
return err
}
log.Debug("loadSegmentTask Execute done",
log.Info("loadSegmentTask Execute done",
zap.Int64("taskID", lst.getTaskID()))
return nil
}
func (lst *loadSegmentTask) postExecute(context.Context) error {
log.Debug("loadSegmentTask postExecute done",
log.Info("loadSegmentTask postExecute done",
zap.Int64("taskID", lst.getTaskID()))
return nil
}
@ -1306,7 +1306,7 @@ func (rst *releaseSegmentTask) timestamp() Timestamp {
func (rst *releaseSegmentTask) preExecute(context.Context) error {
segmentIDs := rst.SegmentIDs
rst.setResultInfo(nil)
log.Debug("start do releaseSegmentTask",
log.Info("start do releaseSegmentTask",
zap.Int64s("segmentIDs", segmentIDs),
zap.Int64("loaded nodeID", rst.NodeID),
zap.Int64("taskID", rst.getTaskID()))
@ -1323,7 +1323,7 @@ func (rst *releaseSegmentTask) execute(ctx context.Context) error {
return err
}
log.Debug("releaseSegmentTask Execute done",
log.Info("releaseSegmentTask Execute done",
zap.Int64s("segmentIDs", rst.SegmentIDs),
zap.Int64("taskID", rst.getTaskID()))
return nil
@ -1331,7 +1331,7 @@ func (rst *releaseSegmentTask) execute(ctx context.Context) error {
func (rst *releaseSegmentTask) postExecute(context.Context) error {
segmentIDs := rst.SegmentIDs
log.Debug("releaseSegmentTask postExecute done",
log.Info("releaseSegmentTask postExecute done",
zap.Int64s("segmentIDs", segmentIDs),
zap.Int64("taskID", rst.getTaskID()))
return nil
@ -1385,7 +1385,7 @@ func (wdt *watchDmChannelTask) preExecute(context.Context) error {
channels = append(channels, info.ChannelName)
}
wdt.setResultInfo(nil)
log.Debug("start do watchDmChannelTask",
log.Info("start do watchDmChannelTask",
zap.Strings("dmChannels", channels),
zap.Int64("loaded nodeID", wdt.NodeID),
zap.Int64("taskID", wdt.getTaskID()))
@ -1402,13 +1402,13 @@ func (wdt *watchDmChannelTask) execute(ctx context.Context) error {
return err
}
log.Debug("watchDmChannelsTask Execute done",
log.Info("watchDmChannelsTask Execute done",
zap.Int64("taskID", wdt.getTaskID()))
return nil
}
func (wdt *watchDmChannelTask) postExecute(context.Context) error {
log.Debug("watchDmChannelTask postExecute done",
log.Info("watchDmChannelTask postExecute done",
zap.Int64("taskID", wdt.getTaskID()))
return nil
}
@ -1500,7 +1500,7 @@ func (wdt *watchDeltaChannelTask) preExecute(context.Context) error {
channels = append(channels, info.ChannelName)
}
wdt.setResultInfo(nil)
log.Debug("start do watchDeltaChannelTask",
log.Info("start do watchDeltaChannelTask",
zap.Strings("deltaChannels", channels),
zap.Int64("loaded nodeID", wdt.NodeID),
zap.Int64("taskID", wdt.getTaskID()))
@ -1517,13 +1517,13 @@ func (wdt *watchDeltaChannelTask) execute(ctx context.Context) error {
return err
}
log.Debug("watchDeltaChannelsTask Execute done",
log.Info("watchDeltaChannelsTask Execute done",
zap.Int64("taskID", wdt.getTaskID()))
return nil
}
func (wdt *watchDeltaChannelTask) postExecute(context.Context) error {
log.Debug("watchDeltaChannelTask postExecute done",
log.Info("watchDeltaChannelTask postExecute done",
zap.Int64("taskID", wdt.getTaskID()))
return nil
}
@ -1570,7 +1570,7 @@ func (wqt *watchQueryChannelTask) updateTaskProcess() {
func (wqt *watchQueryChannelTask) preExecute(context.Context) error {
wqt.setResultInfo(nil)
log.Debug("start do watchQueryChannelTask",
log.Info("start do watchQueryChannelTask",
zap.Int64("collectionID", wqt.CollectionID),
zap.String("queryChannel", wqt.QueryChannel),
zap.String("queryResultChannel", wqt.QueryResultChannel),
@ -1584,12 +1584,15 @@ func (wqt *watchQueryChannelTask) execute(ctx context.Context) error {
err := wqt.cluster.addQueryChannel(wqt.ctx, wqt.NodeID, wqt.AddQueryChannelRequest)
if err != nil {
log.Warn("watchQueryChannelTask: watchQueryChannel occur error", zap.Int64("taskID", wqt.getTaskID()), zap.Error(err))
log.Warn("watchQueryChannelTask: watchQueryChannel occur error",
zap.Int64("taskID", wqt.getTaskID()),
zap.String("channel", wqt.AddQueryChannelRequest.QueryChannel),
zap.Error(err))
wqt.setResultInfo(err)
return err
}
log.Debug("watchQueryChannelTask Execute done",
log.Info("watchQueryChannelTask Execute done",
zap.Int64("collectionID", wqt.CollectionID),
zap.String("queryChannel", wqt.QueryChannel),
zap.String("queryResultChannel", wqt.QueryResultChannel),
@ -1598,7 +1601,7 @@ func (wqt *watchQueryChannelTask) execute(ctx context.Context) error {
}
func (wqt *watchQueryChannelTask) postExecute(context.Context) error {
log.Debug("watchQueryChannelTask postExecute done",
log.Info("watchQueryChannelTask postExecute done",
zap.Int64("collectionID", wqt.CollectionID),
zap.String("queryChannel", wqt.QueryChannel),
zap.String("queryResultChannel", wqt.QueryResultChannel),
@ -1638,7 +1641,7 @@ func (ht *handoffTask) preExecute(context.Context) error {
for _, info := range segmentInfos {
segmentIDs = append(segmentIDs, info.SegmentID)
}
log.Debug("start do handoff segments task",
log.Info("start do handoff segments task",
zap.Int64s("segmentIDs", segmentIDs))
return nil
}
@ -1652,12 +1655,12 @@ func (ht *handoffTask) execute(ctx context.Context) error {
collectionInfo, err := ht.meta.getCollectionInfoByID(collectionID)
if err != nil {
log.Debug("handoffTask: collection has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID))
log.Warn("handoffTask: collection has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("segmentID", segmentID))
continue
}
if collectionInfo.LoadType == querypb.LoadType_LoadCollection && ht.meta.hasReleasePartition(collectionID, partitionID) {
log.Debug("handoffTask: partition has not been released", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID))
log.Warn("handoffTask: partition has not been released", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID))
continue
}
@ -1669,7 +1672,7 @@ func (ht *handoffTask) execute(ctx context.Context) error {
}
if collectionInfo.LoadType != querypb.LoadType_LoadCollection && !partitionLoaded {
log.Debug("handoffTask: partition has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID))
log.Warn("handoffTask: partition has not been loaded into memory", zap.Int64("collectionID", collectionID), zap.Int64("partitionID", partitionID), zap.Int64("segmentID", segmentID))
continue
}
@ -1759,7 +1762,7 @@ func (ht *handoffTask) execute(ctx context.Context) error {
}
for _, internalTask := range internalTasks {
ht.addChildTask(internalTask)
log.Debug("handoffTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("segmentID", segmentID))
log.Info("handoffTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Int64("segmentID", segmentID))
}
} else {
err = fmt.Errorf("sealed segment has been exist on query node, segmentID is %d", segmentID)
@ -1769,10 +1772,7 @@ func (ht *handoffTask) execute(ctx context.Context) error {
}
}
log.Debug("handoffTask: assign child task done", zap.Any("segmentInfos", segmentInfos))
log.Debug("handoffTask Execute done",
zap.Int64("taskID", ht.getTaskID()))
log.Info("handoffTask: assign child task done", zap.Any("segmentInfos", segmentInfos), zap.Int64("taskID", ht.getTaskID()))
return nil
}
@ -1781,9 +1781,7 @@ func (ht *handoffTask) postExecute(context.Context) error {
ht.clearChildTasks()
}
log.Debug("handoffTask postExecute done",
zap.Int64("taskID", ht.getTaskID()))
log.Info("handoffTask postExecute done", zap.Int64("taskID", ht.getTaskID()))
return nil
}
@ -1826,7 +1824,7 @@ func (lbt *loadBalanceTask) timestamp() Timestamp {
func (lbt *loadBalanceTask) preExecute(context.Context) error {
lbt.setResultInfo(nil)
log.Debug("start do loadBalanceTask",
log.Info("start do loadBalanceTask",
zap.Int32("trigger type", int32(lbt.triggerCondition)),
zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs),
zap.Any("balanceReason", lbt.BalanceReason),
@ -1887,7 +1885,7 @@ func (lbt *loadBalanceTask) checkForManualLoadBalance() error {
lbt.replicaID = replicaID
log.Debug("start do loadBalanceTask",
log.Info("start do loadBalanceTask",
zap.Int32("trigger type", int32(lbt.triggerCondition)),
zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs),
zap.Any("balanceReason", lbt.BalanceReason),
@ -1939,7 +1937,7 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
} else {
toRecoverPartitionIDs = collectionInfo.PartitionIDs
}
log.Debug("loadBalanceTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs))
log.Info("loadBalanceTask: get collection's all partitionIDs", zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", toRecoverPartitionIDs))
replica, err := lbt.getReplica(nodeID, collectionID)
if err != nil {
lbt.setResultInfo(err)
@ -2038,9 +2036,9 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
}
for _, internalTask := range internalTasks {
lbt.addChildTask(internalTask)
log.Debug("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("task", internalTask))
log.Info("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("task", internalTask))
}
log.Debug("loadBalanceTask: assign child task done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs))
log.Info("loadBalanceTask: assign child task done", zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs))
}
if lbt.triggerCondition == querypb.TriggerCondition_LoadBalance {
@ -2173,12 +2171,12 @@ func (lbt *loadBalanceTask) execute(ctx context.Context) error {
}
for _, internalTask := range internalTasks {
lbt.addChildTask(internalTask)
log.Debug("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("balance request", lbt.LoadBalanceRequest))
log.Info("loadBalanceTask: add a childTask", zap.Int32("task type", int32(internalTask.msgType())), zap.Any("balance request", lbt.LoadBalanceRequest))
}
log.Debug("loadBalanceTask: assign child task done", zap.Any("balance request", lbt.LoadBalanceRequest))
log.Info("loadBalanceTask: assign child task done", zap.Any("balance request", lbt.LoadBalanceRequest))
}
log.Debug("loadBalanceTask Execute done",
log.Info("loadBalanceTask Execute done",
zap.Int32("trigger type", int32(lbt.triggerCondition)),
zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs),
zap.Any("balanceReason", lbt.BalanceReason),
@ -2217,7 +2215,7 @@ func (lbt *loadBalanceTask) postExecute(context.Context) error {
}
}
log.Debug("loadBalanceTask postExecute done",
log.Info("loadBalanceTask postExecute done",
zap.Int32("trigger type", int32(lbt.triggerCondition)),
zap.Int64s("sourceNodeIDs", lbt.SourceNodeIDs),
zap.Any("balanceReason", lbt.BalanceReason),
@ -2230,21 +2228,21 @@ func assignInternalTask(ctx context.Context,
loadSegmentRequests []*querypb.LoadSegmentsRequest,
watchDmChannelRequests []*querypb.WatchDmChannelsRequest,
wait bool, excludeNodeIDs []int64, includeNodeIDs []int64, replicaID int64) ([]task, error) {
log.Debug("assignInternalTask: start assign task to query node")
internalTasks := make([]task, 0)
err := cluster.allocateSegmentsToQueryNode(ctx, loadSegmentRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
if err != nil {
log.Error("assignInternalTask: assign segment to node failed", zap.Any("load segments requests", loadSegmentRequests))
log.Error("assignInternalTask: assign segment to node failed", zap.Error(err))
return nil, err
}
log.Debug("assignInternalTask: assign segment to node success")
log.Info("assignInternalTask: assign segment to node success", zap.Int("load segments", len(loadSegmentRequests)))
err = cluster.allocateChannelsToQueryNode(ctx, watchDmChannelRequests, wait, excludeNodeIDs, includeNodeIDs, replicaID)
if err != nil {
log.Error("assignInternalTask: assign dmChannel to node failed", zap.Any("watch dmChannel requests", watchDmChannelRequests))
log.Error("assignInternalTask: assign dmChannel to node failed", zap.Error(err))
return nil, err
}
log.Debug("assignInternalTask: assign dmChannel to node success")
log.Info("assignInternalTask: assign dmChannel to node success", zap.Int("watch dmchannels", len(watchDmChannelRequests)))
if len(loadSegmentRequests) > 0 {
sort.Slice(loadSegmentRequests, func(i, j int) bool {
@ -2335,7 +2333,7 @@ func mergeWatchDeltaChannelInfo(infos []*datapb.VchannelInfo) []*datapb.Vchannel
for _, index := range minPositions {
result = append(result, infos[index])
}
log.Debug("merge delta channels finished",
log.Info("merge delta channels finished",
zap.Any("origin info length", len(infos)),
zap.Any("merged info length", len(result)),
)

View File

@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"path/filepath"
"reflect"
"strconv"
"sync"
@ -110,7 +111,6 @@ func (queue *taskQueue) popTask() task {
defer queue.Unlock()
if queue.tasks.Len() <= 0 {
log.Warn("sorry, but the unissued task list is empty!")
return nil
}
@ -467,6 +467,7 @@ func (scheduler *TaskScheduler) Enqueue(t task) error {
}
func (scheduler *TaskScheduler) processTask(t task) error {
log.Info("begin to process task", zap.Int64("taskID", t.getTaskID()), zap.String("task", reflect.TypeOf(t).String()))
var taskInfoKey string
// assign taskID for childTask and update triggerTask's childTask to etcd
updateKVFn := func(parentTask task) error {
@ -547,11 +548,13 @@ func (scheduler *TaskScheduler) processTask(t task) error {
span.LogFields(oplog.Int64("processTask: scheduler process Execute", t.getTaskID()))
err = t.execute(ctx)
if err != nil {
log.Warn("failed to execute task", zap.Error(err))
trace.LogError(span, err)
return err
}
err = updateKVFn(t)
if err != nil {
log.Warn("failed to execute task", zap.Error(err))
trace.LogError(span, err)
t.setResultInfo(err)
return err
@ -619,12 +622,15 @@ func (scheduler *TaskScheduler) scheduleLoop() {
return
case <-scheduler.triggerTaskQueue.Chan():
triggerTask = scheduler.triggerTaskQueue.popTask()
log.Debug("scheduleLoop: pop a triggerTask from triggerTaskQueue", zap.Int64("triggerTaskID", triggerTask.getTaskID()))
if triggerTask == nil {
break
}
log.Info("scheduleLoop: pop a triggerTask from triggerTaskQueue", zap.Int64("triggerTaskID", triggerTask.getTaskID()))
alreadyNotify := true
if triggerTask.getState() == taskUndo || triggerTask.getState() == taskDoing {
err = scheduler.processTask(triggerTask)
if err != nil {
log.Debug("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
log.Warn("scheduleLoop: process triggerTask failed", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
alreadyNotify = false
}
}
@ -666,7 +672,7 @@ func (scheduler *TaskScheduler) scheduleLoop() {
alreadyNotify = true
}
rollBackTasks := triggerTask.rollBack(scheduler.ctx)
log.Debug("scheduleLoop: start rollBack after triggerTask failed",
log.Info("scheduleLoop: start rollBack after triggerTask failed",
zap.Int64("triggerTaskID", triggerTask.getTaskID()),
zap.Any("rollBackTasks", rollBackTasks))
// there is no need to save rollBacked internal task to etcd
@ -681,7 +687,7 @@ func (scheduler *TaskScheduler) scheduleLoop() {
log.Error("scheduleLoop: error when remove trigger and internal tasks from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()), zap.Error(err))
triggerTask.setResultInfo(err)
} else {
log.Debug("scheduleLoop: trigger task done and delete from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()))
log.Info("scheduleLoop: trigger task done and delete from etcd", zap.Int64("triggerTaskID", triggerTask.getTaskID()))
}
resultStatus := triggerTask.getResultInfo()
@ -707,7 +713,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
var err error
redoFunc1 := func() {
if !t.isValid() || !t.isRetryable() {
log.Debug("waitActivateTaskDone: reSchedule the activate task",
log.Info("waitActivateTaskDone: reSchedule the activate task",
zap.Int64("taskID", t.getTaskID()),
zap.Int64("triggerTaskID", triggerTask.getTaskID()))
reScheduledTasks, err := t.reschedule(scheduler.ctx)
@ -737,7 +743,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
return
}
rt.setTaskID(id)
log.Debug("waitActivateTaskDone: reScheduler set id", zap.Int64("id", rt.getTaskID()))
log.Info("waitActivateTaskDone: reScheduler set id", zap.Int64("id", rt.getTaskID()))
taskKey := fmt.Sprintf("%s/%d", activeTaskPrefix, rt.getTaskID())
blobs, err := rt.marshal()
if err != nil {
@ -760,7 +766,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
return
}
triggerTask.removeChildTaskByID(t.getTaskID())
log.Debug("waitActivateTaskDone: delete failed active task and save reScheduled task to etcd",
log.Info("waitActivateTaskDone: delete failed active task and save reScheduled task to etcd",
zap.Int64("triggerTaskID", triggerTask.getTaskID()),
zap.Int64("failed taskID", t.getTaskID()),
zap.Any("reScheduled tasks", reScheduledTasks))
@ -768,7 +774,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
for _, rt := range reScheduledTasks {
if rt != nil {
triggerTask.addChildTask(rt)
log.Debug("waitActivateTaskDone: add a reScheduled active task to activateChan", zap.Int64("taskID", rt.getTaskID()))
log.Info("waitActivateTaskDone: add a reScheduled active task to activateChan", zap.Int64("taskID", rt.getTaskID()))
scheduler.activateTaskChan <- rt
wg.Add(1)
go scheduler.waitActivateTaskDone(wg, rt, triggerTask)
@ -776,7 +782,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
}
//delete task from etcd
} else {
log.Debug("waitActivateTaskDone: retry the active task",
log.Info("waitActivateTaskDone: retry the active task",
zap.Int64("taskID", t.getTaskID()),
zap.Int64("triggerTaskID", triggerTask.getTaskID()))
scheduler.activateTaskChan <- t
@ -794,7 +800,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
triggerTask.setResultInfo(err)
return
}
log.Debug("waitActivateTaskDone: retry the active task",
log.Info("waitActivateTaskDone: retry the active task",
zap.Int64("taskID", t.getTaskID()),
zap.Int64("triggerTaskID", triggerTask.getTaskID()))
scheduler.activateTaskChan <- t
@ -804,7 +810,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
}
err = t.waitToFinish()
if err != nil {
log.Debug("waitActivateTaskDone: activate task return err",
log.Warn("waitActivateTaskDone: activate task return err",
zap.Int64("taskID", t.getTaskID()),
zap.Int64("triggerTaskID", triggerTask.getTaskID()),
zap.Error(err))
@ -828,7 +834,7 @@ func (scheduler *TaskScheduler) waitActivateTaskDone(wg *sync.WaitGroup, t task,
//TODO:: case commonpb.MsgType_RemoveDmChannels:
}
} else {
log.Debug("waitActivateTaskDone: one activate task done",
log.Info("waitActivateTaskDone: one activate task done",
zap.Int64("taskID", t.getTaskID()),
zap.Int64("triggerTaskID", triggerTask.getTaskID()))
metrics.QueryCoordChildTaskLatency.WithLabelValues().Observe(float64(t.elapseSpan().Milliseconds()))
@ -841,9 +847,8 @@ func (scheduler *TaskScheduler) processActivateTaskLoop() {
for {
select {
case <-scheduler.stopActivateTaskLoopChan:
log.Debug("processActivateTaskLoop, ctx done")
log.Info("processActivateTaskLoop, ctx done")
return
case t := <-scheduler.activateTaskChan:
if t == nil {
log.Error("processActivateTaskLoop: pop a nil active task", zap.Int64("taskID", t.getTaskID()))
@ -851,7 +856,6 @@ func (scheduler *TaskScheduler) processActivateTaskLoop() {
}
if t.getState() != taskDone {
log.Debug("processActivateTaskLoop: pop an active task from activateChan", zap.Int64("taskID", t.getTaskID()))
go func() {
err := scheduler.processTask(t)
t.notify(err)