Refactor healthz API (#19747)

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>

Signed-off-by: Enwei Jiao <enwei.jiao@zilliz.com>
This commit is contained in:
Enwei Jiao 2022-10-14 14:19:24 +08:00 committed by GitHub
parent a20d83bccd
commit 82dd1bba1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 412 additions and 467 deletions

View File

@ -19,9 +19,11 @@ package components
import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
grpcdatacoordclient "github.com/milvus-io/milvus/internal/distributed/datacoord"
)
@ -60,6 +62,14 @@ func (s *DataCoord) Stop() error {
}
// GetComponentStates returns DataCoord's states
func (s *DataCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return s.svr.GetComponentStates(ctx, request)
func (s *DataCoord) Health(ctx context.Context) commonpb.StateCode {
resp, err := s.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (s *DataCoord) GetName() string {
return typeutil.DataCoordRole
}

View File

@ -19,10 +19,12 @@ package components
import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
grpcdatanode "github.com/milvus-io/milvus/internal/distributed/datanode"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// DataNode implements DataNode grpc server
@ -62,6 +64,14 @@ func (d *DataNode) Stop() error {
}
// GetComponentStates returns DataNode's states
func (d *DataNode) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return d.svr.GetComponentStates(ctx, request)
func (d *DataNode) Health(ctx context.Context) commonpb.StateCode {
resp, err := d.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (d *DataNode) GetName() string {
return typeutil.DataNodeRole
}

View File

@ -19,11 +19,13 @@ package components
import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
grpcindexcoord "github.com/milvus-io/milvus/internal/distributed/indexcoord"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
)
// IndexCoord implements IndexCoord grpc server
@ -62,6 +64,14 @@ func (s *IndexCoord) Stop() error {
}
// GetComponentStates returns indexnode's states
func (s *IndexCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return s.svr.GetComponentStates(ctx, request)
func (s *IndexCoord) Health(ctx context.Context) commonpb.StateCode {
resp, err := s.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (s *IndexCoord) GetName() string {
return typeutil.IndexCoordRole
}

View File

@ -19,10 +19,12 @@ package components
import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
grpcindexnode "github.com/milvus-io/milvus/internal/distributed/indexnode"
)
@ -63,6 +65,14 @@ func (n *IndexNode) Stop() error {
}
// GetComponentStates returns IndexNode's states
func (n *IndexNode) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return n.svr.GetComponentStates(ctx, request)
func (n *IndexNode) Health(ctx context.Context) commonpb.StateCode {
resp, err := n.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (n *IndexNode) GetName() string {
return typeutil.IndexNodeRole
}

View File

@ -19,11 +19,13 @@ package components
import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
grpcproxy "github.com/milvus-io/milvus/internal/distributed/proxy"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
"go.uber.org/zap"
)
@ -65,6 +67,14 @@ func (n *Proxy) Stop() error {
}
// GetComponentStates returns Proxy's states
func (n *Proxy) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return n.svr.GetComponentStates(ctx, request)
func (n *Proxy) Health(ctx context.Context) commonpb.StateCode {
resp, err := n.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (n *Proxy) GetName() string {
return typeutil.ProxyRole
}

View File

@ -19,10 +19,12 @@ package components
import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
grpcquerycoord "github.com/milvus-io/milvus/internal/distributed/querycoord"
)
@ -64,6 +66,14 @@ func (qs *QueryCoord) Stop() error {
}
// GetComponentStates returns QueryCoord's states
func (qs *QueryCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return qs.svr.GetComponentStates(ctx, request)
func (qs *QueryCoord) Health(ctx context.Context) commonpb.StateCode {
resp, err := qs.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (qs *QueryCoord) GetName() string {
return typeutil.QueryCoordRole
}

View File

@ -19,10 +19,12 @@ package components
import (
"context"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
grpcquerynode "github.com/milvus-io/milvus/internal/distributed/querynode"
)
@ -65,6 +67,14 @@ func (q *QueryNode) Stop() error {
}
// GetComponentStates returns QueryNode's states
func (q *QueryNode) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return q.svr.GetComponentStates(ctx, request)
func (q *QueryNode) Health(ctx context.Context) commonpb.StateCode {
resp, err := q.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (q *QueryNode) GetName() string {
return typeutil.QueryNodeRole
}

View File

@ -20,10 +20,12 @@ import (
"context"
"io"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
rc "github.com/milvus-io/milvus/internal/distributed/rootcoord"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/opentracing/opentracing-go"
"go.uber.org/zap"
@ -69,6 +71,14 @@ func (rc *RootCoord) Stop() error {
}
// GetComponentStates returns RootCoord's states
func (rc *RootCoord) GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error) {
return rc.svr.GetComponentStates(ctx, request)
func (rc *RootCoord) Health(ctx context.Context) commonpb.StateCode {
resp, err := rc.svr.GetComponentStates(ctx, &milvuspb.GetComponentStatesRequest{})
if err != nil {
return commonpb.StateCode_Abnormal
}
return resp.State.GetStateCode()
}
func (rc *RootCoord) GetName() string {
return typeutil.RootCoordRole
}

View File

@ -70,7 +70,6 @@ func (c *run) execute(args []string, flags *flag.FlagSet) {
case typeutil.IndexNodeRole:
role.EnableIndexNode = true
case typeutil.StandaloneRole, typeutil.EmbeddedRole:
role.HasMultipleRoles = true
role.EnableRootCoord = true
role.EnableProxy = true
role.EnableQueryCoord = true
@ -81,7 +80,6 @@ func (c *run) execute(args []string, flags *flag.FlagSet) {
role.EnableIndexNode = true
local = true
case roleMixture:
role.HasMultipleRoles = true
role.EnableRootCoord = c.enableRootCoord
role.EnableQueryCoord = c.enableQueryCoord
role.EnableDataCoord = c.enableDataCoord

View File

@ -1,106 +0,0 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package roles
import (
"context"
"fmt"
"net/http"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/healthz"
"go.uber.org/zap"
)
func unhealthyHandler(w http.ResponseWriter, r *http.Request, reason string) {
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText)
_, err := fmt.Fprint(w, reason)
if err != nil {
log.Warn("failed to send response",
zap.Error(err))
}
}
func healthyHandler(w http.ResponseWriter, r *http.Request) {
var err error
w.WriteHeader(http.StatusOK)
w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText)
_, err = fmt.Fprint(w, "OK")
if err != nil {
log.Warn("failed to send response",
zap.Error(err))
}
}
// GetComponentStatesInterface defines the interface that get states from component.
type GetComponentStatesInterface interface {
// GetComponentStates returns the states of component.
GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error)
}
type componentsHealthzHandler struct {
component GetComponentStatesInterface
}
func (handler *componentsHealthzHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
states, err := handler.component.GetComponentStates(context.Background(), &milvuspb.GetComponentStatesRequest{})
if err != nil {
log.Warn("failed to get component states", zap.Error(err))
unhealthyHandler(w, r, err.Error())
return
}
if states == nil {
log.Warn("failed to get component states, states is nil")
unhealthyHandler(w, r, "failed to get states")
return
}
if states.Status == nil {
log.Warn("failed to get component states, states.Status is nil")
unhealthyHandler(w, r, "failed to get status")
return
}
if states.Status.ErrorCode != commonpb.ErrorCode_Success {
log.Warn("failed to get component states",
zap.String("ErrorCode", states.Status.ErrorCode.String()),
zap.String("Reason", states.Status.Reason))
unhealthyHandler(w, r, states.Status.Reason)
return
}
if states.State == nil {
log.Warn("failed to get component states, states.State is nil")
unhealthyHandler(w, r, "failed to get state")
return
}
if states.State.StateCode != commonpb.StateCode_Healthy {
log.Warn("component is unhealthy", zap.String("state", states.State.StateCode.String()))
unhealthyHandler(w, r, fmt.Sprintf("state: %s", states.State.StateCode.String()))
return
}
healthyHandler(w, r)
}

View File

@ -18,18 +18,16 @@ package roles
import (
"context"
"fmt"
"net/http"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/management"
rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"go.uber.org/zap"
"github.com/milvus-io/milvus/cmd/components"
"github.com/milvus-io/milvus/internal/datacoord"
@ -37,22 +35,19 @@ import (
"github.com/milvus-io/milvus/internal/indexcoord"
"github.com/milvus-io/milvus/internal/indexnode"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/management/healthz"
"github.com/milvus-io/milvus/internal/metrics"
rocksmqimpl "github.com/milvus-io/milvus/internal/mq/mqimpl/rocksmq/server"
"github.com/milvus-io/milvus/internal/proxy"
querycoord "github.com/milvus-io/milvus/internal/querycoordv2"
"github.com/milvus-io/milvus/internal/querynode"
"github.com/milvus-io/milvus/internal/rootcoord"
"github.com/milvus-io/milvus/internal/util/dependency"
"github.com/milvus-io/milvus/internal/util/etcd"
"github.com/milvus-io/milvus/internal/util/healthz"
"github.com/milvus-io/milvus/internal/util/metricsinfo"
"github.com/milvus-io/milvus/internal/util/paramtable"
"github.com/milvus-io/milvus/internal/util/trace"
"github.com/milvus-io/milvus/internal/util/typeutil"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
var Params paramtable.ComponentParam
@ -71,9 +66,50 @@ func stopRocksmq() {
rocksmqimpl.CloseRocksMQ()
}
type component interface {
healthz.Indicator
Run() error
Stop() error
}
func runComponent[T component](ctx context.Context,
localMsg bool,
params *paramtable.ComponentParam,
extraInit func(),
creator func(context.Context, dependency.Factory) (T, error),
metricRegister func(*prometheus.Registry)) T {
var role T
var wg sync.WaitGroup
wg.Add(1)
go func() {
params.InitOnce()
if extraInit != nil {
extraInit()
}
factory := dependency.NewFactory(localMsg)
var err error
role, err = creator(ctx, factory)
if localMsg {
params.SetLogConfig(typeutil.StandaloneRole)
} else {
params.SetLogConfig(role.GetName())
}
if err != nil {
panic(err)
}
wg.Done()
_ = role.Run()
}()
wg.Wait()
healthz.Register(role)
metricRegister(Registry)
return role
}
// MilvusRoles decides which components are brought up with Milvus.
type MilvusRoles struct {
HasMultipleRoles bool
EnableRootCoord bool `env:"ENABLE_ROOT_COORD"`
EnableProxy bool `env:"ENABLE_PROXY"`
EnableQueryCoord bool `env:"ENABLE_QUERY_COORD"`
@ -100,258 +136,55 @@ func (mr *MilvusRoles) printLDPreLoad() {
}
func (mr *MilvusRoles) runRootCoord(ctx context.Context, localMsg bool) *components.RootCoord {
var rc *components.RootCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
rootcoord.Params.InitOnce()
if localMsg {
rootcoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
rootcoord.Params.SetLogConfig(typeutil.RootCoordRole)
}
factory := dependency.NewFactory(localMsg)
var err error
rc, err = components.NewRootCoord(ctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: rc})
}
wg.Done()
_ = rc.Run()
}()
wg.Wait()
metrics.RegisterRootCoord(Registry)
return rc
return runComponent(ctx, localMsg, &rootcoord.Params, nil, components.NewRootCoord, metrics.RegisterRootCoord)
}
func (mr *MilvusRoles) runProxy(ctx context.Context, localMsg bool, alias string) *components.Proxy {
var pn *components.Proxy
var wg sync.WaitGroup
wg.Add(1)
go func() {
proxy.Params.ProxyCfg.InitAlias(alias)
proxy.Params.InitOnce()
if localMsg {
proxy.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
proxy.Params.SetLogConfig(typeutil.ProxyRole)
}
factory := dependency.NewFactory(localMsg)
var err error
pn, err = components.NewProxy(ctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: pn})
}
wg.Done()
_ = pn.Run()
}()
wg.Wait()
metrics.RegisterProxy(Registry)
return pn
return runComponent(ctx, localMsg, &proxy.Params,
func() {
proxy.Params.ProxyCfg.InitAlias(alias)
},
components.NewProxy,
metrics.RegisterProxy)
}
func (mr *MilvusRoles) runQueryCoord(ctx context.Context, localMsg bool) *components.QueryCoord {
var qs *components.QueryCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
querycoord.Params.InitOnce()
if localMsg {
querycoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
querycoord.Params.SetLogConfig(typeutil.QueryCoordRole)
}
factory := dependency.NewFactory(localMsg)
var err error
qs, err = components.NewQueryCoord(ctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qs})
}
wg.Done()
_ = qs.Run()
}()
wg.Wait()
metrics.RegisterQueryCoord(Registry)
return qs
return runComponent(ctx, localMsg, querycoord.Params, nil, components.NewQueryCoord, metrics.RegisterQueryCoord)
}
func (mr *MilvusRoles) runQueryNode(ctx context.Context, localMsg bool, alias string) *components.QueryNode {
var qn *components.QueryNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
querynode.Params.QueryNodeCfg.InitAlias(alias)
querynode.Params.InitOnce()
if localMsg {
querynode.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
querynode.Params.SetLogConfig(typeutil.QueryNodeRole)
}
factory := dependency.NewFactory(localMsg)
var err error
qn, err = components.NewQueryNode(ctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: qn})
}
wg.Done()
_ = qn.Run()
}()
wg.Wait()
metrics.RegisterQueryNode(Registry)
return qn
return runComponent(ctx, localMsg, &querynode.Params,
func() {
querynode.Params.QueryNodeCfg.InitAlias(alias)
},
components.NewQueryNode,
metrics.RegisterQueryNode)
}
func (mr *MilvusRoles) runDataCoord(ctx context.Context, localMsg bool) *components.DataCoord {
var ds *components.DataCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
datacoord.Params.InitOnce()
if localMsg {
datacoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
datacoord.Params.SetLogConfig(typeutil.DataCoordRole)
}
factory := dependency.NewFactory(localMsg)
dctx := log.WithModule(ctx, "DataCoord")
var err error
ds, err = components.NewDataCoord(dctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: ds})
}
wg.Done()
_ = ds.Run()
}()
wg.Wait()
metrics.RegisterDataCoord(Registry)
return ds
return runComponent(ctx, localMsg, &datacoord.Params, nil, components.NewDataCoord, metrics.RegisterDataCoord)
}
func (mr *MilvusRoles) runDataNode(ctx context.Context, localMsg bool, alias string) *components.DataNode {
var dn *components.DataNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
datanode.Params.DataNodeCfg.InitAlias(alias)
datanode.Params.InitOnce()
if localMsg {
datanode.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
datanode.Params.SetLogConfig(typeutil.DataNodeRole)
}
factory := dependency.NewFactory(localMsg)
var err error
dn, err = components.NewDataNode(ctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: dn})
}
wg.Done()
_ = dn.Run()
}()
wg.Wait()
metrics.RegisterDataNode(Registry)
return dn
return runComponent(ctx, localMsg, &datanode.Params,
func() {
datanode.Params.DataNodeCfg.InitAlias(alias)
},
components.NewDataNode,
metrics.RegisterDataNode)
}
func (mr *MilvusRoles) runIndexCoord(ctx context.Context, localMsg bool) *components.IndexCoord {
var is *components.IndexCoord
var wg sync.WaitGroup
wg.Add(1)
go func() {
indexcoord.Params.InitOnce()
if localMsg {
indexcoord.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
indexcoord.Params.SetLogConfig(typeutil.IndexCoordRole)
}
factory := dependency.NewFactory(localMsg)
var err error
is, err = components.NewIndexCoord(ctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: is})
}
wg.Done()
_ = is.Run()
}()
wg.Wait()
metrics.RegisterIndexCoord(Registry)
return is
return runComponent(ctx, localMsg, &indexcoord.Params, nil, components.NewIndexCoord, metrics.RegisterIndexCoord)
}
func (mr *MilvusRoles) runIndexNode(ctx context.Context, localMsg bool, alias string) *components.IndexNode {
var in *components.IndexNode
var wg sync.WaitGroup
wg.Add(1)
go func() {
indexnode.Params.IndexNodeCfg.InitAlias(alias)
indexnode.Params.InitOnce()
if localMsg {
indexnode.Params.SetLogConfig(typeutil.StandaloneRole)
} else {
indexnode.Params.SetLogConfig(typeutil.IndexNodeRole)
}
factory := dependency.NewFactory(localMsg)
var err error
in, err = components.NewIndexNode(ctx, factory)
if err != nil {
panic(err)
}
if !mr.HasMultipleRoles {
http.Handle(healthz.HealthzRouterPath, &componentsHealthzHandler{component: in})
}
wg.Done()
_ = in.Run()
}()
wg.Wait()
metrics.RegisterIndexNode(Registry)
return in
return runComponent(ctx, localMsg, &indexnode.Params,
func() {
indexnode.Params.IndexNodeCfg.InitAlias(alias)
},
components.NewIndexNode,
metrics.RegisterIndexNode)
}
// Run Milvus components.
@ -462,61 +295,6 @@ func (mr *MilvusRoles) Run(local bool, alias string) {
}
}
if mr.HasMultipleRoles {
multiRoleHealthzHandler := func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
req := &milvuspb.GetComponentStatesRequest{}
validateResp := func(resp *milvuspb.ComponentStates, err error) bool {
return err == nil && resp != nil && resp.GetState().GetStateCode() == commonpb.StateCode_Healthy
}
if mr.EnableRootCoord {
if rc == nil || !validateResp(rc.GetComponentStates(ctx, req)) {
rootCoordNotServingHandler(w, r)
return
}
}
if mr.EnableQueryCoord {
if qs == nil || !validateResp(qs.GetComponentStates(ctx, req)) {
queryCoordNotServingHandler(w, r)
return
}
}
if mr.EnableDataCoord {
if ds == nil || !validateResp(ds.GetComponentStates(ctx, req)) {
dataCoordNotServingHandler(w, r)
return
}
}
if mr.EnableIndexCoord {
if is == nil || !validateResp(is.GetComponentStates(ctx, req)) {
indexCoordNotServingHandler(w, r)
return
}
}
if mr.EnableProxy {
if pn == nil || !validateResp(pn.GetComponentStates(ctx, req)) {
proxyNotServingHandler(w, r)
return
}
}
// TODO(dragondriver): need to check node state?
w.WriteHeader(http.StatusOK)
w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText)
_, err := fmt.Fprint(w, "OK")
if err != nil {
log.Warn("Failed to send response",
zap.Error(err))
}
// TODO(dragondriver): handle component states
}
http.HandleFunc(healthz.HealthzRouterPath, multiRoleHealthzHandler)
}
metrics.Register(Registry)
management.ServeHTTP()
sc := make(chan os.Signal, 1)

