Signed-off-by: neza2017 <yefu.chen@zilliz.com>
This commit is contained in:
neza2017 2021-02-01 12:04:21 +08:00 committed by yefu.chen
parent 3f45cfd6e8
commit f6db55fab5
6 changed files with 353 additions and 121 deletions

View File

@ -125,8 +125,19 @@ func (m *MasterService) Run() error {
}
func (m *MasterService) Stop() error {
if m != nil {
if m.proxyService != nil {
_ = m.proxyService.Stop()
}
if m.indexService != nil {
_ = m.indexService.Stop()
}
if m.dataService != nil {
_ = m.dataService.Stop()
}
if m.svr != nil {
return m.svr.Stop()
}
}
return nil
}

View File

@ -0,0 +1,20 @@
package components
import (
"context"
)
func NewMsgStreamService(ctx context.Context) (*MsgStream, error) {
return nil, nil
}
type MsgStream struct {
}
func (ps *MsgStream) Run() error {
return nil
}
func (ps *MsgStream) Stop() error {
return nil
}

View File

@ -0,0 +1,20 @@
package components
import (
"context"
)
func NewQueryNode(ctx context.Context) (*QueryNode, error) {
return nil, nil
}
type QueryNode struct {
}
func (ps *QueryNode) Run() error {
return nil
}
func (ps *QueryNode) Stop() error {
return nil
}

View File

@ -0,0 +1,20 @@
package components
import (
"context"
)
func NewQueryService(ctx context.Context) (*QueryService, error) {
return nil, nil
}
type QueryService struct {
}
func (ps *QueryService) Run() error {
return nil
}
func (ps *QueryService) Stop() error {
return nil
}

View File

