From fad9ffab8d38dc9fc5272f73ba3717d3dee0ee58 Mon Sep 17 00:00:00 2001 From: barnettZQG Date: Thu, 17 Jun 2021 09:48:00 +0800 Subject: [PATCH] controls the mesh container to start first. --- cmd/mesh-data-panel/sidecar.go | 80 +++++++++++++++++++++++ go.mod | 2 +- worker/appm/conversion/plugin.go | 109 +++++++++++++++++++++++-------- 3 files changed, 161 insertions(+), 30 deletions(-) diff --git a/cmd/mesh-data-panel/sidecar.go b/cmd/mesh-data-panel/sidecar.go index 8c24b4e19..9b8f642be 100644 --- a/cmd/mesh-data-panel/sidecar.go +++ b/cmd/mesh-data-panel/sidecar.go @@ -21,6 +21,7 @@ package main import ( "encoding/json" "fmt" + "io/ioutil" "net/http" "os" "os/signal" @@ -39,6 +40,45 @@ func main() { if len(os.Args) > 1 && os.Args[1] == "version" { cmd.ShowVersion("sidecar") } + if len(os.Args) > 1 && os.Args[1] == "wait" { + var timeoutSeconds = 60 + var envoyReadyUrl = "http://127.0.0.1:65533/ready" + var envoyListennerReadyUrl = "http://127.0.0.1:65533/listeners" + var periodMillis = 500 + var requestTimeoutMillis = 500 + client := &http.Client{ + Timeout: time.Duration(requestTimeoutMillis) * time.Millisecond, + } + logrus.Infof("Waiting for Envoy proxy to be ready (timeout: %d seconds)...", timeoutSeconds) + + var err error + timeoutAt := time.Now().Add(time.Duration(timeoutSeconds) * time.Second) + for time.Now().Before(timeoutAt) { + err = checkEnvoyIfReady(client, envoyReadyUrl) + if err == nil { + logrus.Infof("Sidecar server is ready!") + break + } + logrus.Infof("Not ready yet: %v", err) + time.Sleep(time.Duration(periodMillis) * time.Millisecond) + } + if len(os.Args) > 2 && os.Args[2] != "0" { + for time.Now().Before(timeoutAt) { + err = checkEnvoyListenerIfReady(client, envoyListennerReadyUrl, os.Args[2]) + if err == nil { + logrus.Infof("Sidecar is ready!") + os.Exit(0) + } + logrus.Infof("Not ready yet: %v", err) + time.Sleep(time.Duration(periodMillis) * time.Millisecond) + } + } else { + logrus.Infof("Sidecar is ready!") + os.Exit(0) + } + logrus.Errorf("timeout waiting for Envoy proxy to become ready. Last error: %v", err) + os.Exit(1) + } if len(os.Args) > 1 && os.Args[1] == "run" { if err := run(); err != nil { os.Exit(1) @@ -166,3 +206,43 @@ func writeHosts(ipnames map[string]string) error { } return hosts.Flush() } + +func checkEnvoyIfReady(client *http.Client, envoyReadyUrl string) error { + req, err := http.NewRequest(http.MethodGet, envoyReadyUrl, nil) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + reBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 200 || string(reBody) != "LIVE" { + return fmt.Errorf("HTTP status code %v ,body: %s", resp.StatusCode, string(reBody)) + } + return nil +} + +func checkEnvoyListenerIfReady(client *http.Client, url string, port string) error { + req, err := http.NewRequest(http.MethodGet, url, nil) + if err != nil { + return err + } + resp, err := client.Do(req) + if err != nil { + return err + } + defer func() { _ = resp.Body.Close() }() + reBody, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if resp.StatusCode != 200 || !strings.Contains(string(reBody), fmt.Sprintf(":%s", port)) { + return fmt.Errorf("Check Listeners HTTP status code %v, body is %s", resp.StatusCode, string(reBody)) + } + return nil +} diff --git a/go.mod b/go.mod index 9d8ce8d40..8a164453a 100644 --- a/go.mod +++ b/go.mod @@ -110,7 +110,7 @@ require ( golang.org/x/text v0.3.5 // indirect golang.org/x/time v0.0.0-20201208040808-7e3f01d25324 golang.org/x/tools v0.1.0 // indirect - google.golang.org/appengine v1.6.7 // indirect + google.golang.org/appengine v1.6.7 google.golang.org/genproto v0.0.0-20210218151259-fe80b386bf06 // indirect google.golang.org/grpc v1.35.0 gopkg.in/alecthomas/kingpin.v2 v2.2.6 diff --git a/worker/appm/conversion/plugin.go b/worker/appm/conversion/plugin.go index 0e525ad47..c0820cbb2 100644 --- a/worker/appm/conversion/plugin.go +++ b/worker/appm/conversion/plugin.go @@ -40,22 +40,27 @@ import ( //TenantServicePlugin conv service all plugin func TenantServicePlugin(as *typesv1.AppService, dbmanager db.Manager) error { - initContainers, pluginContainers, bootSeqContainer, err := conversionServicePlugin(as, dbmanager) + initContainers, preContainers, postContainers, err := conversionServicePlugin(as, dbmanager) if err != nil { + logrus.Errorf("create plugin containers for component %s failure: %s", as.ServiceID, err.Error()) return err } - as.BootSeqContainer = bootSeqContainer podtemplate := as.GetPodTemplate() if podtemplate != nil { - podtemplate.Spec.Containers = append(podtemplate.Spec.Containers, pluginContainers...) + if len(preContainers) > 0 { + podtemplate.Spec.Containers = append(preContainers, podtemplate.Spec.Containers...) + } + if len(postContainers) > 0 { + podtemplate.Spec.Containers = append(podtemplate.Spec.Containers, postContainers...) + } podtemplate.Spec.InitContainers = initContainers return nil } return fmt.Errorf("pod templete is nil before define plugin") } -func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1.Container, []v1.Container, *v1.Container, error) { - var containers []v1.Container +func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1.Container, []v1.Container, []v1.Container, error) { + var precontainers, postcontainers []v1.Container var initContainers []v1.Container appPlugins, err := dbmanager.TenantServicePluginRelationDao().GetALLRelationByServiceID(as.ServiceID) if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { @@ -73,7 +78,8 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1 var inBoundPlugin *model.TenantServicePluginRelation for _, pluginR := range appPlugins { //if plugin not enable,ignore it - if pluginR.Switch == false { + if !pluginR.Switch { + logrus.Debugf("plugin %s is disable in component %s", pluginR.PluginID, as.ServiceID) continue } versionInfo, err := dbmanager.TenantPluginBuildVersionDao().GetLastBuildVersionByVersionID(pluginR.PluginID, pluginR.VersionID) @@ -119,12 +125,30 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1 if err != nil { return nil, nil, nil, fmt.Errorf("get plugin model info failure %s", err.Error()) } + var preconatiner = false if pluginModel == model.InBoundAndOutBoundNetPlugin || pluginModel == model.InBoundNetPlugin { inBoundPlugin = pluginR + preconatiner = true } if pluginModel == model.OutBoundNetPlugin || pluginModel == model.InBoundAndOutBoundNetPlugin { netPlugin = true meshPluginID = pluginR.PluginID + preconatiner = true + } + if netPlugin { + config, err := dbmanager.TenantPluginVersionConfigDao().GetPluginConfig(as.ServiceID, pluginR.PluginID) + if err != nil && err != gorm.ErrRecordNotFound { + logrus.Errorf("get service plugin config from db failure %s", err.Error()) + } + if config != nil { + var resourceConfig api_model.ResourceSpec + if err := json.Unmarshal([]byte(config.ConfigStr), &resourceConfig); err != nil { + logrus.Warningf("load mesh plugin %s config of componet %s failure %s") + } + if len(resourceConfig.BaseServices) > 0 { + setSidecarContainerLifecycle(as, &pc, &resourceConfig) + } + } } if pluginModel == model.InitPlugin { if strings.ToLower(os.Getenv("DISABLE_INIT_CONTAINER_ENABLE_SECURITY")) != "true" { @@ -132,8 +156,10 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1 pc.SecurityContext = &corev1.SecurityContext{Privileged: util.Bool(true)} } initContainers = append(initContainers, pc) + } else if preconatiner { + precontainers = append(precontainers, pc) } else { - containers = append(containers, pc) + postcontainers = append(postcontainers, pc) } } var inboundPluginConfig *api_model.ResourceSpec @@ -157,12 +183,12 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1 } //if need proxy but not install net plugin if as.NeedProxy && !netPlugin { - pluginID, err := applyDefaultMeshPluginConfig(as, dbmanager) + pluginID, pluginConfig, err := applyDefaultMeshPluginConfig(as, dbmanager) if err != nil { logrus.Errorf("apply default mesh plugin config failure %s", err.Error()) } - c2 := createTCPDefaultPluginContainer(as, pluginID, mainContainer.Env) - containers = append(containers, c2) + defaultSidecarContainer := createTCPDefaultPluginContainer(as, pluginID, mainContainer.Env, pluginConfig) + precontainers = append(precontainers, defaultSidecarContainer) meshPluginID = pluginID } @@ -170,22 +196,60 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1 if bootSeqDepServiceIds := as.ExtensionSet["boot_seq_dep_service_ids"]; as.NeedProxy && bootSeqDepServiceIds != "" { initContainers = append(initContainers, bootSequence) } - return initContainers, containers, &bootSequence, nil + as.BootSeqContainer = &bootSequence + return initContainers, precontainers, postcontainers, nil } -func createTCPDefaultPluginContainer(as *typesv1.AppService, pluginID string, envs []v1.EnvVar) v1.Container { +func createTCPDefaultPluginContainer(as *typesv1.AppService, pluginID string, envs []v1.EnvVar, pluginConfig *api_model.ResourceSpec) v1.Container { envs = append(envs, v1.EnvVar{Name: "PLUGIN_ID", Value: pluginID}) xdsHost, xdsHostPort, apiHostPort := getXDSHostIPAndPort() envs = append(envs, xdsHostIPEnv(xdsHost)) envs = append(envs, v1.EnvVar{Name: "API_HOST_PORT", Value: apiHostPort}) envs = append(envs, v1.EnvVar{Name: "XDS_HOST_PORT", Value: xdsHostPort}) - return v1.Container{ + container := v1.Container{ Name: "default-tcpmesh-" + as.ServiceID[len(as.ServiceID)-20:], Env: envs, Image: typesv1.GetTCPMeshImageName(), Resources: createTCPUDPMeshRecources(as), } + setSidecarContainerLifecycle(as, &container, pluginConfig) + return container +} + +func setSidecarContainerLifecycle(as *typesv1.AppService, con *corev1.Container, pluginConfig *api_model.ResourceSpec) { + if strings.ToLower(as.ExtensionSet["DISABLE_SIDECAR_CHECK"]) != "true" { + var port int + if as.ExtensionSet["SIDECAR_CHECK_PORT"] != "" { + c_port, _ := strconv.Atoi(as.ExtensionSet["SIDECAR_CHECK_PORT"]) + if c_port != 0 { + port = c_port + } + } + if port == 0 { + for _, dep := range pluginConfig.BaseServices { + if strings.ToLower(dep.Protocol) != "udp" { + port = dep.Port + break + } + } + if port == 0 { + for _, b_port := range pluginConfig.BasePorts { + if strings.ToLower(b_port.Protocol) != "udp" { + port = b_port.Port + break + } + } + } + } + con.Lifecycle = &corev1.Lifecycle{ + PostStart: &corev1.Handler{ + Exec: &v1.ExecAction{ + Command: []string{"/root/rainbond-mesh-data-panel", "wait", strconv.Itoa(port)}, + }, + }, + } + } } func createProbeMeshInitContainer(as *typesv1.AppService, pluginID, serviceAlias string, envs []v1.EnvVar) v1.Container { @@ -246,7 +310,7 @@ func ApplyPluginConfig(as *typesv1.AppService, servicePluginRelation *model.Tena } //applyDefaultMeshPluginConfig applyDefaultMeshPluginConfig -func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager) (string, error) { +func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager) (string, *api_model.ResourceSpec, error) { var baseServices []*api_model.BaseService deps, err := dbmanager.TenantServiceRelationDao().GetTenantServiceRelations(as.ServiceID) if err != nil { @@ -280,7 +344,7 @@ func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager) } resJSON, err := json.Marshal(res) if err != nil { - return "", err + return "", nil, err } pluginID := "def-mesh" + as.ServiceID cm := &v1.ConfigMap{ @@ -297,7 +361,7 @@ func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager) }, } as.SetConfigMap(cm) - return pluginID, nil + return pluginID, res, nil } func getPluginModel(pluginID, tenantID string, dbmanager db.Manager) (string, error) { @@ -375,19 +439,6 @@ func createPluginEnvs(pluginID, tenantID, serviceAlias string, mainEnvs []v1.Env return &envs, nil } -func pluginWeight(pluginModel string) int { - switch pluginModel { - case model.InBoundNetPlugin: - return 9 - case model.OutBoundNetPlugin: - return 8 - case model.GeneralPlugin: - return 1 - default: - return 0 - } -} - func createPluginResources(memory int, cpu int) v1.ResourceRequirements { return createResourcesByDefaultCPU(memory, int64(cpu), int64(cpu)) }