diff --git a/internal/rootcoord/root_coord.go b/internal/rootcoord/root_coord.go index f480c63b45..e00f98f490 100644 --- a/internal/rootcoord/root_coord.go +++ b/internal/rootcoord/root_coord.go @@ -181,9 +181,24 @@ func (c *Core) UpdateStateCode(code internalpb.StateCode) { c.stateCode.Store(code) } -func (c *Core) isHealthy() bool { +func (c *Core) checkHealthy() (internalpb.StateCode, bool) { code := c.stateCode.Load().(internalpb.StateCode) - return code == internalpb.StateCode_Healthy + ok := code == internalpb.StateCode_Healthy + return code, ok +} + +func failStatus(code commonpb.ErrorCode, reason string) *commonpb.Status { + return &commonpb.Status{ + ErrorCode: code, + Reason: reason, + } +} + +func succStatus() *commonpb.Status { + return &commonpb.Status{ + ErrorCode: commonpb.ErrorCode_Success, + Reason: "", + } } func (c *Core) checkInit() error { @@ -886,26 +901,26 @@ func (c *Core) Init() error { c.initOnce.Do(func() { connectEtcdFn := func() error { if c.etcdCli, initError = clientv3.New(clientv3.Config{Endpoints: Params.EtcdEndpoints, DialTimeout: 5 * time.Second}); initError != nil { - log.Error("RootCoord, Failed to new Etcd client", zap.Any("reason", initError)) + log.Error("RootCoord failed to new Etcd client", zap.Any("reason", initError)) return initError } if c.kvBase, initError = c.kvBaseCreate(Params.KvRootPath); initError != nil { - log.Error("RootCoord, Failed to new EtcdKV", zap.Any("reason", initError)) + log.Error("RootCoord failed to new EtcdKV", zap.Any("reason", initError)) return initError } var metaKV kv.TxnKV metaKV, initError = c.kvBaseCreate(Params.MetaRootPath) if initError != nil { - log.Error("RootCoord, Failed to new EtcdKV", zap.Any("reason", initError)) + log.Error("RootCoord failed to new EtcdKV", zap.Any("reason", initError)) return initError } var ss *suffixSnapshot if ss, initError = newSuffixSnapshot(metaKV, "_ts", Params.MetaRootPath, "snapshots"); initError != nil { - log.Error("RootCoord, Failed to new suffixSnapshot", zap.Error(initError)) + log.Error("RootCoord failed to new suffixSnapshot", zap.Error(initError)) return initError } if c.MetaTable, initError = NewMetaTable(metaKV, ss); initError != nil { - log.Error("RootCoord, Failed to new MetaTable", zap.Any("reason", initError)) + log.Error("RootCoord failed to new MetaTable", zap.Any("reason", initError)) return initError } @@ -1004,11 +1019,10 @@ func (c *Core) Init() error { return } }) - if initError == nil { - log.Debug(typeutil.RootCoordRole, zap.String("State Code", internalpb.StateCode_name[int32(internalpb.StateCode_Initializing)])) - } else { + if initError != nil { log.Debug("RootCoord init error", zap.Error(initError)) } + log.Debug("RootCoord init done") return initError } @@ -1227,14 +1241,11 @@ func (c *Core) GetStatisticsChannel(ctx context.Context) (*milvuspb.StringRespon // CreateCollection create collection func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollectionRequest) (*commonpb.Status, error) { metrics.RootCoordCreateCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("CreateCollection ", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("CreateCollection", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &CreateCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1244,31 +1255,23 @@ func (c *Core) CreateCollection(ctx context.Context, in *milvuspb.CreateCollecti } err := executeTask(t) if err != nil { - log.Debug("CreateCollection failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "Create collection failed: " + err.Error(), - }, nil + log.Error("CreateCollection failed", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "Create collection failed: "+err.Error()), nil } - log.Debug("CreateCollection Success", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("CreateCollection success", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordCreateCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return succStatus(), nil } // DropCollection drop collection func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRequest) (*commonpb.Status, error) { metrics.RootCoordDropCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("DropCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("DropCollection", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &DropCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1278,34 +1281,26 @@ func (c *Core) DropCollection(ctx context.Context, in *milvuspb.DropCollectionRe } err := executeTask(t) if err != nil { - log.Warn("DropCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "Drop collection failed: " + err.Error(), - }, nil + log.Error("DropCollection failed", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "Drop collection failed: "+err.Error()), nil } - log.Debug("DropCollection Succeeded", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("DropCollection success", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordDropCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return succStatus(), nil } // HasCollection check collection existence func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequest) (*milvuspb.BoolResponse, error) { metrics.RootCoordHasCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - Value: false, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), + Value: false, }, nil } - log.Debug("HasCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("HasCollection", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &HasCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1316,41 +1311,31 @@ func (c *Core) HasCollection(ctx context.Context, in *milvuspb.HasCollectionRequ } err := executeTask(t) if err != nil { - log.Debug("HasCollection Failed", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + log.Error("HasCollection failed", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "Has collection failed: " + err.Error(), - }, - Value: false, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "Has collection failed: "+err.Error()), + Value: false, }, nil } - log.Debug("HasCollection Succeeded", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("HasCollection success", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordHasCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Value: t.HasCollection, + Status: succStatus(), + Value: t.HasCollection, }, nil } // DescribeCollection return collection info func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeCollectionRequest) (*milvuspb.DescribeCollectionResponse, error) { metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - Schema: nil, - CollectionID: 0, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode"+internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("DescribeCollection", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("DescribeCollection", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &DescribeCollectionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1361,38 +1346,27 @@ func (c *Core) DescribeCollection(ctx context.Context, in *milvuspb.DescribeColl } err := executeTask(t) if err != nil { - log.Debug("DescribeCollection Failed", zap.String("name", in.CollectionName), zap.Error(err), zap.Int64("msgID", in.Base.MsgID)) + log.Error("DescribeCollection failed", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &milvuspb.DescribeCollectionResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "describe collection failed: " + err.Error(), - }, - Schema: nil, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "describe collection failed: "+err.Error()), }, nil } - log.Debug("DescribeCollection Succeeded", zap.String("name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("DescribeCollection success", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordDescribeCollectionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - t.Rsp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - } - // log.Debug("describe collection", zap.Any("schema", t.Rsp.Schema)) + t.Rsp.Status = succStatus() return t.Rsp, nil } // ShowCollections list all collection names func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollectionsRequest) (*milvuspb.ShowCollectionsResponse, error) { metrics.RootCoordShowCollectionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &milvuspb.ShowCollectionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - CollectionNames: nil, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), }, nil } + log.Debug("ShowCollections", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID)) t := &ShowCollectionReqTask{ baseReqTask: baseReqTask{ @@ -1400,43 +1374,32 @@ func (c *Core) ShowCollections(ctx context.Context, in *milvuspb.ShowCollections core: c, }, Req: in, - Rsp: &milvuspb.ShowCollectionsResponse{ - CollectionNames: nil, - CollectionIds: nil, - }, + Rsp: &milvuspb.ShowCollectionsResponse{}, } err := executeTask(t) if err != nil { - log.Warn("ShowCollections failed", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID), - zap.Error(err)) + log.Error("ShowCollections failed", zap.String("dbname", in.DbName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &milvuspb.ShowCollectionsResponse{ - CollectionNames: nil, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "ShowCollections failed: " + err.Error(), - }, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowCollections failed: "+err.Error()), }, nil } - log.Debug("ShowCollections Succeeded", zap.String("dbname", in.DbName), zap.Strings("collection names", t.Rsp.CollectionNames), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("ShowCollections success", zap.String("dbname", in.DbName), + zap.Int("num of collections", len(t.Rsp.CollectionNames)), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordShowCollectionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - t.Rsp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - } + t.Rsp.Status = succStatus() return t.Rsp, nil } // CreatePartition create partition func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartitionRequest) (*commonpb.Status, error) { metrics.RootCoordCreatePartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("CreatePartition", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) t := &CreatePartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1446,32 +1409,26 @@ func (c *Core) CreatePartition(ctx context.Context, in *milvuspb.CreatePartition } err := executeTask(t) if err != nil { - log.Warn("CreatePartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID), - zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "create partition failed: " + err.Error(), - }, nil + log.Error("CreatePartition failed", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "CreatePartition failed: "+err.Error()), nil } - log.Debug("CreatePartition Succeeded", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("CreatePartition success", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordCreatePartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return succStatus(), nil } // DropPartition drop partition func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequest) (*commonpb.Status, error) { metrics.RootCoordDropPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("DropPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("DropPartition", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) t := &DropPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1481,35 +1438,29 @@ func (c *Core) DropPartition(ctx context.Context, in *milvuspb.DropPartitionRequ } err := executeTask(t) if err != nil { - log.Warn("DropPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID), - zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "DropPartition failed: " + err.Error(), - }, nil + log.Error("DropPartition failed", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "DropPartition failed: "+err.Error()), nil } - log.Debug("DropPartition Succeeded", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("DropPartition success", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordDropPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return succStatus(), nil } // HasPartition check partition existence func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionRequest) (*milvuspb.BoolResponse, error) { metrics.RootCoordHasPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - Value: false, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), + Value: false, }, nil } - log.Debug("HasPartition", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("HasPartition", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) t := &HasPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1520,87 +1471,65 @@ func (c *Core) HasPartition(ctx context.Context, in *milvuspb.HasPartitionReques } err := executeTask(t) if err != nil { - log.Debug("HasPartition Failed", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + log.Error("HasPartition failed", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "HasPartition failed: " + err.Error(), - }, - Value: false, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "HasPartition failed: "+err.Error()), + Value: false, }, nil } - log.Debug("HasPartition Succeeded", zap.String("collection name", in.CollectionName), zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("HasPartition success", zap.String("collection name", in.CollectionName), + zap.String("partition name", in.PartitionName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordHasPartitionCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() return &milvuspb.BoolResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - Value: t.HasPartition, + Status: succStatus(), + Value: t.HasPartition, }, nil } // ShowPartitions list all partition names func (c *Core) ShowPartitions(ctx context.Context, in *milvuspb.ShowPartitionsRequest) (*milvuspb.ShowPartitionsResponse, error) { metrics.RootCoordShowPartitionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - log.Debug("ShowPartitionRequest received", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), - zap.String("collection", in.CollectionName)) - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - log.Debug("ShowPartitionRequest failed: rootcoord is not healthy", zap.String("role", Params.RoleName), - zap.Int64("msgID", in.Base.MsgID), zap.String("state", internalpb.StateCode_name[int32(code)])) + if code, ok := c.checkHealthy(); !ok { return &milvuspb.ShowPartitionsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("rootcoord is not healthy, state code = %s", internalpb.StateCode_name[int32(code)]), - }, - PartitionNames: nil, - PartitionIDs: nil, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), }, nil } + + log.Debug("ShowPartitions", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID)) t := &ShowPartitionReqTask{ baseReqTask: baseReqTask{ ctx: ctx, core: c, }, Req: in, - Rsp: &milvuspb.ShowPartitionsResponse{ - PartitionNames: nil, - Status: nil, - }, + Rsp: &milvuspb.ShowPartitionsResponse{}, } err := executeTask(t) if err != nil { - log.Debug("ShowPartitionsRequest failed", zap.String("role", Params.RoleName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + log.Error("ShowPartitions failed", zap.String("collection name", in.CollectionName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &milvuspb.ShowPartitionsResponse{ - PartitionNames: nil, - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowPartitions failed: "+err.Error()), }, nil } - log.Debug("ShowPartitions succeeded", zap.String("role", Params.RoleName), zap.Int64("msgID", t.Req.Base.MsgID), - zap.String("collection name", in.CollectionName), zap.Int("num of partitions", len(t.Rsp.PartitionNames))) + log.Debug("ShowPartitions success", zap.String("collection name", in.CollectionName), + zap.Int("num of partitions", len(t.Rsp.PartitionNames)), zap.Int64("msgID", t.Req.Base.MsgID)) + metrics.RootCoordShowPartitionsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - t.Rsp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - } + t.Rsp.Status = succStatus() return t.Rsp, nil } // CreateIndex create index func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) (*commonpb.Status, error) { metrics.RootCoordCreateIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("CreateIndex", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) t := &CreateIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1610,74 +1539,56 @@ func (c *Core) CreateIndex(ctx context.Context, in *milvuspb.CreateIndexRequest) } err := executeTask(t) if err != nil { - log.Debug("CreateIndex Failed", zap.String("collection name", in.CollectionName), - zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID), - zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "CreateIndex failed, error = " + err.Error(), - }, nil + log.Error("CreateIndex failed", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "CreateIndex failed, error = "+err.Error()), nil } - log.Debug("CreateIndex Succeeded", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("CreateIndex success", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordCreateIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return succStatus(), nil } // DescribeIndex return index info func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequest) (*milvuspb.DescribeIndexResponse, error) { metrics.RootCoordDescribeIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &milvuspb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - IndexDescriptions: nil, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("DescribeIndex", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) t := &DescribeIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, core: c, }, Req: in, - Rsp: &milvuspb.DescribeIndexResponse{ - Status: nil, - IndexDescriptions: nil, - }, + Rsp: &milvuspb.DescribeIndexResponse{}, } err := executeTask(t) if err != nil { - log.Debug("DescribeIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID)) + log.Error("DescribeIndex failed", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &milvuspb.DescribeIndexResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "DescribeIndex failed, error = " + err.Error(), - }, - IndexDescriptions: nil, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeIndex failed, error = "+err.Error()), }, nil } idxNames := make([]string, 0, len(t.Rsp.IndexDescriptions)) for _, i := range t.Rsp.IndexDescriptions { idxNames = append(idxNames, i.IndexName) } - log.Debug("DescribeIndex Succeeded", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("DescribeIndex success", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.Strings("index names", idxNames), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordDescribeIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() if len(t.Rsp.IndexDescriptions) == 0 { - t.Rsp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_IndexNotExist, - Reason: "index does not exist", - } + t.Rsp.Status = failStatus(commonpb.ErrorCode_IndexNotExist, "index does not exist") } else { - t.Rsp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - } + t.Rsp.Status = succStatus() } return t.Rsp, nil } @@ -1685,14 +1596,13 @@ func (c *Core) DescribeIndex(ctx context.Context, in *milvuspb.DescribeIndexRequ // DropIndex drop index func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*commonpb.Status, error) { metrics.RootCoordDropIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("DropIndex", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("DropIndex", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), + zap.Int64("msgID", in.Base.MsgID)) t := &DropIndexReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -1702,144 +1612,109 @@ func (c *Core) DropIndex(ctx context.Context, in *milvuspb.DropIndexRequest) (*c } err := executeTask(t) if err != nil { - log.Debug("DropIndex Failed", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "DropIndex failed, error = " + err.Error(), - }, nil + log.Error("DropIndex failed", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), + zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "DropIndex failed, error = "+err.Error()), nil } - log.Debug("DropIndex Succeeded", zap.String("collection name", in.CollectionName), zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("DropIndex success", zap.String("collection name", in.CollectionName), + zap.String("field name", in.FieldName), zap.String("index name", in.IndexName), + zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordDropIndexCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return succStatus(), nil } // DescribeSegment return segment info func (c *Core) DescribeSegment(ctx context.Context, in *milvuspb.DescribeSegmentRequest) (*milvuspb.DescribeSegmentResponse, error) { metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &milvuspb.DescribeSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - IndexID: 0, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("DescribeSegment", zap.Int64("collection id", in.CollectionID), + zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) t := &DescribeSegmentReqTask{ baseReqTask: baseReqTask{ ctx: ctx, core: c, }, Req: in, - Rsp: &milvuspb.DescribeSegmentResponse{ - Status: nil, - IndexID: 0, - }, + Rsp: &milvuspb.DescribeSegmentResponse{}, } err := executeTask(t) if err != nil { - log.Debug("DescribeSegment Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) + log.Error("DescribeSegment failed", zap.Int64("collection id", in.CollectionID), + zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &milvuspb.DescribeSegmentResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "DescribeSegment failed, error = " + err.Error(), - }, - IndexID: 0, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "DescribeSegment failed, error = "+err.Error()), }, nil } - log.Debug("DescribeSegment Succeeded", zap.Int64("collection id", in.CollectionID), zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("DescribeSegment success", zap.Int64("collection id", in.CollectionID), + zap.Int64("segment id", in.SegmentID), zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordDescribeSegmentCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - t.Rsp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - } + t.Rsp.Status = succStatus() return t.Rsp, nil } // ShowSegments list all segments func (c *Core) ShowSegments(ctx context.Context, in *milvuspb.ShowSegmentsRequest) (*milvuspb.ShowSegmentsResponse, error) { metrics.RootCoordShowSegmentsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsTotal).Inc() - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &milvuspb.ShowSegmentsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - SegmentIDs: nil, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), }, nil } - log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID)) + + log.Debug("ShowSegments", zap.Int64("collection id", in.CollectionID), + zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID)) t := &ShowSegmentReqTask{ baseReqTask: baseReqTask{ ctx: ctx, core: c, }, Req: in, - Rsp: &milvuspb.ShowSegmentsResponse{ - Status: nil, - SegmentIDs: nil, - }, + Rsp: &milvuspb.ShowSegmentsResponse{}, } err := executeTask(t) if err != nil { - log.Debug("ShowSegments Failed", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("ShowSegments failed", zap.Int64("collection id", in.CollectionID), + zap.Int64("partition id", in.PartitionID), zap.Int64("msgID", in.Base.MsgID)) return &milvuspb.ShowSegmentsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "ShowSegments failed, error: " + err.Error(), - }, - SegmentIDs: nil, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ShowSegments failed, error: "+err.Error()), }, nil } - log.Debug("ShowSegments Succeeded", zap.Int64("collection id", in.CollectionID), zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs), zap.Int64("msgID", in.Base.MsgID)) + log.Debug("ShowSegments success", zap.Int64("collection id", in.CollectionID), + zap.Int64("partition id", in.PartitionID), zap.Int64s("segments ids", t.Rsp.SegmentIDs), + zap.Int64("msgID", in.Base.MsgID)) + metrics.RootCoordShowSegmentsCounter.WithLabelValues(metricProxy(in.Base.SourceID), MetricRequestsSuccess).Inc() - t.Rsp.Status = &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - } + t.Rsp.Status = succStatus() return t.Rsp, nil } // AllocTimestamp alloc timestamp func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestampRequest) (*rootcoordpb.AllocTimestampResponse, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &rootcoordpb.AllocTimestampResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - Timestamp: 0, - Count: 0, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), }, nil } ts, err := c.TSOAllocator(in.Count) if err != nil { - log.Debug("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + log.Error("AllocTimestamp failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &rootcoordpb.AllocTimestampResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "AllocTimestamp failed: " + err.Error(), - }, - Timestamp: 0, - Count: 0, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocTimestamp failed: "+err.Error()), }, nil } - // log.Printf("AllocTimestamp : %d", ts) //return first available time stamp ts = ts - uint64(in.Count) + 1 return &rootcoordpb.AllocTimestampResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, + Status: succStatus(), Timestamp: ts, Count: in.Count, }, nil @@ -1847,107 +1722,68 @@ func (c *Core) AllocTimestamp(ctx context.Context, in *rootcoordpb.AllocTimestam // AllocID alloc ids func (c *Core) AllocID(ctx context.Context, in *rootcoordpb.AllocIDRequest) (*rootcoordpb.AllocIDResponse, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { + if code, ok := c.checkHealthy(); !ok { return &rootcoordpb.AllocIDResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, - ID: 0, - Count: 0, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), }, nil } start, _, err := c.IDAllocator(in.Count) if err != nil { - log.Debug("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) + log.Error("AllocID failed", zap.Int64("msgID", in.Base.MsgID), zap.Error(err)) return &rootcoordpb.AllocIDResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "AllocID failed: " + err.Error(), - }, - ID: 0, - Count: in.Count, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "AllocID failed: "+err.Error()), + Count: in.Count, }, nil } - log.Debug("AllocID", zap.Int64("id start", start), zap.Uint32("count", in.Count)) + log.Debug("AllocID success", zap.Int64("id start", start), zap.Uint32("count", in.Count)) + return &rootcoordpb.AllocIDResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, - ID: start, - Count: in.Count, + Status: succStatus(), + ID: start, + Count: in.Count, }, nil } // UpdateChannelTimeTick used to handle ChannelTimeTickMsg func (c *Core) UpdateChannelTimeTick(ctx context.Context, in *internalpb.ChannelTimeTickMsg) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil - } - status := &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } if in.Base.MsgType != commonpb.MsgType_TimeTick { - status.ErrorCode = commonpb.ErrorCode_UnexpectedError - status.Reason = fmt.Sprintf("UpdateChannelTimeTick receive invalid message %d", in.Base.GetMsgType()) - return status, nil + msgTypeName := commonpb.MsgType_name[int32(in.Base.GetMsgType())] + return failStatus(commonpb.ErrorCode_UnexpectedError, "invalid message type "+msgTypeName), nil } err := c.chanTimeTick.UpdateTimeTick(in, "gRPC") if err != nil { - status.ErrorCode = commonpb.ErrorCode_UnexpectedError - status.Reason = err.Error() - return status, nil + return failStatus(commonpb.ErrorCode_UnexpectedError, "UpdateTimeTick failed: "+err.Error()), nil } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + return succStatus(), nil } // ReleaseDQLMessageStream release DQL msgstream func (c *Core) ReleaseDQLMessageStream(ctx context.Context, in *proxypb.ReleaseDQLMessageStreamRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } return c.proxyClientManager.ReleaseDQLMessageStream(ctx, in) } // SegmentFlushCompleted check whether segment flush has completed func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlushCompletedMsg) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } if in.Base.MsgType != commonpb.MsgType_SegmentFlushDone { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("SegmentFlushDone with incorrect msgtype = %s", commonpb.MsgType_name[int32(in.Base.MsgType)]), - }, nil + return failStatus(commonpb.ErrorCode_UnexpectedError, "SegmentFlushDone with incorrect msgtype "+commonpb.MsgType_name[int32(in.Base.MsgType)]), nil } segID := in.Segment.GetID() - log.Debug("flush segment", zap.Int64("id", segID)) + log.Debug("SegmentFlushCompleted", zap.Int64("collection id", in.Segment.CollectionID), + zap.Int64("partition id", in.Segment.PartitionID), zap.Int64("segment id", segID)) coll, err := c.MetaTable.GetCollectionByID(in.Segment.CollectionID, 0) if err != nil { - log.Warn("GetCollectionByID error", zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("GetCollectionBySegmentID error = %v", err), - }, nil + log.Error("GetCollectionByID error", zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "GetCollectionByID failed: "+err.Error()), nil } if len(coll.FieldIndexes) == 0 { @@ -1988,20 +1824,17 @@ func (c *Core) SegmentFlushCompleted(ctx context.Context, in *datapb.SegmentFlus } } - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + log.Debug("SegmentFlushCompleted success", zap.Int64("collection id", in.Segment.CollectionID), + zap.Int64("partition id", in.Segment.PartitionID), zap.Int64("segment id", segID)) + return succStatus(), nil } // GetMetrics get metrics func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) (*milvuspb.GetMetricsResponse, error) { - log.Debug("RootCoord.GetMetrics", - zap.Int64("node_id", c.session.ServerID), - zap.String("req", req.Request)) + log.Debug("GetMetrics", zap.Int64("node_id", c.session.ServerID), zap.String("req", req.Request)) - if !c.isHealthy() { - log.Warn("RootCoord.GetMetrics failed", + if _, ok := c.checkHealthy(); !ok { + log.Warn("GetMetrics failed", zap.Int64("node_id", c.session.ServerID), zap.String("req", req.Request), zap.Error(errRootCoordIsUnhealthy(c.session.ServerID))) @@ -2017,34 +1850,29 @@ func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) metricType, err := metricsinfo.ParseMetricType(req.Request) if err != nil { - log.Warn("RootCoord.GetMetrics failed to parse metric type", + log.Error("GetMetrics failed to parse metric type", zap.Int64("node_id", c.session.ServerID), zap.String("req", req.Request), zap.Error(err)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: err.Error(), - }, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, "ParseMetricType failed: "+err.Error()), Response: "", }, nil } - log.Debug("RootCoord.GetMetrics", - zap.String("metric_type", metricType)) + log.Debug("GetMetrics success", zap.String("metric_type", metricType)) if metricType == metricsinfo.SystemInfoMetrics { ret, err := c.metricsCacheManager.GetSystemInfoMetrics() if err == nil && ret != nil { return ret, nil } - log.Debug("failed to get system info metrics from cache, recompute instead", - zap.Error(err)) + log.Debug("failed to get system info metrics from cache, recompute instead", zap.Error(err)) systemInfoMetrics, err := c.getSystemInfoMetrics(ctx, req) - log.Debug("RootCoord.GetMetrics", + log.Debug("GetMetrics", zap.Int64("node_id", c.session.ServerID), zap.String("req", req.Request), zap.String("metric_type", metricType), @@ -2052,34 +1880,27 @@ func (c *Core) GetMetrics(ctx context.Context, req *milvuspb.GetMetricsRequest) zap.Error(err)) c.metricsCacheManager.UpdateSystemInfoMetrics(systemInfoMetrics) - return systemInfoMetrics, err } - log.Debug("RootCoord.GetMetrics failed, request metric type is not implemented yet", + log.Debug("GetMetrics failed, request metric type is not implemented yet", zap.Int64("node_id", c.session.ServerID), zap.String("req", req.Request), zap.String("metric_type", metricType)) return &milvuspb.GetMetricsResponse{ - Status: &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: metricsinfo.MsgUnimplementedMetric, - }, + Status: failStatus(commonpb.ErrorCode_UnexpectedError, metricsinfo.MsgUnimplementedMetric), Response: "", }, nil } // CreateAlias create collection alias func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("CreateAlias ", zap.String("alias", in.Alias), zap.String("name", in.CollectionName)) + + log.Debug("CreateAlias", zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName)) t := &CreateAliasReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -2089,29 +1910,21 @@ func (c *Core) CreateAlias(ctx context.Context, in *milvuspb.CreateAliasRequest) } err := executeTask(t) if err != nil { - log.Debug("CreateAlias failed", zap.String("alias", in.Alias), zap.String("name", in.CollectionName), zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "Create alias failed: " + err.Error(), - }, nil + log.Error("CreateAlias failed", zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "Create alias failed: "+err.Error()), nil } - log.Debug("CreateAlias Success", zap.String("alias", in.Alias), zap.String("name", in.CollectionName)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + log.Debug("CreateAlias success", zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName)) + + return succStatus(), nil } // DropAlias drop collection alias func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("DropAlias ", zap.String("alias", in.Alias)) + + log.Debug("DropAlias", zap.String("alias", in.Alias)) t := &DropAliasReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -2121,29 +1934,21 @@ func (c *Core) DropAlias(ctx context.Context, in *milvuspb.DropAliasRequest) (*c } err := executeTask(t) if err != nil { - log.Debug("DropAlias failed", zap.String("alias", in.Alias), zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "Drop alias failed: " + err.Error(), - }, nil + log.Error("DropAlias failed", zap.String("alias", in.Alias), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "DropAlias failed: "+err.Error()), nil } - log.Debug("DropAlias Success", zap.String("alias", in.Alias)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + log.Debug("DropAlias success", zap.String("alias", in.Alias)) + + return succStatus(), nil } // AlterAlias alter collection alias func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) (*commonpb.Status, error) { - code := c.stateCode.Load().(internalpb.StateCode) - if code != internalpb.StateCode_Healthy { - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: fmt.Sprintf("state code = %s", internalpb.StateCode_name[int32(code)]), - }, nil + if code, ok := c.checkHealthy(); !ok { + return failStatus(commonpb.ErrorCode_UnexpectedError, "StateCode="+internalpb.StateCode_name[int32(code)]), nil } - log.Debug("AlterAlias ", zap.String("alias", in.Alias), zap.String("name", in.CollectionName)) + + log.Debug("AlterAlias", zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName)) t := &AlterAliasReqTask{ baseReqTask: baseReqTask{ ctx: ctx, @@ -2153,15 +1958,10 @@ func (c *Core) AlterAlias(ctx context.Context, in *milvuspb.AlterAliasRequest) ( } err := executeTask(t) if err != nil { - log.Debug("AlterAlias failed", zap.String("alias", in.Alias), zap.String("name", in.CollectionName), zap.Error(err)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_UnexpectedError, - Reason: "Alter alias failed: " + err.Error(), - }, nil + log.Error("AlterAlias failed", zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName), zap.Error(err)) + return failStatus(commonpb.ErrorCode_UnexpectedError, "AlterAlias failed: "+err.Error()), nil } - log.Debug("AlterAlias Success", zap.String("alias", in.Alias), zap.String("name", in.CollectionName)) - return &commonpb.Status{ - ErrorCode: commonpb.ErrorCode_Success, - Reason: "", - }, nil + log.Debug("AlterAlias success", zap.String("alias", in.Alias), zap.String("collection name", in.CollectionName)) + + return succStatus(), nil } diff --git a/tests/python_client/testcases/test_partition.py b/tests/python_client/testcases/test_partition.py index fc13c67caf..832ef33d84 100644 --- a/tests/python_client/testcases/test_partition.py +++ b/tests/python_client/testcases/test_partition.py @@ -71,7 +71,7 @@ class TestCreateBase: code = getattr(e, 'code', "The exception does not contain the field of code.") assert code == 1 message = getattr(e, 'message', "The exception does not contain the field of message.") - assert message == "create partition failed: partition name = %s already exists" % default_tag + assert message == "CreatePartition failed: partition name = %s already exists" % default_tag assert ut.compare_list_elements(connect.list_partitions(collection), [default_tag, '_default']) @pytest.mark.tags(CaseLabel.L2) @@ -88,7 +88,7 @@ class TestCreateBase: code = getattr(e, 'code', "The exception does not contain the field of code.") assert code == 1 message = getattr(e, 'message', "The exception does not contain the field of message.") - assert message == "create partition failed: can't find collection: %s" % collection_name + assert message == "CreatePartition failed: can't find collection: %s" % collection_name @pytest.mark.tags(CaseLabel.L0) def test_create_partition_name_name_none(self, connect, collection):