perf: add the native service mode in order to start (#1524)

This commit is contained in:
张启航 2023-01-17 15:37:09 +08:00 committed by GitHub
parent f269ae8ad2
commit 9f05ce73e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 19 deletions

View File

@ -99,4 +99,4 @@ var ONLINEBUILDERIMAGENAME string
var ONLINERUNNERIMAGENAME string
// CIVERSION -
var CIVERSION = "v5.8.1-release"
var CIVERSION = "v5.11.0-release"

View File

@ -27,19 +27,34 @@ import (
//GetCmds get init probe cmds
func GetCmds() cli.Commands {
var commands cli.Commands
commands = append(commands, cli.Command{
Name: "probe",
Usage: "probe depend service endpoint whether ready",
Action: func(ctx *cli.Context) error {
logrus.Info("start probe all depend service")
controller, err := healthy.NewDependServiceHealthController()
if err != nil {
logrus.Fatalf("new probe controller failure %s", err.Error())
return err
}
controller.Check()
return nil
commands = []cli.Command{
{
Name: "probe",
Usage: "probe depend service endpoint whether ready",
Action: func(ctx *cli.Context) error {
logrus.Info("start probe all depend service")
controller, err := healthy.NewDependServiceHealthController()
if err != nil {
logrus.Fatalf("new probe controller failure %s", err.Error())
return err
}
controller.Check()
return nil
},
}, {
Name: "decoupling_probe",
Usage: "decoupling probe depend service endpoint whether ready",
Action: func(ctx *cli.Context) error {
logrus.Info("start decoupling probe all depend service")
controller, err := healthy.NewDecouplingDependServiceHealthController()
if err != nil {
logrus.Fatalf("new decoupling probe controller failure %s", err.Error())
return err
}
controller.Check()
return nil
},
},
})
}
return commands
}

View File

@ -20,7 +20,9 @@ package healthy
import (
"context"
"encoding/json"
"fmt"
"net"
"os"
"strings"
"time"
@ -52,6 +54,14 @@ type DependServiceHealthController struct {
clusterID string
dependServiceNames []string
ignoreCheckEndpointsClusterName []string
dependentComponents []DependentComponents
}
//DependentComponents -
type DependentComponents struct {
K8sServiceName string `json:"k8s_service_name"`
Port int `json:"port"`
Protocol string `json:"protocol"`
}
//NewDependServiceHealthController create a controller
@ -82,6 +92,20 @@ func NewDependServiceHealthController() (*DependServiceHealthController, error)
return &dsc, nil
}
//NewDecouplingDependServiceHealthController create a decoupling controller
func NewDecouplingDependServiceHealthController() (*DependServiceHealthController, error) {
dsc := DependServiceHealthController{
interval: time.Second * 5,
}
dsc.checkFunc = append(dsc.checkFunc, dsc.checkDependentComponentsPorts)
dependentComponents := os.Getenv("DependentComponents")
err := json.Unmarshal([]byte(dependentComponents), &dsc.dependentComponents)
if err != nil {
return nil, err
}
return &dsc, nil
}
//Check check all conditions
func (d *DependServiceHealthController) Check() {
logrus.Info("start denpenent health check.")
@ -134,6 +158,29 @@ func (d *DependServiceHealthController) checkClusters() bool {
return true
}
func (d *DependServiceHealthController) checkDependentComponentsPorts() bool {
for _, dependentComponent := range d.dependentComponents {
logrus.Infof("start check service %v port %v", dependentComponent.K8sServiceName, dependentComponent.Port)
var conn net.Conn
var err error
address := fmt.Sprintf(dependentComponent.K8sServiceName+":%v", dependentComponent.Port)
if dependentComponent.Protocol == "udp" {
conn, err = net.DialTimeout("udp", address, 3*time.Second)
} else {
conn, err = net.DialTimeout("tcp", address, 3*time.Second)
}
if err != nil {
logrus.Errorf("service %v port %v connection failed %v", dependentComponent.K8sServiceName, dependentComponent.Port, err)
return false
}
if conn == nil {
logrus.Errorf("service %v port %v connection failed", dependentComponent.K8sServiceName, dependentComponent.Port)
return false
}
}
return true
}
func (d *DependServiceHealthController) checkEDS() bool {
logrus.Infof("start checking eds; dependent service cluster names: %s", d.dependServiceNames)
if len(d.clusters) == len(d.ignoreCheckEndpointsClusterName) {

View File

@ -186,7 +186,7 @@ type TenantServicesPortDao interface {
HasOpenPort(sid string) bool
DelByServiceID(sid string) error
ListInnerPortsByServiceIDs(serviceIDs []string) ([]*model.TenantServicesPort, error)
ListByK8sServiceNames(serviceIDs []string) ([]*model.TenantServicesPort, error)
ListByK8sServiceNames(k8sServiceNames []string) ([]*model.TenantServicesPort, error)
CreateOrUpdatePortsInBatch(ports []*model.TenantServicesPort) error
DeleteByComponentIDs(componentIDs []string) error
}

View File

@ -21,6 +21,7 @@ package conversion
import (
"encoding/json"
"fmt"
"github.com/goodrain/rainbond/cmd/init-probe/healthy"
"github.com/goodrain/rainbond/worker/appm/volume"
"os"
"strconv"
@ -67,9 +68,6 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() {
return nil, nil, nil, fmt.Errorf("find plugins error. %v", err.Error())
}
if len(appPlugins) == 0 && !as.NeedProxy {
return nil, nil, nil, nil
}
netPlugin := false
var meshPluginID string
var mainContainer v1.Container
@ -221,7 +219,32 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
}
bootSequence := createProbeMeshInitContainer(as, meshPluginID, as.ServiceAlias, mainContainer.Env)
if bootSeqDepServiceIds := as.ExtensionSet["boot_seq_dep_service_ids"]; as.NeedProxy && bootSeqDepServiceIds != "" {
if as.GovernanceMode != model.GovernanceModeBuildInServiceMesh {
bootSequence.Args = []string{"decoupling_probe"}
var dependentComponents []healthy.DependentComponents
services, err := db.GetManager().TenantServiceRelationDao().GetTenantServiceRelations(as.ServiceID)
var dependServiceIDs []string
for _, service := range services {
dependServiceIDs = append(dependServiceIDs, service.DependServiceID)
}
servicePorts, err := db.GetManager().TenantServicesPortDao().ListInnerPortsByServiceIDs(dependServiceIDs)
for _, servicePort := range servicePorts {
dependentComponents = append(dependentComponents, healthy.DependentComponents{
K8sServiceName: servicePort.K8sServiceName,
Port: servicePort.ContainerPort,
Protocol: servicePort.Protocol,
})
}
dependentComponentsBytes, err := json.Marshal(dependentComponents)
if err != nil {
logrus.Errorf("dependent components serialization failure %s", err.Error())
}
bootSequence.Env = append(bootSequence.Env, v1.EnvVar{
Name: "DependentComponents",
Value: string(dependentComponentsBytes),
})
}
if bootSeqDepServiceIds := as.ExtensionSet["boot_seq_dep_service_ids"]; bootSeqDepServiceIds != "" {
initContainers = append(initContainers, bootSequence)
}
as.BootSeqContainer = &bootSequence