@ -2,19 +2,14 @@ package main
import (
"context"
"fmt"
"flag"
"log"
"os"
"os/signal"
"path/filepath"
"strings"
"syscall"
"github.com/go-kit/kit/log/level"
"github.com/ilyakaznacheev/cleanenv"
"github.com/oklog/run"
"github.com/pkg/errors"
"github.com/prometheus/common/promlog"
grpcproxyservice "github.com/zilliztech/milvus-distributed/internal/distributed/proxyservice"
"gopkg.in/alecthomas/kingpin.v2"
"github.com/zilliztech/milvus-distributed/cmd/distributed/components"
)
type MilvusRoles struct {
@ -38,137 +33,275 @@ func (mr *MilvusRoles) hasAnyRole() bool {
mr.EnableIndexService || mr.EnableIndexNode
}
var roles MilvusRoles
func (mr *MilvusRoles) envValue(env string) bool {
env = strings.ToLower(env)
env = strings.Trim(env, " ")
if env == "1" || env == "true" {
return true
}
return false
}
func main() {
a := kingpin.New(filepath.Base(os.Args[0]), "Milvus")
var roles MilvusRoles
a.HelpFlag.Short('h')
a.Flag("master", "Run master service").Short('m').Default("false").BoolVar(&roles.EnableMaster)
a.Flag("msgstream-service", "Run msgstream service").Short('M').Default("false").BoolVar(&roles.EnableMsgStreamService)
a.Flag("proxy-service", "Run proxy service").Short('p').Default("false").BoolVar(&roles.EnableProxyService)
a.Flag("proxy-node", "Run proxy node").Short('P').Default("false").BoolVar(&roles.EnableProxyNode)
a.Flag("query-service", "Run query service").Short('q').Default("false").BoolVar(&roles.EnableQueryService)
a.Flag("query-node", "Run query node").Short('Q').Default("false").BoolVar(&roles.EnableQueryNode)
a.Flag("data-service", "Run data service").Short('d').Default("false").BoolVar(&roles.EnableDataService)
a.Flag("data-node", "Run data node").Short('D').Default("false").BoolVar(&roles.EnableDataNode)
a.Flag("index-service", "Run index service").Short('i').Default("false").BoolVar(&roles.EnableIndexService)
a.Flag("index-node", "Run index node").Short('I').Default("false").BoolVar(&roles.EnableIndexNode)
_, err := a.Parse(os.Args[1:])
if err != nil {
fmt.Fprintln(os.Stderr, errors.Wrapf(err, "Error parsing commandline arguments"))
a.Usage(os.Args[1:])
os.Exit(2)
}
flag.BoolVar(&roles.EnableMaster, "master-service", false, "start as master service")
flag.BoolVar(&roles.EnableProxyService, "proxy-service", false, "start as proxy service")
flag.BoolVar(&roles.EnableProxyNode, "proxy-node", false, "start as proxy node")
flag.BoolVar(&roles.EnableQueryService, "query-service", false, "start as query service")
flag.BoolVar(&roles.EnableQueryNode, "query-node", false, "start as query node")
flag.BoolVar(&roles.EnableDataService, "data-service", false, "start as data service")
flag.BoolVar(&roles.EnableDataNode, "data-node", false, "start as data node")
flag.BoolVar(&roles.EnableIndexService, "index-service", false, "start as index service")
flag.BoolVar(&roles.EnableIndexNode, "index-node", false, "start as index node")
flag.BoolVar(&roles.EnableMsgStreamService, "msg-stream", false, "start as msg stream service")
flag.Parse()
if !roles.hasAnyRole() {
err := cleanenv.ReadEnv(&roles)
if err != nil {
fmt.Println(err)
os.Exit(-1)
for _, e := range os.Environ() {
pairs := strings.SplitN(e, "=", 2)
if len(pairs) == 2 {
switch pairs[0] {
case "ENABLE_MASTER":
roles.EnableMaster = roles.envValue(pairs[1])
case "ENABLE_PROXY_SERVICE":
roles.EnableProxyService = roles.envValue(pairs[1])
case "ENABLE_PROXY_NODE":
roles.EnableProxyNode = roles.envValue(pairs[1])
case "ENABLE_QUERY_SERVICE":
roles.EnableQueryService = roles.envValue(pairs[1])
case "ENABLE_QUERY_NODE":
roles.EnableQueryNode = roles.envValue(pairs[1])
case "ENABLE_DATA_SERVICE":
roles.EnableDataService = roles.envValue(pairs[1])
case "ENABLE_DATA_NODE":
roles.EnableDataNode = roles.envValue(pairs[1])
case "ENABLE_INDEX_SERVICE":
roles.EnableIndexService = roles.envValue(pairs[1])
case "ENABLE_INDEX_NODE":
roles.EnableIndexNode = roles.envValue(pairs[1])
case "ENABLE_MSGSTREAM_SERVICE":
roles.EnableMsgStreamService = roles.envValue(pairs[1])
}
}
}
}
if !roles.hasAnyRole() {
fmt.Println("Please select at least one service to start")
os.Exit(-1)
log.Printf("set the roles please ...")
return
}
logger := promlog.New(NewLogConfig())
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var (
ctxProxyService, cancelProxyService = context.WithCancel(context.Background())
proxyService = NewProxyService(ctxProxyService)
)
var masterService *components.MasterService
if roles.EnableMaster {
log.Print("start as master service")
go func() {
var err error
masterService, err = components.NewMasterService(ctx)
if err != nil {
panic(err)
}
_ = masterService.Run()
}()
}
var g run.Group
{
// Termination handler.
term := make(chan os.Signal, 1)
signal.Notify(term, os.Interrupt, syscall.SIGTERM)
cancel := make(chan struct{})
g.Add(
func() error {
select {
case <-term:
level.Warn(logger).Log("msg", "Received SIGTERM, exiting gracefully...")
case <-cancel:
}
return nil
},
func(err error) {
close(cancel)
},
)
}
var proxyService *components.ProxyService
if roles.EnableProxyService {
// ProxyService
g.Add(
func() error {
err := proxyService.Run()
level.Info(logger).Log("msg", "Proxy service stopped")
return err
},
func(err error) {
level.Info(logger).Log("msg", "Stopping proxy service...")
cancelProxyService()
},
)
log.Print("start as proxy service")
go func() {
var err error
proxyService, err = components.NewProxyService(ctx)
if err != nil {
panic(err)
}
_ = proxyService.Run()
}()
}
var proxyNode *components.ProxyNode
if roles.EnableProxyNode {
// ProxyNode
}
if err := g.Run(); err != nil {
level.Error(logger).Log("err", err)
os.Exit(1)
}
level.Info(logger).Log("msg", "See you next time!")
}
func NewLogConfig() *promlog.Config {
logConfig := promlog.Config{
Level: &promlog.AllowedLevel{},
Format: &promlog.AllowedFormat{},
}
err := logConfig.Level.Set("debug")
log.Print("start as proxy node")
go func() {
var err error
proxyNode, err = components.NewProxyNode(ctx)
if err != nil {
fmt.Println(err)
os.Exit(-1)
panic(err)
}
_ = proxyNode.Run()
}()
}
err = logConfig.Format.Set("logfmt")
var queryService *components.QueryService
if roles.EnableQueryService {
log.Print("start as query service")
go func() {
var err error
queryService, err = components.NewQueryService(ctx)
if err != nil {
fmt.Println(err)
os.Exit(-1)
panic(err)
}
_ = queryService.Run()
}()
}
return &logConfig
}
var queryNode *components.QueryNode
if roles.EnableQueryNode {
log.Print("start as query node")
go func() {
var err error
queryNode, err = components.NewQueryNode(ctx)
if err != nil {
panic(err)
}
_ = queryNode.Run()
}()
}
// Move to proxyservice package
func NewProxyService(ctx context.Context) *ProxyService {
srv, _ := grpcproxyservice.NewServer(ctx)
ps := &ProxyService{ctx: ctx, server: srv}
return ps
}
var dataService *components.DataService
if roles.EnableDataService {
log.Print("start as data service")
go func() {
var err error
dataService, err = components.NewDataService(ctx)
if err != nil {
panic(err)
}
_ = dataService.Run()
}()
}
type ProxyService struct {
ctx context.Context
server *grpcproxyservice.Server
}
var dataNode *components.DataNode
if roles.EnableDataNode {
log.Print("start as data node")
go func() {
var err error
dataNode, err = components.NewDataNode(ctx)
if err != nil {
panic(err)
}
_ = dataNode.Run()
}()
}
var indexService *components.IndexService
if roles.EnableIndexService {
log.Print("start as index service")
go func() {
var err error
indexService, err = components.NewIndexService(ctx)
if err != nil {
panic(err)
}
_ = indexService.Run()
}()
}
var indexNode *components.IndexNode
if roles.EnableIndexNode {
log.Print("start as index node")
go func() {
var err error
indexNode, err = components.NewIndexNode(ctx)
if err != nil {
panic(err)
}
_ = indexNode.Run()
}()
}
var msgStream *components.MsgStream
if roles.EnableMsgStreamService {
log.Print("start as msg stream service")
go func() {
var err error
msgStream, err = components.NewMsgStreamService(ctx)
if err != nil {
panic(err)
}
_ = msgStream.Run()
}()
}
sc := make(chan os.Signal, 1)
signal.Notify(sc,
syscall.SIGHUP,
syscall.SIGINT,
syscall.SIGTERM,
syscall.SIGQUIT)
sig := <-sc
log.Printf("Get %s signal to exit", sig.String())
if roles.EnableMaster {
if masterService != nil {
_ = masterService.Stop()
}
log.Printf("exit master service")
}
if roles.EnableProxyService {
if proxyService != nil {
_ = proxyService.Stop()
}
log.Printf("exit proxy service")
}
if roles.EnableProxyNode {
if proxyNode != nil {
_ = proxyNode.Stop()
}
log.Printf("exit proxy node")
}
if roles.EnableQueryService {
if queryService != nil {
_ = queryService.Stop()
}
log.Printf("exit query service")
}
if roles.EnableQueryNode {
if queryNode != nil {
_ = queryNode.Stop()
}
log.Printf("exit query node")
}
if roles.EnableDataService {
if dataService != nil {
_ = dataService.Stop()
}
log.Printf("exit data service")
}
if roles.EnableDataNode {
if dataNode != nil {
_ = dataNode.Stop()
}
log.Printf("exit data node")
}
if roles.EnableIndexService {
if indexService != nil {
_ = indexService.Stop()
}
log.Printf("exit index service")
}
if roles.EnableIndexNode {
if indexNode != nil {
_ = indexNode.Stop()
}
log.Printf("exit index node")
}
if roles.EnableMsgStreamService {
if msgStream != nil {
_ = msgStream.Stop()
}
log.Printf("exit msg stream service")
}
func (ps *ProxyService) Run() error {
return ps.server.Run()
}

View File

@ -0,0 +1,28 @@
package main
import (
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
func TestRoles(t *testing.T) {
r := MilvusRoles{}
assert.True(t, r.envValue("1"))
assert.True(t, r.envValue(" 1 "))
assert.True(t, r.envValue("True"))
assert.True(t, r.envValue(" True "))
assert.True(t, r.envValue(" TRue "))
assert.False(t, r.envValue("0"))
assert.False(t, r.envValue(" 0 "))
assert.False(t, r.envValue(" false "))
assert.False(t, r.envValue(" False "))
assert.False(t, r.envValue(" abc "))
ss := strings.SplitN("abcdef", "=", 2)
assert.Equal(t, len(ss), 1)
ss = strings.SplitN("adb=def", "=", 2)
assert.Equal(t, len(ss), 2)
}