29
go.mod
View File

@ -7,7 +7,6 @@ require (
github.com/BurntSushi/toml v1.0.0
github.com/DATA-DOG/go-sqlmock v1.5.0
github.com/HdrHistogram/hdrhistogram-go v1.0.1 // indirect
github.com/StackExchange/wmi v1.2.1 // indirect
github.com/antlr/antlr4/runtime/Go/antlr v0.0.0-20210826220005-b48c857c3a0e
github.com/antonmedv/expr v1.8.9
github.com/apache/arrow/go/v8 v8.0.0-20220322092137-778b1772fd20
@ -34,12 +33,11 @@ require (
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.11.0
github.com/sbinet/npyio v0.6.0
github.com/shirou/gopsutil v3.21.8+incompatible
github.com/spaolacci/murmur3 v1.1.0
github.com/spf13/cast v1.3.1
github.com/spf13/viper v1.8.1
github.com/streamnative/pulsarctl v0.5.0
github.com/stretchr/testify v1.7.4
github.com/stretchr/testify v1.8.0
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c
github.com/uber/jaeger-client-go v2.25.0+incompatible
go.etcd.io/etcd/api/v3 v3.5.0
@ -69,6 +67,13 @@ require github.com/sandertv/go-formula/v2 v2.0.0-alpha.7
require github.com/quasilyte/go-ruleguard/dsl v0.3.21 // indirect
require (
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
github.com/shirou/gopsutil/v3 v3.22.9
github.com/yusufpapurcu/wmi v1.2.2 // indirect
)
require (
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/AthenZ/athenz v1.10.15 // indirect
@ -98,7 +103,7 @@ require (
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-ole/go-ole v1.2.5 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/go-playground/locales v0.13.0 // indirect
github.com/go-playground/universal-translator v0.17.0 // indirect
github.com/go-playground/validator/v10 v10.4.1 // indirect
@ -155,8 +160,8 @@ require (
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.4.0 // indirect
github.com/subosito/gotenv v1.2.0 // indirect
github.com/tklauser/go-sysconf v0.3.9 // indirect
github.com/tklauser/numcpus v0.3.0 // indirect
github.com/tklauser/go-sysconf v0.3.10 // indirect
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 // indirect
github.com/uber/jaeger-lib v2.4.0+incompatible // indirect
github.com/ugorji/go/codec v1.1.7 // indirect
@ -181,7 +186,7 @@ require (
golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602 // indirect
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 // indirect
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba // indirect
@ -201,10 +206,12 @@ replace (
github.com/bketelsen/crypt => github.com/bketelsen/crypt v0.0.4 // Fix security alert for core-os/etcd
github.com/dgrijalva/jwt-go => github.com/golang-jwt/jwt v3.2.2+incompatible // Fix security alert for jwt-go 3.2.0
github.com/go-kit/kit => github.com/go-kit/kit v0.1.0
)
replace (
// If you want to use the hook interceptor, the following code should be commented out
// and you should modify the api version to be the same as the `so` project.
github.com/milvus-io/milvus/api => ./api
github.com/streamnative/pulsarctl => github.com/xiaofan-luan/pulsarctl v0.5.1
github.com/tecbot/gorocksdb => github.com/milvus-io/gorocksdb v0.0.0-20220624081344-8c5f4212846b // indirect
)
// If you want to use the hook interceptor, the following code should be commented out
// and you should modify the api version to be the same as the `so` project.
replace github.com/milvus-io/milvus/api => ./api

39
go.sum
View File

@ -64,8 +64,6 @@ github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c/go.mod h1:X0
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible h1:1G1pk05UrOh0NlF1oeaaix1x8XzrfjIDK47TY0Zehcw=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/StackExchange/wmi v1.2.1 h1:VIkavFPXSjcnS+O8yTq7NI32k0R5Aj+v39y29VYDOSA=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/actgardner/gogen-avro/v10 v10.1.0/go.mod h1:o+ybmVjEa27AAr35FRqU98DJu1fXES56uXniYFv4yDA=
github.com/actgardner/gogen-avro/v10 v10.2.1/go.mod h1:QUhjeHPchheYmMDni/Nx7VB0RsT/ee8YIgGY/xpEQgQ=
github.com/actgardner/gogen-avro/v9 v9.1.0/go.mod h1:nyTj6wPqDJoxM3qdnjcLv+EnMDSDFqE0qDpva2QRmKc=
@ -239,8 +237,8 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v0.4.0/go.mod h1:z6/tIYblkpsD+a4lm/fGIIU9mZ+XfAiaFtq7xTgseGU=
github.com/go-ole/go-ole v1.2.5 h1:t4MGB5xEDZvXI+0rMjjsfBsD7yAgp/s9ZDkL1JndXwY=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY=
github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBYNjji3q3A=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/locales v0.13.0 h1:HyWk6mgj5qFqCT5fjGBuRArbVDfE4hi8+e8ceBS/t7Q=
@ -324,8 +322,9 @@ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -473,6 +472,8 @@ github.com/linkedin/goavro/v2 v2.11.1 h1:4cuAtbDfqkKnBXp9E+tRkIJGa6W6iAjwonwt8O1
github.com/linkedin/goavro/v2 v2.11.1/go.mod h1:UgQUb2N/pmueQYH9bfqFioWxzYCZXSfF8Jw03O5sjqA=
github.com/lucasb-eyer/go-colorful v1.0.2/go.mod h1:0MS4r+7BZKSJ5mw4/S5MPN+qHFF1fYclkSPilDOKW0s=
github.com/lucasb-eyer/go-colorful v1.0.3/go.mod h1:R4dSotOR9KMtayYi1e77YzuveK+i7ruzyGqttikkLy0=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4=
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I=
github.com/magiconair/properties v1.8.1/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ=
github.com/magiconair/properties v1.8.5 h1:b6kJs+EmPFMYGkow9GiUyCyOvIwYetYJ3fSaWak/Gls=
github.com/magiconair/properties v1.8.5/go.mod h1:y3VJvCyxH9uVvJTWEGAELF3aiYNyPKd5NZ3oSwXrF60=
@ -567,6 +568,8 @@ github.com/pmezard/go-difflib v0.0.0-20151028094244-d8ed2627bdf0/go.mod h1:iKH77
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw=
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
github.com/prometheus/client_golang v0.9.3/go.mod h1:/TN21ttK/J9q6uSwhBd54HahCDft0ttaMvbicHlPoso=
github.com/prometheus/client_golang v1.0.0/go.mod h1:db9x61etRT2tGnBNRi70OPL5FsnadC4Ky3P0J6CfImo=
@ -617,8 +620,8 @@ github.com/santhosh-tekuri/jsonschema/v5 v5.0.0/go.mod h1:FKdcjfQW6rpZSnxxUvEA5H
github.com/sbinet/npyio v0.6.0 h1:IyqqQIzRjDym9xnIXsToCKei/qCzxDP+Y74KoMlMgXo=
github.com/sbinet/npyio v0.6.0/go.mod h1:/q3BNr6dJOy+t6h7RZchTJ0nwRJO52mivaem29WE1j8=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/shirou/gopsutil v3.21.8+incompatible h1:sh0foI8tMRlCidUJR+KzqWYWxrkuuPIGiO6Vp+KXdCU=
github.com/shirou/gopsutil v3.21.8+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=
github.com/shirou/gopsutil/v3 v3.22.9 h1:yibtJhIVEMcdw+tCTbOPiF1VcsuDeTE4utJ8Dm4c5eA=
github.com/shirou/gopsutil/v3 v3.22.9/go.mod h1:bBYl1kjgEJpWpxeHmLI+dVHWtyAwfcmSBLDsp2TNT8A=
github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
@ -668,15 +671,15 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.4 h1:wZRexSlwd7ZXfKINDLsO4r7WBt3gTKONc6K/VesHvHM=
github.com/stretchr/testify v1.7.4/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk=
github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/thoas/go-funk v0.9.1 h1:O549iLZqPpTUQ10ykd26sZhzD+rmR5pWhuElrhbC20M=
github.com/tklauser/go-sysconf v0.3.9 h1:JeUVdAOWhhxVcU6Eqr/ATFHgXk/mmiItdKeJPev3vTo=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0 h1:ILuRUQBtssgnxw0XXIjKUC56fgnOrFoQQ/4+DeU2biQ=
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/tklauser/go-sysconf v0.3.10 h1:IJ1AZGZRWbY8T5Vfk04D9WOA5WSejdflXxP03OUqALw=
github.com/tklauser/go-sysconf v0.3.10/go.mod h1:C8XykCvCb+Gn0oNCWPIlcb0RuglQTYaQ2hGm7jmxEFk=
github.com/tklauser/numcpus v0.4.0 h1:E53Dm1HjH1/R2/aoCtXtPgzmElmn51aOkhCFSuZq//o=
github.com/tklauser/numcpus v0.4.0/go.mod h1:1+UI3pD8NW14VMwdgJNJ1ESk2UnwhAnz5hMwiKKqXCQ=
github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802 h1:uruHq4dN7GR16kFc5fp3d1RIYzJW5onx8Ybykw2YQFA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20201229170055-e5319fda7802/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
@ -698,6 +701,8 @@ github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
github.com/yusufpapurcu/wmi v1.2.2 h1:KBNDSne4vP5mbSWnJbO+51IMOXJB67QiYCSBrubbPRg=
github.com/yusufpapurcu/wmi v1.2.2/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0=
github.com/zeebo/xxh3 v1.0.1 h1:FMSRIbkrLikb/0hZxmltpg84VkqDAT5M8ufXynuhXsI=
github.com/zeebo/xxh3 v1.0.1/go.mod h1:8VHV24/3AZLn3b6Mlp/KuC33LWH687Wq6EnziEB+rsA=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
@ -953,6 +958,7 @@ golang.org/x/sys v0.0.0-20200923182605-d9f96fdee20d/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201201145000-ef89a241ccb3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210104204734-6f8348627aad/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -967,14 +973,13 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210819135213-f52c844e1c1c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150 h1:xHms4gcpe1YE7A3yIllJXP16CMAGuqwO2lX1mTyyRRc=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a h1:dGzPydgVsqGcTRVwiLJ1jVbufYwmzD3LfVPLKsKg+0k=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 h1:JGgROgKl9N8DuW20oFS5gxc+lE67/N3FcwmBPMe7ArY=
@ -1188,8 +1193,6 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/avro.v0 v0.0.0-20171217001914-a730b5802183/go.mod h1:FvqrFXt+jCsyQibeRv4xxEJBL5iG2DDW5aeJwzDiq4A=

View File

@ -21,4 +21,6 @@ const (
ContentTypeHeader = "Content-Type"
// ContentTypeText is the health check request type text.
ContentTypeText = "text/plain"
// ContentTypeJSON is another health check request type text, which response contains more info.
ContentTypeJSON = "application/json"
)

View File

@ -14,21 +14,20 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package roles
package healthz
import (
"fmt"
"net/http"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/util/healthz"
"github.com/milvus-io/milvus/internal/util/milvuserrors"
"go.uber.org/zap"
)
func componentsNotServingHandler(w http.ResponseWriter, r *http.Request, msg string) {
w.WriteHeader(http.StatusInternalServerError)
w.Header().Set(healthz.ContentTypeHeader, healthz.ContentTypeText)
w.Header().Set(ContentTypeHeader, ContentTypeText)
_, err := fmt.Fprint(w, msg)
if err != nil {
log.Warn("failed to send response",

View File

@ -0,0 +1,113 @@
// Licensed to the LF AI & Data foundation under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package healthz
import (
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/api/milvuspb"
"github.com/milvus-io/milvus/internal/log"
"go.uber.org/zap"
)
// GetComponentStatesInterface defines the interface that get states from component.
type GetComponentStatesInterface interface {
// GetComponentStates returns the states of component.
GetComponentStates(ctx context.Context, request *milvuspb.GetComponentStatesRequest) (*milvuspb.ComponentStates, error)
}
type Indicator interface {
GetName() string
Health(ctx context.Context) commonpb.StateCode
}
type IndicatorState struct {
Name string `json:"name"`
Code commonpb.StateCode `json:"code"`
}
type HealthResponse struct {
State string `json:"state"`
Detail []*IndicatorState `json:"detail"`
}
type HealthHandler struct {
indicators []Indicator
}
var _ http.Handler = (*HealthHandler)(nil)
var defaultHandler = HealthHandler{}
func Register(indicator Indicator) {
defaultHandler.indicators = append(defaultHandler.indicators, indicator)
}
func Handler() *HealthHandler {
return &defaultHandler
}
func (handler *HealthHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
resp := &HealthResponse{
State: "OK",
}
ctx := context.Background()
for _, in := range handler.indicators {
code := in.Health(ctx)
resp.Detail = append(resp.Detail, &IndicatorState{
Name: in.GetName(),
Code: code,
})
if code != commonpb.StateCode_Healthy {
resp.State = fmt.Sprintf("component %s state is %s", in.GetName(), code.String())
}
}
if resp.State == "OK" {
w.WriteHeader(http.StatusOK)
} else {
w.WriteHeader(http.StatusInternalServerError)
}
// for compatibility
if r.Header.Get(ContentTypeHeader) != ContentTypeJSON {
writeText(w, r, resp.State)
return
}
writeJSON(w, r, resp)
}
func writeJSON(w http.ResponseWriter, r *http.Request, resp *HealthResponse) {
w.Header().Set(ContentTypeHeader, ContentTypeJSON)
bs, err := json.Marshal(resp)
if err != nil {
log.Warn("faild to send response", zap.Error(err))
}
w.Write(bs)
}
func writeText(w http.ResponseWriter, r *http.Request, reason string) {
w.Header().Set(ContentTypeHeader, ContentTypeText)
_, err := fmt.Fprint(w, reason)
if err != nil {
log.Warn("failed to send response",
zap.Error(err))
}
}

View File

@ -14,7 +14,10 @@
// See the License for the specific language governing permissions and
// limitations under the License.
package healthz
package management
// HealthzRouterPath is default path for check health state.
const HealthzRouterPath = "/healthz"
// LogLevelRouterPath is path for Get and Update log level at runtime.
const LogLevelRouterPath = "/log/level"

View File

@ -23,6 +23,7 @@ import (
"strconv"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/management/healthz"
"go.uber.org/zap"
)
@ -39,11 +40,15 @@ type HTTPHandler struct {
func registerDefaults() {
Register(&HTTPHandler{
Path: "/log/level",
Path: LogLevelRouterPath,
HandlerFunc: func(w http.ResponseWriter, req *http.Request) {
log.Level().ServeHTTP(w, req)
},
})
Register(&HTTPHandler{
Path: HealthzRouterPath,
Handler: healthz.Handler(),
})
}
func Register(h *HTTPHandler) {

View File

@ -18,14 +18,18 @@ package management
import (
"bytes"
"context"
"encoding/json"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"github.com/milvus-io/milvus/api/commonpb"
"github.com/milvus-io/milvus/internal/log"
"github.com/milvus-io/milvus/internal/management/healthz"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/suite"
"go.uber.org/zap"
)
@ -36,34 +40,45 @@ func TestGetHTTPAddr(t *testing.T) {
assert.Equal(t, getHTTPAddr(), ":"+testPort)
}
func TestDefaultLogHandler(t *testing.T) {
httpServer := httptest.NewServer(nil)
defer httpServer.Close()
type HTTPServerTestSuite struct {
suite.Suite
server *httptest.Server
}
func (suite *HTTPServerTestSuite) SetupSuite() {
suite.server = httptest.NewServer(nil)
registerDefaults()
}
func (suite *HTTPServerTestSuite) TearDownSuite() {
defer suite.server.Close()
}
func (suite *HTTPServerTestSuite) TestDefaultLogHandler() {
log.SetLevel(zap.DebugLevel)
assert.Equal(t, zap.DebugLevel, log.GetLevel())
suite.Equal(zap.DebugLevel, log.GetLevel())
// replace global logger, log change will not be affected.
conf := &log.Config{Level: "info", File: log.FileLogConfig{}, DisableTimestamp: true}
logger, p, _ := log.InitLogger(conf)
log.ReplaceGlobals(logger, p)
assert.Equal(t, zap.InfoLevel, log.GetLevel())
suite.Equal(zap.InfoLevel, log.GetLevel())
// change log level through http
payload, err := json.Marshal(map[string]interface{}{"level": "error"})
payload, err := json.Marshal(map[string]any{"level": "error"})
if err != nil {
log.Fatal(err.Error())
}
url := httpServer.URL + "/log/level"
url := suite.server.URL + "/log/level"
req, err := http.NewRequest(http.MethodPut, url, bytes.NewBuffer(payload))
req.Header.Set("Content-Type", "application/json")
if err != nil {
log.Fatal(err.Error())
}
client := httpServer.Client()
client := suite.server.Client()
resp, err := client.Do(req)
if err != nil {
log.Fatal(err.Error())
@ -74,6 +89,54 @@ func TestDefaultLogHandler(t *testing.T) {
if err != nil {
log.Fatal(err.Error())
}
assert.Equal(t, "{\"level\":\"error\"}\n", string(body))
assert.Equal(t, zap.ErrorLevel, log.GetLevel())
suite.Equal("{\"level\":\"error\"}\n", string(body))
suite.Equal(zap.ErrorLevel, log.GetLevel())
}
func (suite *HTTPServerTestSuite) TestHealthzHandler() {
url := suite.server.URL + "/healthz"
client := suite.server.Client()
healthz.Register(&MockIndicator{"m1", commonpb.StateCode_Healthy})
req, _ := http.NewRequest(http.MethodGet, url, nil)
resp, err := client.Do(req)
suite.Nil(err)
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
suite.Equal("OK", string(body))
req, _ = http.NewRequest(http.MethodGet, url, nil)
req.Header.Set("Content-Type", "application/json")
resp, err = client.Do(req)
suite.Nil(err)
defer resp.Body.Close()
body, _ = ioutil.ReadAll(resp.Body)
suite.Equal("{\"state\":\"OK\",\"detail\":[{\"name\":\"m1\",\"code\":1}]}", string(body))
healthz.Register(&MockIndicator{"m2", commonpb.StateCode_Abnormal})
req, _ = http.NewRequest(http.MethodGet, url, nil)
req.Header.Set("Content-Type", "application/json")
resp, err = client.Do(req)
suite.Nil(err)
defer resp.Body.Close()
body, _ = ioutil.ReadAll(resp.Body)
suite.Equal("{\"state\":\"component m2 state is Abnormal\",\"detail\":[{\"name\":\"m1\",\"code\":1},{\"name\":\"m2\",\"code\":2}]}", string(body))
}
func TestHTTPServerSuite(t *testing.T) {
suite.Run(t, new(HTTPServerTestSuite))
}
type MockIndicator struct {
name string
code commonpb.StateCode
}
func (m *MockIndicator) Health(ctx context.Context) commonpb.StateCode {
return m.code
}
func (m *MockIndicator) GetName() string {
return m.name
}

View File

@ -14,8 +14,8 @@ package metricsinfo
import (
"sync"
"github.com/shirou/gopsutil/cpu"
"github.com/shirou/gopsutil/mem"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/mem"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"

View File

@ -21,7 +21,7 @@ import (
"sync/atomic"
"time"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/v3/disk"
"go.uber.org/zap"
"github.com/milvus-io/milvus/internal/log"