mirror of
https://gitee.com/johng/gf.git
synced 2024-11-29 18:57:44 +08:00
326 lines
8.0 KiB
Go
326 lines
8.0 KiB
Go
// Copyright GoFrame Author(https://goframe.org). All Rights Reserved.
|
|
//
|
|
// This Source Code Form is subject to the terms of the MIT License.
|
|
// If a copy of the MIT was not distributed with this file,
|
|
// You can obtain one at https://github.com/gogf/gf.
|
|
|
|
package grpcx
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/gogf/gf/v2/errors/gcode"
|
|
"github.com/gogf/gf/v2/errors/gerror"
|
|
"github.com/gogf/gf/v2/frame/g"
|
|
"github.com/gogf/gf/v2/net/gipv4"
|
|
"github.com/gogf/gf/v2/net/gsvc"
|
|
"github.com/gogf/gf/v2/os/gctx"
|
|
"github.com/gogf/gf/v2/os/glog"
|
|
"github.com/gogf/gf/v2/os/gproc"
|
|
"github.com/gogf/gf/v2/text/gstr"
|
|
"github.com/gogf/gf/v2/util/gconv"
|
|
)
|
|
|
|
// GrpcServer is the server for GRPC protocol.
|
|
type GrpcServer struct {
|
|
Server *grpc.Server
|
|
config *GrpcServerConfig
|
|
listener net.Listener
|
|
services []gsvc.Service
|
|
waitGroup sync.WaitGroup
|
|
registrar gsvc.Registrar
|
|
serviceMu sync.Mutex
|
|
}
|
|
|
|
// Service implements gsvc.Service interface.
|
|
type Service struct {
|
|
gsvc.Service
|
|
Endpoints gsvc.Endpoints
|
|
}
|
|
|
|
// New creates and returns a grpc server.
|
|
func (s modServer) New(conf ...*GrpcServerConfig) *GrpcServer {
|
|
autoLoadAndRegisterFileRegistry()
|
|
|
|
var (
|
|
ctx = gctx.GetInitCtx()
|
|
config *GrpcServerConfig
|
|
)
|
|
if len(conf) > 0 {
|
|
config = conf[0]
|
|
} else {
|
|
config = s.NewConfig()
|
|
}
|
|
if config.Address == "" {
|
|
config.Address = defaultListenAddress
|
|
}
|
|
if !gstr.Contains(config.Address, ":") {
|
|
g.Log().Fatal(ctx, "invalid service address, should contain listening port")
|
|
}
|
|
if config.Logger == nil {
|
|
config.Logger = glog.New()
|
|
}
|
|
grpcServer := &GrpcServer{
|
|
config: config,
|
|
registrar: gsvc.GetRegistry(),
|
|
}
|
|
grpcServer.config.Options = append([]grpc.ServerOption{
|
|
s.ChainUnary(
|
|
s.UnaryTracing,
|
|
grpcServer.UnaryLogger,
|
|
s.UnaryRecover,
|
|
s.UnaryAllowNilRes,
|
|
s.UnaryError,
|
|
),
|
|
s.ChainStream(
|
|
s.StreamTracing,
|
|
),
|
|
}, grpcServer.config.Options...)
|
|
grpcServer.Server = grpc.NewServer(grpcServer.config.Options...)
|
|
return grpcServer
|
|
}
|
|
|
|
// Service binds service list to current server.
|
|
// Server will automatically register the service list after it starts.
|
|
func (s *GrpcServer) Service(services ...gsvc.Service) {
|
|
s.serviceMu.Lock()
|
|
defer s.serviceMu.Unlock()
|
|
s.services = append(s.services, services...)
|
|
}
|
|
|
|
// Run starts the server in blocking way.
|
|
func (s *GrpcServer) Run() {
|
|
var (
|
|
err error
|
|
ctx = gctx.GetInitCtx()
|
|
)
|
|
// Create listener to bind listening ip and port.
|
|
s.listener, err = net.Listen("tcp", s.config.Address)
|
|
if err != nil {
|
|
s.Logger().Fatalf(ctx, `%+v`, err)
|
|
}
|
|
|
|
// Start listening.
|
|
go s.doServeAsynchronously(ctx)
|
|
|
|
// Service register.
|
|
s.doServiceRegister()
|
|
s.Logger().Infof(
|
|
ctx,
|
|
"pid[%d]: grpc server started listening on [%s]",
|
|
gproc.Pid(), s.GetListenedAddress(),
|
|
)
|
|
s.doSignalListen()
|
|
}
|
|
|
|
func (s *GrpcServer) doServeAsynchronously(ctx context.Context) {
|
|
if err := s.Server.Serve(s.listener); err != nil {
|
|
s.Logger().Fatalf(ctx, `%+v`, err)
|
|
}
|
|
}
|
|
|
|
// doSignalListen does signal listening and handling for gracefully shutdown.
|
|
func (s *GrpcServer) doSignalListen() {
|
|
var ctx = context.Background()
|
|
gproc.AddSigHandlerShutdown(func(sig os.Signal) {
|
|
s.Logger().Infof(ctx, "signal received: %s, gracefully shutting down", sig.String())
|
|
// Deregister services when shutdown signal triggers.
|
|
s.doServiceDeregister()
|
|
time.Sleep(time.Second)
|
|
s.Stop()
|
|
})
|
|
gproc.Listen()
|
|
// Deregister services when process ends.
|
|
s.doServiceDeregister()
|
|
}
|
|
|
|
// Logger is alias of GetLogger.
|
|
func (s *GrpcServer) Logger() *glog.Logger {
|
|
return s.config.Logger
|
|
}
|
|
|
|
// doServiceRegister registers current service to Registry.
|
|
func (s *GrpcServer) doServiceRegister() {
|
|
if s.registrar == nil {
|
|
return
|
|
}
|
|
s.serviceMu.Lock()
|
|
defer s.serviceMu.Unlock()
|
|
if len(s.services) == 0 {
|
|
s.services = []gsvc.Service{&gsvc.LocalService{
|
|
Name: s.config.Name,
|
|
Metadata: gsvc.Metadata{},
|
|
}}
|
|
}
|
|
var (
|
|
err error
|
|
ctx = gctx.GetInitCtx()
|
|
protocol = `grpc`
|
|
)
|
|
// Register service list after server starts.
|
|
for i, service := range s.services {
|
|
service = &gsvc.LocalService{
|
|
Name: service.GetName(),
|
|
Endpoints: s.calculateListenedEndpoints(ctx),
|
|
Metadata: service.GetMetadata(),
|
|
}
|
|
service.GetMetadata().Sets(gsvc.Metadata{
|
|
gsvc.MDProtocol: protocol,
|
|
})
|
|
s.Logger().Debugf(ctx, `service register: %+v`, service)
|
|
if len(service.GetEndpoints()) == 0 {
|
|
s.Logger().Warningf(ctx, `no endpoints found to register service, abort service registering`)
|
|
return
|
|
}
|
|
if service, err = s.registrar.Register(ctx, service); err != nil {
|
|
s.Logger().Fatalf(ctx, `%+v`, err)
|
|
}
|
|
s.services[i] = service
|
|
}
|
|
}
|
|
|
|
// doServiceDeregister de-registers current service from Registry.
|
|
func (s *GrpcServer) doServiceDeregister() {
|
|
if s.registrar == nil {
|
|
return
|
|
}
|
|
s.serviceMu.Lock()
|
|
defer s.serviceMu.Unlock()
|
|
var ctx = gctx.GetInitCtx()
|
|
for _, service := range s.services {
|
|
s.Logger().Debugf(ctx, `service deregister: %+v`, service)
|
|
if err := s.registrar.Deregister(ctx, service); err != nil {
|
|
s.Logger().Errorf(ctx, `%+v`, err)
|
|
}
|
|
}
|
|
s.services = s.services[:0]
|
|
}
|
|
|
|
// Start starts the server in no-blocking way.
|
|
func (s *GrpcServer) Start() {
|
|
s.waitGroup.Add(1)
|
|
go s.doStartAsynchronously()
|
|
}
|
|
|
|
func (s *GrpcServer) doStartAsynchronously() {
|
|
defer s.waitGroup.Done()
|
|
s.Run()
|
|
}
|
|
|
|
// Wait works with Start, which blocks current goroutine until the server stops.
|
|
func (s *GrpcServer) Wait() {
|
|
s.waitGroup.Wait()
|
|
}
|
|
|
|
// Stop gracefully stops the server.
|
|
func (s *GrpcServer) Stop() {
|
|
s.doServiceDeregister()
|
|
s.Server.GracefulStop()
|
|
}
|
|
|
|
// GetConfig returns the configuration of current Server.
|
|
func (s *GrpcServer) GetConfig() *GrpcServerConfig {
|
|
return s.config
|
|
}
|
|
|
|
// GetListenedAddress retrieves and returns the address string which are listened by current server.
|
|
func (s *GrpcServer) GetListenedAddress() string {
|
|
if !gstr.Contains(s.config.Address, FreePortAddress) {
|
|
return s.config.Address
|
|
}
|
|
var (
|
|
address = s.config.Address
|
|
listenedPort = s.GetListenedPort()
|
|
)
|
|
address = gstr.Replace(address, FreePortAddress, fmt.Sprintf(`:%d`, listenedPort))
|
|
return address
|
|
}
|
|
|
|
// GetListenedPort retrieves and returns one port which is listened to by current server.
|
|
func (s *GrpcServer) GetListenedPort() int {
|
|
if ln := s.listener; ln != nil {
|
|
return ln.Addr().(*net.TCPAddr).Port
|
|
}
|
|
return -1
|
|
}
|
|
|
|
func (s *GrpcServer) calculateListenedEndpoints(ctx context.Context) gsvc.Endpoints {
|
|
var (
|
|
configAddr = s.config.Address
|
|
endpoints = make(gsvc.Endpoints, 0)
|
|
addresses = s.config.Endpoints
|
|
)
|
|
if len(addresses) == 0 {
|
|
addresses = gstr.SplitAndTrim(configAddr, ",")
|
|
}
|
|
for _, address := range addresses {
|
|
var (
|
|
addrArray = gstr.Split(address, ":")
|
|
listenedIps []string
|
|
listenedPorts []int
|
|
)
|
|
if len(addrArray) == 1 {
|
|
configItemName := "address"
|
|
if len(s.config.Endpoints) != 0 {
|
|
configItemName = "endpoint"
|
|
}
|
|
panic(gerror.NewCodef(
|
|
gcode.CodeInvalidConfiguration,
|
|
`invalid "%s" configuration "%s", missing port`,
|
|
configItemName, address,
|
|
))
|
|
}
|
|
// IPs.
|
|
switch addrArray[0] {
|
|
case "0.0.0.0", "":
|
|
intranetIps, err := gipv4.GetIntranetIpArray()
|
|
if err != nil {
|
|
s.Logger().Errorf(ctx, `error retrieving intranet ip: %+v`, err)
|
|
return nil
|
|
}
|
|
if len(intranetIps) != 0 {
|
|
listenedIps = intranetIps
|
|
break
|
|
}
|
|
// If no intranet ips found, it uses all ips that can be retrieved,
|
|
// it may include internet ip.
|
|
allIps, err := gipv4.GetIpArray()
|
|
if err != nil {
|
|
s.Logger().Errorf(ctx, `error retrieving ip from current node: %+v`, err)
|
|
return nil
|
|
}
|
|
s.Logger().Noticef(
|
|
ctx,
|
|
`no intranet ip found, using internet ip to register service: %v`,
|
|
allIps,
|
|
)
|
|
listenedIps = allIps
|
|
default:
|
|
listenedIps = []string{addrArray[0]}
|
|
}
|
|
// Ports.
|
|
switch addrArray[1] {
|
|
case "0":
|
|
listenedPorts = []int{s.GetListenedPort()}
|
|
default:
|
|
listenedPorts = []int{gconv.Int(addrArray[1])}
|
|
}
|
|
for _, ip := range listenedIps {
|
|
for _, port := range listenedPorts {
|
|
endpoints = append(
|
|
endpoints,
|
|
gsvc.NewEndpoint(fmt.Sprintf(`%s:%d`, ip, port)),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
return endpoints
|
|
}
|