mirror of
https://gitee.com/milvus-io/milvus.git
synced 2024-12-01 19:39:21 +08:00
Log msgID and role when log collection
Signed-off-by: sunby <bingyi.sun@zilliz.com>
This commit is contained in:
parent
fc6472eb08
commit
7bb806b5f6
@ -1162,18 +1162,21 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques
|
||||
}
|
||||
|
||||
func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) {
|
||||
log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID),
|
||||
zap.String("collection", in.CollectionName))
|
||||
code := c.stateCode.Load().(internalpb.StateCode)
|
||||
if code != internalpb.StateCode_Healthy {
|
||||
log.Error("ShowPartitionRequest failed: master is not healthy", zap.String("role", Params.RoleName),
|
||||
zap.Int64("msgID", in.Base.MsgID), zap.String("state", internalpb.StateCode_name[int32(code)]))
|
||||
return &milvuspb.ShowPartitionsResponse{
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]),
|
||||
Reason: fmt.Sprintf("master is not healthy, state code = %s", internalpb.StateCode_name[int32(code)]),
|
||||
},
|
||||
PartitionNames: nil,
|
||||
PartitionIDs: nil,
|
||||
}, nil
|
||||
}
|
||||
log.Debug("ShowPartitions", zap.String("collection name", in.CollectionName))
|
||||
t := &ShowPartitionReqTask{
|
||||
baseReqTask: baseReqTask{
|
||||
cv: make(chan error),
|
||||
@ -1188,15 +1191,18 @@ func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRe
|
||||
c.ddReqQueue <- t
|
||||
err := t.WaitToFinish()
|
||||
if err != nil {
|
||||
log.Error("ShowPartitionsRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err))
|
||||
return &milvuspb.ShowPartitionsResponse{
|
||||
PartitionNames: nil,
|
||||
Status: &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: "ShowPartitions failed: " + err.Error(),
|
||||
Reason: err.Error(),
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
log.Debug("ShowPartitions Success", zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames), zap.Int64s("partition ids", t.Rsp.PartitionIDs))
|
||||
log.Debug("ShowPartitions succeed", zap.String("role", Params.RoleName), zap.Int64("msgID", t.Req.Base.MsgID),
|
||||
zap.String("collection name", in.CollectionName), zap.Strings("partition names", t.Rsp.PartitionNames),
|
||||
zap.Int64s("partition ids", t.Rsp.PartitionIDs))
|
||||
t.Rsp.Status = &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
Reason: "",
|
||||
|
@ -37,6 +37,8 @@ type ParamTable struct {
|
||||
Timeout int
|
||||
|
||||
Log log.Config
|
||||
|
||||
RoleName string
|
||||
}
|
||||
|
||||
func (p *ParamTable) Init() {
|
||||
@ -68,6 +70,7 @@ func (p *ParamTable) Init() {
|
||||
p.initTimeout()
|
||||
|
||||
p.initLogCfg()
|
||||
p.initRoleName()
|
||||
})
|
||||
}
|
||||
|
||||
@ -209,3 +212,7 @@ func (p *ParamTable) initLogCfg() {
|
||||
p.Log.File.Filename = ""
|
||||
}
|
||||
}
|
||||
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = fmt.Sprintf("%s-%d", "MasterService", p.NodeID)
|
||||
}
|
||||
|
@ -181,15 +181,17 @@ func (node *ProxyNode) LoadCollection(ctx context.Context, request *milvuspb.Loa
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
log.Debug("LoadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID), zap.String("collection", request.CollectionName))
|
||||
err = lct.WaitToFinish()
|
||||
if err != nil {
|
||||
log.Error("LoadCollectionTask failed", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID), zap.Error(err))
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_UnexpectedError,
|
||||
Reason: err.Error(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
|
||||
return lct.result, nil
|
||||
}
|
||||
|
||||
|
@ -59,6 +59,7 @@ type ParamTable struct {
|
||||
|
||||
PulsarMaxMessageSize int
|
||||
Log log.Config
|
||||
RoleName string
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
@ -158,6 +159,7 @@ func (pt *ParamTable) initParams() {
|
||||
pt.initDefaultIndexName()
|
||||
|
||||
pt.initPulsarMaxMessageSize()
|
||||
pt.initRoleName()
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initPulsarAddress() {
|
||||
@ -478,3 +480,7 @@ func (pt *ParamTable) initLogCfg() {
|
||||
pt.Log.File.Filename = ""
|
||||
}
|
||||
}
|
||||
|
||||
func (pt *ParamTable) initRoleName() {
|
||||
pt.RoleName = fmt.Sprintf("%s-%d", "ProxyNode", pt.ProxyID)
|
||||
}
|
||||
|
@ -1942,6 +1942,7 @@ func (lct *LoadCollectionTask) OnEnqueue() error {
|
||||
}
|
||||
|
||||
func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error {
|
||||
log.Debug("LoadCollectionTask PreExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
|
||||
lct.Base.MsgType = commonpb.MsgType_LoadCollection
|
||||
lct.Base.SourceID = Params.ProxyID
|
||||
|
||||
@ -1955,6 +1956,7 @@ func (lct *LoadCollectionTask) PreExecute(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) {
|
||||
log.Debug("LoadCollectionTask Execute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
|
||||
collID, err := globalMetaCache.GetCollectionID(ctx, lct.CollectionName)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1975,11 +1977,17 @@ func (lct *LoadCollectionTask) Execute(ctx context.Context) (err error) {
|
||||
CollectionID: collID,
|
||||
Schema: collSchema,
|
||||
}
|
||||
log.Debug("send LoadCollectionRequest to query service", zap.String("role", Params.RoleName), zap.Int64("msgID", request.Base.MsgID), zap.Int64("collectionID", request.CollectionID),
|
||||
zap.Any("schema", request.Schema))
|
||||
lct.result, err = lct.queryService.LoadCollection(ctx, request)
|
||||
return err
|
||||
if err != nil {
|
||||
return fmt.Errorf("call query service LoadCollection: %s", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (lct *LoadCollectionTask) PostExecute(ctx context.Context) error {
|
||||
log.Debug("LoadCollectionTask PostExecute", zap.String("role", Params.RoleName), zap.Int64("msgID", lct.Base.MsgID))
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -27,7 +27,8 @@ type ParamTable struct {
|
||||
// timetick
|
||||
TimeTickChannelName string
|
||||
|
||||
Log log.Config
|
||||
Log log.Config
|
||||
RoleName string
|
||||
}
|
||||
|
||||
var Params ParamTable
|
||||
@ -57,6 +58,7 @@ func (p *ParamTable) Init() {
|
||||
p.initStatsChannelName()
|
||||
p.initTimeTickChannelName()
|
||||
p.initQueryServiceAddress()
|
||||
p.initRoleName()
|
||||
})
|
||||
}
|
||||
|
||||
@ -123,3 +125,7 @@ func (p *ParamTable) initQueryServiceAddress() {
|
||||
}
|
||||
p.Address = url
|
||||
}
|
||||
|
||||
func (p *ParamTable) initRoleName() {
|
||||
p.RoleName = fmt.Sprintf("%s-%d", "QueryService", p.NodeID)
|
||||
}
|
||||
|
@ -207,6 +207,8 @@ func (qs *QueryService) ShowCollections(ctx context.Context, req *querypb.ShowCo
|
||||
}
|
||||
|
||||
func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCollectionRequest) (*commonpb.Status, error) {
|
||||
log.Debug("LoadCollectionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||
zap.Stringer("schema", req.Schema))
|
||||
dbID := req.DbID
|
||||
collectionID := req.CollectionID
|
||||
schema := req.Schema
|
||||
@ -222,7 +224,7 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
|
||||
}
|
||||
}
|
||||
|
||||
log.Debug("load collection start", zap.String("collectionID", fmt.Sprintln(collectionID)))
|
||||
log.Debug("load collection start", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", collectionID))
|
||||
|
||||
_, err := qs.replica.getCollectionByID(dbID, collectionID)
|
||||
if err != nil {
|
||||
@ -241,14 +243,16 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
|
||||
showPartitionRequest := &milvuspb.ShowPartitionsRequest{
|
||||
Base: &commonpb.MsgBase{
|
||||
MsgType: commonpb.MsgType_ShowPartitions,
|
||||
MsgID: req.Base.MsgID,
|
||||
},
|
||||
CollectionID: collectionID,
|
||||
}
|
||||
|
||||
showPartitionResponse, err := qs.masterServiceClient.ShowPartitions(ctx, showPartitionRequest)
|
||||
if err != nil {
|
||||
return fn(err), err
|
||||
return fn(err), fmt.Errorf("call master ShowPartitions: %s", err)
|
||||
}
|
||||
log.Debug("ShowPartitions returned from Master", zap.String("role", Params.RoleName), zap.Int64("msgID", showPartitionRequest.Base.MsgID))
|
||||
if showPartitionResponse.Status.ErrorCode != commonpb.ErrorCode_Success {
|
||||
return showPartitionResponse.Status, err
|
||||
}
|
||||
@ -273,8 +277,7 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
|
||||
}
|
||||
|
||||
if len(partitionIDsToLoad) == 0 {
|
||||
log.Debug("load collection end", zap.String("collectionID", fmt.Sprintln(collectionID)))
|
||||
|
||||
log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.String("collectionID", fmt.Sprintln(collectionID)))
|
||||
return &commonpb.Status{
|
||||
Reason: "Partitions has been already loaded!",
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
@ -290,9 +293,13 @@ func (qs *QueryService) LoadCollection(ctx context.Context, req *querypb.LoadCol
|
||||
}
|
||||
|
||||
status, err := qs.LoadPartitions(ctx, loadPartitionsRequest)
|
||||
if err != nil {
|
||||
log.Error("LoadCollectionRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID), zap.Error(err))
|
||||
return status, fmt.Errorf("load partitions: %s", err)
|
||||
}
|
||||
log.Debug("LoadCollectionRequest completed", zap.String("role", Params.RoleName), zap.Int64("msgID", req.Base.MsgID))
|
||||
return status, nil
|
||||
|
||||
log.Debug("load collection end", zap.String("collectionID", fmt.Sprintln(collectionID)))
|
||||
return status, err
|
||||
}
|
||||
|
||||
func (qs *QueryService) ReleaseCollection(ctx context.Context, req *querypb.ReleaseCollectionRequest) (*commonpb.Status, error) {
|
||||
@ -356,6 +363,8 @@ func (qs *QueryService) ShowPartitions(ctx context.Context, req *querypb.ShowPar
|
||||
|
||||
func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPartitionsRequest) (*commonpb.Status, error) {
|
||||
//TODO::suggest different partitions have different dm channel
|
||||
log.Debug("LoadPartitionRequest received", zap.Int64("msgID", req.Base.MsgID), zap.Int64("collectionID", req.CollectionID),
|
||||
zap.Stringer("schema", req.Schema))
|
||||
dbID := req.DbID
|
||||
collectionID := req.CollectionID
|
||||
partitionIDs := req.PartitionIDs
|
||||
@ -482,7 +491,7 @@ func (qs *QueryService) LoadPartitions(ctx context.Context, req *querypb.LoadPar
|
||||
qs.replica.updatePartitionState(dbID, collectionID, partitionID, querypb.PartitionState_InMemory)
|
||||
}
|
||||
|
||||
log.Debug("load partitions end", zap.String("partitionIDs", fmt.Sprintln(partitionIDs)))
|
||||
log.Debug("LoadPartitionRequest completed", zap.Int64("msgID", req.Base.MsgID), zap.Int64s("partitionIDs", partitionIDs))
|
||||
return &commonpb.Status{
|
||||
ErrorCode: commonpb.ErrorCode_Success,
|
||||
}, nil
|
||||
|
Loading…
Reference in New Issue
Block a user