controls the mesh container to start first.

This commit is contained in:
barnettZQG 2021-06-17 09:48:00 +08:00
parent 283baf3695
commit fad9ffab8d
3 changed files with 161 additions and 30 deletions

View File

@ -21,6 +21,7 @@ package main
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os"
"os/signal"
@ -39,6 +40,45 @@ func main() {
if len(os.Args) > 1 && os.Args[1] == "version" {
cmd.ShowVersion("sidecar")
}
if len(os.Args) > 1 && os.Args[1] == "wait" {
var timeoutSeconds = 60
var envoyReadyUrl = "http://127.0.0.1:65533/ready"
var envoyListennerReadyUrl = "http://127.0.0.1:65533/listeners"
var periodMillis = 500
var requestTimeoutMillis = 500
client := &http.Client{
Timeout: time.Duration(requestTimeoutMillis) * time.Millisecond,
}
logrus.Infof("Waiting for Envoy proxy to be ready (timeout: %d seconds)...", timeoutSeconds)
var err error
timeoutAt := time.Now().Add(time.Duration(timeoutSeconds) * time.Second)
for time.Now().Before(timeoutAt) {
err = checkEnvoyIfReady(client, envoyReadyUrl)
if err == nil {
logrus.Infof("Sidecar server is ready!")
break
}
logrus.Infof("Not ready yet: %v", err)
time.Sleep(time.Duration(periodMillis) * time.Millisecond)
}
if len(os.Args) > 2 && os.Args[2] != "0" {
for time.Now().Before(timeoutAt) {
err = checkEnvoyListenerIfReady(client, envoyListennerReadyUrl, os.Args[2])
if err == nil {
logrus.Infof("Sidecar is ready!")
os.Exit(0)
}
logrus.Infof("Not ready yet: %v", err)
time.Sleep(time.Duration(periodMillis) * time.Millisecond)
}
} else {
logrus.Infof("Sidecar is ready!")
os.Exit(0)
}
logrus.Errorf("timeout waiting for Envoy proxy to become ready. Last error: %v", err)
os.Exit(1)
}
if len(os.Args) > 1 && os.Args[1] == "run" {
if err := run(); err != nil {
os.Exit(1)
@ -166,3 +206,43 @@ func writeHosts(ipnames map[string]string) error {
}
return hosts.Flush()
}
func checkEnvoyIfReady(client *http.Client, envoyReadyUrl string) error {
req, err := http.NewRequest(http.MethodGet, envoyReadyUrl, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
reBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != 200 || string(reBody) != "LIVE" {
return fmt.Errorf("HTTP status code %v ,body: %s", resp.StatusCode, string(reBody))
}
return nil
}
func checkEnvoyListenerIfReady(client *http.Client, url string, port string) error {
req, err := http.NewRequest(http.MethodGet, url, nil)
if err != nil {
return err
}
resp, err := client.Do(req)
if err != nil {
return err
}
defer func() { _ = resp.Body.Close() }()
reBody, err := ioutil.ReadAll(resp.Body)
if err != nil {
return err
}
if resp.StatusCode != 200 || !strings.Contains(string(reBody), fmt.Sprintf(":%s", port)) {
return fmt.Errorf("Check Listeners HTTP status code %v, body is %s", resp.StatusCode, string(reBody))
}
return nil
}

2
go.mod
View File

@ -110,7 +110,7 @@ require (
golang.org/x/text v0.3.5 // indirect
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324
golang.org/x/tools v0.1.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/appengine v1.6.7
google.golang.org/genproto v0.0.0-20210218151259-fe80b386bf06 // indirect
google.golang.org/grpc v1.35.0
gopkg.in/alecthomas/kingpin.v2 v2.2.6

View File

@ -40,22 +40,27 @@ import (
//TenantServicePlugin conv service all plugin
func TenantServicePlugin(as *typesv1.AppService, dbmanager db.Manager) error {
initContainers, pluginContainers, bootSeqContainer, err := conversionServicePlugin(as, dbmanager)
initContainers, preContainers, postContainers, err := conversionServicePlugin(as, dbmanager)
if err != nil {
logrus.Errorf("create plugin containers for component %s failure: %s", as.ServiceID, err.Error())
return err
}
as.BootSeqContainer = bootSeqContainer
podtemplate := as.GetPodTemplate()
if podtemplate != nil {
podtemplate.Spec.Containers = append(podtemplate.Spec.Containers, pluginContainers...)
if len(preContainers) > 0 {
podtemplate.Spec.Containers = append(preContainers, podtemplate.Spec.Containers...)
}
if len(postContainers) > 0 {
podtemplate.Spec.Containers = append(podtemplate.Spec.Containers, postContainers...)
}
podtemplate.Spec.InitContainers = initContainers
return nil
}
return fmt.Errorf("pod templete is nil before define plugin")
}
func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1.Container, []v1.Container, *v1.Container, error) {
var containers []v1.Container
func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1.Container, []v1.Container, []v1.Container, error) {
var precontainers, postcontainers []v1.Container
var initContainers []v1.Container
appPlugins, err := dbmanager.TenantServicePluginRelationDao().GetALLRelationByServiceID(as.ServiceID)
if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() {
@ -73,7 +78,8 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
var inBoundPlugin *model.TenantServicePluginRelation
for _, pluginR := range appPlugins {
//if plugin not enable,ignore it
if pluginR.Switch == false {
if !pluginR.Switch {
logrus.Debugf("plugin %s is disable in component %s", pluginR.PluginID, as.ServiceID)
continue
}
versionInfo, err := dbmanager.TenantPluginBuildVersionDao().GetLastBuildVersionByVersionID(pluginR.PluginID, pluginR.VersionID)
@ -119,12 +125,30 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
if err != nil {
return nil, nil, nil, fmt.Errorf("get plugin model info failure %s", err.Error())
}
var preconatiner = false
if pluginModel == model.InBoundAndOutBoundNetPlugin || pluginModel == model.InBoundNetPlugin {
inBoundPlugin = pluginR
preconatiner = true
}
if pluginModel == model.OutBoundNetPlugin || pluginModel == model.InBoundAndOutBoundNetPlugin {
netPlugin = true
meshPluginID = pluginR.PluginID
preconatiner = true
}
if netPlugin {
config, err := dbmanager.TenantPluginVersionConfigDao().GetPluginConfig(as.ServiceID, pluginR.PluginID)
if err != nil && err != gorm.ErrRecordNotFound {
logrus.Errorf("get service plugin config from db failure %s", err.Error())
}
if config != nil {
var resourceConfig api_model.ResourceSpec
if err := json.Unmarshal([]byte(config.ConfigStr), &resourceConfig); err != nil {
logrus.Warningf("load mesh plugin %s config of componet %s failure %s")
}
if len(resourceConfig.BaseServices) > 0 {
setSidecarContainerLifecycle(as, &pc, &resourceConfig)
}
}
}
if pluginModel == model.InitPlugin {
if strings.ToLower(os.Getenv("DISABLE_INIT_CONTAINER_ENABLE_SECURITY")) != "true" {
@ -132,8 +156,10 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
pc.SecurityContext = &corev1.SecurityContext{Privileged: util.Bool(true)}
}
initContainers = append(initContainers, pc)
} else if preconatiner {
precontainers = append(precontainers, pc)
} else {
containers = append(containers, pc)
postcontainers = append(postcontainers, pc)
}
}
var inboundPluginConfig *api_model.ResourceSpec
@ -157,12 +183,12 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
}
//if need proxy but not install net plugin
if as.NeedProxy && !netPlugin {
pluginID, err := applyDefaultMeshPluginConfig(as, dbmanager)
pluginID, pluginConfig, err := applyDefaultMeshPluginConfig(as, dbmanager)
if err != nil {
logrus.Errorf("apply default mesh plugin config failure %s", err.Error())
}
c2 := createTCPDefaultPluginContainer(as, pluginID, mainContainer.Env)
containers = append(containers, c2)
defaultSidecarContainer := createTCPDefaultPluginContainer(as, pluginID, mainContainer.Env, pluginConfig)
precontainers = append(precontainers, defaultSidecarContainer)
meshPluginID = pluginID
}
@ -170,22 +196,60 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
if bootSeqDepServiceIds := as.ExtensionSet["boot_seq_dep_service_ids"]; as.NeedProxy && bootSeqDepServiceIds != "" {
initContainers = append(initContainers, bootSequence)
}
return initContainers, containers, &bootSequence, nil
as.BootSeqContainer = &bootSequence
return initContainers, precontainers, postcontainers, nil
}
func createTCPDefaultPluginContainer(as *typesv1.AppService, pluginID string, envs []v1.EnvVar) v1.Container {
func createTCPDefaultPluginContainer(as *typesv1.AppService, pluginID string, envs []v1.EnvVar, pluginConfig *api_model.ResourceSpec) v1.Container {
envs = append(envs, v1.EnvVar{Name: "PLUGIN_ID", Value: pluginID})
xdsHost, xdsHostPort, apiHostPort := getXDSHostIPAndPort()
envs = append(envs, xdsHostIPEnv(xdsHost))
envs = append(envs, v1.EnvVar{Name: "API_HOST_PORT", Value: apiHostPort})
envs = append(envs, v1.EnvVar{Name: "XDS_HOST_PORT", Value: xdsHostPort})
return v1.Container{
container := v1.Container{
Name: "default-tcpmesh-" + as.ServiceID[len(as.ServiceID)-20:],
Env: envs,
Image: typesv1.GetTCPMeshImageName(),
Resources: createTCPUDPMeshRecources(as),
}
setSidecarContainerLifecycle(as, &container, pluginConfig)
return container
}
func setSidecarContainerLifecycle(as *typesv1.AppService, con *corev1.Container, pluginConfig *api_model.ResourceSpec) {
if strings.ToLower(as.ExtensionSet["DISABLE_SIDECAR_CHECK"]) != "true" {
var port int
if as.ExtensionSet["SIDECAR_CHECK_PORT"] != "" {
c_port, _ := strconv.Atoi(as.ExtensionSet["SIDECAR_CHECK_PORT"])
if c_port != 0 {
port = c_port
}
}
if port == 0 {
for _, dep := range pluginConfig.BaseServices {
if strings.ToLower(dep.Protocol) != "udp" {
port = dep.Port
break
}
}
if port == 0 {
for _, b_port := range pluginConfig.BasePorts {
if strings.ToLower(b_port.Protocol) != "udp" {
port = b_port.Port
break
}
}
}
}
con.Lifecycle = &corev1.Lifecycle{
PostStart: &corev1.Handler{
Exec: &v1.ExecAction{
Command: []string{"/root/rainbond-mesh-data-panel", "wait", strconv.Itoa(port)},
},
},
}
}
}
func createProbeMeshInitContainer(as *typesv1.AppService, pluginID, serviceAlias string, envs []v1.EnvVar) v1.Container {
@ -246,7 +310,7 @@ func ApplyPluginConfig(as *typesv1.AppService, servicePluginRelation *model.Tena
}
//applyDefaultMeshPluginConfig applyDefaultMeshPluginConfig
func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager) (string, error) {
func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager) (string, *api_model.ResourceSpec, error) {
var baseServices []*api_model.BaseService
deps, err := dbmanager.TenantServiceRelationDao().GetTenantServiceRelations(as.ServiceID)
if err != nil {
@ -280,7 +344,7 @@ func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager)
}
resJSON, err := json.Marshal(res)
if err != nil {
return "", err
return "", nil, err
}
pluginID := "def-mesh" + as.ServiceID
cm := &v1.ConfigMap{
@ -297,7 +361,7 @@ func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager)
},
}
as.SetConfigMap(cm)
return pluginID, nil
return pluginID, res, nil
}
func getPluginModel(pluginID, tenantID string, dbmanager db.Manager) (string, error) {
@ -375,19 +439,6 @@ func createPluginEnvs(pluginID, tenantID, serviceAlias string, mainEnvs []v1.Env
return &envs, nil
}
func pluginWeight(pluginModel string) int {
switch pluginModel {
case model.InBoundNetPlugin:
return 9
case model.OutBoundNetPlugin:
return 8
case model.GeneralPlugin:
return 1
default:
return 0
}
}
func createPluginResources(memory int, cpu int) v1.ResourceRequirements {
return createResourcesByDefaultCPU(memory, int64(cpu), int64(cpu))
}