milvus/internal/distributed/indexnode/service.go
zhenshan.cao e83ac41bca Add component for index
Signed-off-by: zhenshan.cao <zhenshan.cao@zilliz.com>
2021-01-26 18:01:32 +08:00

191 lines
4.7 KiB
Go

package grpcindexnode
import (
"context"
"log"
"net"
"strconv"
"sync"
serviceclient "github.com/zilliztech/milvus-distributed/internal/distributed/indexservice/client"
"github.com/zilliztech/milvus-distributed/internal/indexnode"
"github.com/zilliztech/milvus-distributed/internal/proto/commonpb"
"github.com/zilliztech/milvus-distributed/internal/proto/indexpb"
"github.com/zilliztech/milvus-distributed/internal/proto/internalpb2"
"github.com/zilliztech/milvus-distributed/internal/proto/milvuspb"
"github.com/zilliztech/milvus-distributed/internal/util/typeutil"
"google.golang.org/grpc"
)
type Server struct {
node typeutil.IndexNodeInterface
grpcServer *grpc.Server
serverClient typeutil.IndexServiceInterface
loopCtx context.Context
loopCancel func()
loopWg sync.WaitGroup
}
func (s *Server) GetComponentStates(ctx context.Context, empty *commonpb.Empty) (*internalpb2.ComponentStates, error) {
return s.node.GetComponentStates()
}
func (s *Server) GetTimeTickChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
ret, err := s.node.GetTimeTickChannel()
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
} else {
resp.Value = ret
}
return resp, nil
}
func (s *Server) GetStatisticsChannel(ctx context.Context, empty *commonpb.Empty) (*milvuspb.StringResponse, error) {
ret, err := s.node.GetStatisticsChannel()
resp := &milvuspb.StringResponse{
Status: &commonpb.Status{
ErrorCode: commonpb.ErrorCode_SUCCESS,
},
}
if err != nil {
resp.Status.ErrorCode = commonpb.ErrorCode_UNEXPECTED_ERROR
resp.Status.Reason = err.Error()
} else {
resp.Value = ret
}
return resp, nil
}
func (s *Server) registerNode() error {
log.Printf("Registering node. IP = %s, Port = %d", indexnode.Params.NodeIP, indexnode.Params.NodePort)
request := &indexpb.RegisterNodeRequest{
Base: nil,
Address: &commonpb.Address{
Ip: indexnode.Params.NodeIP,
Port: int64(indexnode.Params.NodePort),
},
}
resp, err := s.serverClient.RegisterNode(request)
if err != nil {
log.Printf("IndexNode connect to IndexService failed, error= %v", err)
return err
}
indexnode.Params.NodeID = resp.InitParams.NodeID
log.Println("Register indexNode successful with nodeID=", indexnode.Params.NodeID)
err = indexnode.Params.LoadFromKVPair(resp.InitParams.StartParams)
return err
}
func (s *Server) grpcLoop() {
defer s.loopWg.Done()
lis, err := net.Listen("tcp", ":"+strconv.Itoa(indexnode.Params.NodePort))
if err != nil {
log.Fatalf("IndexNode grpc server fatal error=%v", err)
}
s.grpcServer = grpc.NewServer()
indexpb.RegisterIndexNodeServer(s.grpcServer, s)
if err = s.grpcServer.Serve(lis); err != nil {
log.Fatalf("IndexNode grpc server fatal error=%v", err)
}
}
func (s *Server) startIndexNode() error {
s.loopWg.Add(1)
//TODO: How to make sure that grpc server has started successfully
go s.grpcLoop()
log.Println("IndexNode grpc server start successfully")
err := s.registerNode()
if err != nil {
return err
}
indexnode.Params.Init()
return s.node.Start()
}
func Init() error {
indexnode.Params.Init()
//Get native ip
addresses, err := net.InterfaceAddrs()
if err != nil {
panic(err)
}
for _, value := range addresses {
if ipnet, ok := value.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
if ipnet.IP.To4() != nil {
indexnode.Params.NodeIP = ipnet.IP.String()
break
}
}
}
//Generate random and available port
listener, err := net.Listen("tcp", ":0")
if err != nil {
return err
}
indexnode.Params.NodePort = listener.Addr().(*net.TCPAddr).Port
listener.Close()
indexnode.Params.NodeAddress = indexnode.Params.NodeIP + ":" + strconv.FormatInt(int64(indexnode.Params.NodePort), 10)
log.Println("IndexNode init successfully, nodeAddress=", indexnode.Params.NodeAddress)
return nil
}
func (s *Server) Start() error {
return s.startIndexNode()
}
func (s *Server) Stop() error {
s.node.Stop()
s.loopCancel()
if s.grpcServer != nil {
s.grpcServer.GracefulStop()
}
s.loopWg.Wait()
return nil
}
func (s *Server) BuildIndex(ctx context.Context, req *indexpb.BuildIndexCmd) (*commonpb.Status, error) {
return s.node.BuildIndex(req)
}
func NewGrpcServer(ctx context.Context) (*Server, error) {
ctx1, cancel := context.WithCancel(ctx)
indexServiceClient := serviceclient.NewClient(indexnode.Params.ServiceAddress)
node, err := indexnode.CreateIndexNode(ctx1)
if err != nil {
defer cancel()
return nil, err
}
node.SetServiceClient(indexServiceClient)
return &Server{
loopCtx: ctx1,
loopCancel: cancel,
node: node,
serverClient: indexServiceClient,
}, nil
}