mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-03 12:29:36 +08:00
Use typeutil.QueryCoordRole instead of Params.RoleName in querycoord (#13216)
Signed-off-by: yudong.cai <yudong.cai@zilliz.com>
This commit is contained in:
parent
b5d639a1ab
commit
b7e97f67e5
@ -21,16 +21,16 @@ import (
|
|||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/milvus-io/milvus/internal/common"
|
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
|
||||||
|
"github.com/milvus-io/milvus/internal/common"
|
||||||
"github.com/milvus-io/milvus/internal/log"
|
"github.com/milvus-io/milvus/internal/log"
|
||||||
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
"github.com/milvus-io/milvus/internal/proto/commonpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
"github.com/milvus-io/milvus/internal/proto/internalpb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
"github.com/milvus-io/milvus/internal/proto/milvuspb"
|
||||||
"github.com/milvus-io/milvus/internal/proto/querypb"
|
"github.com/milvus-io/milvus/internal/proto/querypb"
|
||||||
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
"github.com/milvus-io/milvus/internal/util/metricsinfo"
|
||||||
|
"github.com/milvus-io/milvus/internal/util/typeutil"
|
||||||
)
|
)
|
||||||
|
|
||||||
// GetComponentStates return information about whether the coord is healthy
|
// GetComponentStates return information about whether the coord is healthy
|
||||||
@ -145,7 +145,8 @@ func (qc *QueryCoord) ShowCollections(ctx context.Context, req *querypb.ShowColl
|
|||||||
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
|
func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
|
||||||
collectionID := req.CollectionID
|
collectionID := req.CollectionID
|
||||||
//schema := req.Schema
|
//schema := req.Schema
|
||||||
log.Debug("loadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
|
log.Debug("loadCollectionRequest received", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
|
||||||
zap.Stringer("schema", req.Schema))
|
zap.Stringer("schema", req.Schema))
|
||||||
status := &commonpb.Status{
|
status := &commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
ErrorCode: commonpb.ErrorCode_Success,
|
||||||
@ -182,7 +183,9 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
|
|||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("loadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Any("status", status))
|
log.Debug("loadCollectionRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
|
||||||
|
zap.Any("status", status))
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -190,7 +193,8 @@ func (qc *QueryCoord) LoadCollection(ctx context.Context, req *querypb.LoadColle
|
|||||||
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
||||||
//dbID := req.DbID
|
//dbID := req.DbID
|
||||||
collectionID := req.CollectionID
|
collectionID := req.CollectionID
|
||||||
log.Debug("releaseCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
|
log.Debug("releaseCollectionRequest received", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
|
||||||
status := &commonpb.Status{
|
status := &commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
ErrorCode: commonpb.ErrorCode_Success,
|
||||||
}
|
}
|
||||||
@ -230,7 +234,8 @@ func (qc *QueryCoord) ReleaseCollection(ctx context.Context, req *querypb.Releas
|
|||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("releaseCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
|
log.Debug("releaseCollectionRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
|
||||||
//qc.MetaReplica.printMeta()
|
//qc.MetaReplica.printMeta()
|
||||||
//qc.cluster.printMeta()
|
//qc.cluster.printMeta()
|
||||||
return status, nil
|
return status, nil
|
||||||
@ -304,7 +309,9 @@ func (qc *QueryCoord) ShowPartitions(ctx context.Context, req *querypb.ShowParti
|
|||||||
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
|
func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
|
||||||
collectionID := req.CollectionID
|
collectionID := req.CollectionID
|
||||||
partitionIDs := req.PartitionIDs
|
partitionIDs := req.PartitionIDs
|
||||||
log.Debug("loadPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("loadPartitionRequest received", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
status := &commonpb.Status{
|
status := &commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
ErrorCode: commonpb.ErrorCode_Success,
|
||||||
}
|
}
|
||||||
@ -320,7 +327,9 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
|
|||||||
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||||
err := errors.New("partitionIDs are empty")
|
err := errors.New("partitionIDs are empty")
|
||||||
status.Reason = err.Error()
|
status.Reason = err.Error()
|
||||||
log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -345,7 +354,9 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
|
|||||||
}
|
}
|
||||||
|
|
||||||
if len(partitionIDsToLoad) == 0 {
|
if len(partitionIDsToLoad) == 0 {
|
||||||
log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
req.PartitionIDs = partitionIDsToLoad
|
req.PartitionIDs = partitionIDsToLoad
|
||||||
@ -372,11 +383,15 @@ func (qc *QueryCoord) LoadPartitions(ctx context.Context, req *querypb.LoadParti
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||||
status.Reason = err.Error()
|
status.Reason = err.Error()
|
||||||
log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Debug("loadPartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("loadPartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -385,7 +400,9 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
|
|||||||
//dbID := req.DbID
|
//dbID := req.DbID
|
||||||
collectionID := req.CollectionID
|
collectionID := req.CollectionID
|
||||||
partitionIDs := req.PartitionIDs
|
partitionIDs := req.PartitionIDs
|
||||||
log.Debug("releasePartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("releasePartitionRequest received", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
status := &commonpb.Status{
|
status := &commonpb.Status{
|
||||||
ErrorCode: commonpb.ErrorCode_Success,
|
ErrorCode: commonpb.ErrorCode_Success,
|
||||||
}
|
}
|
||||||
@ -407,7 +424,9 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
|
|||||||
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
status.ErrorCode = commonpb.ErrorCode_UnexpectedError
|
||||||
err := errors.New("partitionIDs are empty")
|
err := errors.New("partitionIDs are empty")
|
||||||
status.Reason = err.Error()
|
status.Reason = err.Error()
|
||||||
log.Debug("releasePartitionsRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("releasePartitionsRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -443,7 +462,9 @@ func (qc *QueryCoord) ReleasePartitions(ctx context.Context, req *querypb.Releas
|
|||||||
status.Reason = err.Error()
|
status.Reason = err.Error()
|
||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
log.Debug("releasePartitionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID), zap.Int64s("partitionIDs", partitionIDs))
|
log.Debug("releasePartitionRequest completed", zap.String("role", typeutil.QueryCoordRole),
|
||||||
|
zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID),
|
||||||
|
zap.Int64s("partitionIDs", partitionIDs))
|
||||||
//qc.MetaReplica.printMeta()
|
//qc.MetaReplica.printMeta()
|
||||||
//qc.cluster.printMeta()
|
//qc.cluster.printMeta()
|
||||||
return status, nil
|
return status, nil
|
||||||
@ -563,7 +584,7 @@ func (qc *QueryCoord) GetSegmentInfo(ctx context.Context, req *querypb.GetSegmen
|
|||||||
// LoadBalance would do a load balancing operation between query nodes
|
// LoadBalance would do a load balancing operation between query nodes
|
||||||
func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
|
func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceRequest) (*commonpb.Status, error) {
|
||||||
log.Debug("LoadBalanceRequest received",
|
log.Debug("LoadBalanceRequest received",
|
||||||
zap.String("role", Params.RoleName),
|
zap.String("role", typeutil.QueryCoordRole),
|
||||||
zap.Int64("msgID", req.Base.MsgID),
|
zap.Int64("msgID", req.Base.MsgID),
|
||||||
zap.Any("req", req),
|
zap.Any("req", req),
|
||||||
)
|
)
|
||||||
@ -602,7 +623,7 @@ func (qc *QueryCoord) LoadBalance(ctx context.Context, req *querypb.LoadBalanceR
|
|||||||
return status, nil
|
return status, nil
|
||||||
}
|
}
|
||||||
log.Debug("LoadBalanceRequest completed",
|
log.Debug("LoadBalanceRequest completed",
|
||||||
zap.String("role", Params.RoleName),
|
zap.String("role", typeutil.QueryCoordRole),
|
||||||
zap.Int64("msgID", req.Base.MsgID),
|
zap.Int64("msgID", req.Base.MsgID),
|
||||||
zap.Any("req", req),
|
zap.Any("req", req),
|
||||||
)
|
)
|
||||||
|
Loading…
Reference in New Issue
Block a user