explicitly dependent services in startup sequence

This commit is contained in:
GLYASAI 2019-12-06 11:24:45 +08:00
parent 27e05ca690
commit 14e6ad04c6
5 changed files with 180 additions and 57 deletions

View File

@ -22,19 +22,15 @@ import (
"context"
"fmt"
"os"
"strconv"
"strings"
"time"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"google.golang.org/grpc"
"github.com/Sirupsen/logrus"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
endpointapi "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
envoyv2 "github.com/goodrain/rainbond/node/core/envoy/v2"
"github.com/Sirupsen/logrus"
"google.golang.org/grpc"
)
//DependServiceHealthController Detect the health of the dependent service
@ -43,15 +39,16 @@ import (
//------- cds: discover all dependent services
//------- sds: every service has at least one Ready instance
type DependServiceHealthController struct {
listeners []v2.Listener
clusters []v2.Cluster
sdsHost []v2.ClusterLoadAssignment
interval time.Duration
envoyDiscoverVersion string //only support v2
checkFunc []func() bool
endpointClient v2.EndpointDiscoveryServiceClient
dependServiceCount int
clusterID string
listeners []v2.Listener
clusters []v2.Cluster
sdsHost []v2.ClusterLoadAssignment
interval time.Duration
envoyDiscoverVersion string //only support v2
checkFunc []func() bool
endpointClient v2.EndpointDiscoveryServiceClient
dependServiceCount int
clusterID string
dependServiceClusterNames []string
}
//NewDependServiceHealthController create a controller
@ -79,13 +76,10 @@ func NewDependServiceHealthController() (*DependServiceHealthController, error)
if err != nil {
return nil, err
}
if dependCount, err := strconv.Atoi(os.Getenv("DEPEND_SERVICE_COUNT")); err == nil {
dsc.dependServiceCount = dependCount
} else {
depServices := os.Getenv("DEPEND_SERVICE")
dsc.dependServiceCount = len(strings.Split(depServices, ","))
}
dsc.endpointClient = v2.NewEndpointDiscoveryServiceClient(cli)
dsc.dependServiceClusterNames = strings.Split(os.Getenv("DEPEND_SERVICE_CLUSTER_NAMES"), ",")
return &dsc, nil
}
@ -123,6 +117,7 @@ func (d *DependServiceHealthController) checkClusters() bool {
func (d *DependServiceHealthController) checkEDS() bool {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
res, err := d.endpointClient.FetchEndpoints(ctx, &v2.DiscoveryRequest{
Node: &core.Node{
Cluster: d.clusterID,
@ -133,21 +128,34 @@ func (d *DependServiceHealthController) checkEDS() bool {
logrus.Errorf("discover depend services endpoint failure %s", err.Error())
return false
}
endpoints := envoyv2.ParseLocalityLbEndpointsResource(res.Resources)
readyLength := 0
for _, endpoint := range endpoints {
if len(endpoint.Endpoints) > 0 && len(endpoint.Endpoints[0].LbEndpoints) > 0 {
//first LbEndpoints healthy is not nil. so endpoint is not notreadyaddress
if host, ok := endpoint.Endpoints[0].LbEndpoints[0].HostIdentifier.(*endpointapi.LbEndpoint_Endpoint); ok {
if host.Endpoint != nil && host.Endpoint.HealthCheckConfig != nil {
readyLength++
logrus.Infof("depend service (%s) start complete, need waiting service count %d", endpoint.ClusterName, d.dependServiceCount-readyLength)
clusterLoadAssignments := envoyv2.ParseLocalityLbEndpointsResource(res.Resources)
readyClusters := make(map[string]bool, len(clusterLoadAssignments))
for _, cla := range clusterLoadAssignments {
// clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, service.Spec.Ports[0].Port)
clusterName := cla.GetClusterName()
ready := func() bool {
if len(cla.Endpoints) > 0 && len(cla.Endpoints[0].LbEndpoints) > 0 {
// first LbEndpoints healthy is not nil. so endpoint is not notreadyaddress
if host, ok := cla.Endpoints[0].LbEndpoints[0].HostIdentifier.(*endpointapi.LbEndpoint_Endpoint); ok {
if host.Endpoint != nil && host.Endpoint.HealthCheckConfig != nil {
logrus.Infof("depend service (%s) start complete", cla.ClusterName)
return true
}
}
}
return false
}()
readyClusters[clusterName] = ready
}
for _, cn := range d.dependServiceClusterNames {
if _, ok := readyClusters[cn]; !ok {
return false
}
}
if readyLength >= d.dependServiceCount {
return true
}
return false
logrus.Info("all dependent services have been started.")
return true
}

View File

@ -110,6 +110,7 @@ type TenantServicesPortDao interface {
DELPortsByServiceID(serviceID string) error
HasOpenPort(sid string) bool
DelByServiceID(sid string) error
ListInnerPortsByServiceIDs(serviceIDs []string) ([]*model.TenantServicesPort, error)
}
//TenantPluginDao TenantPluginDao

View File

@ -632,6 +632,16 @@ func (t *TenantServicesPortDaoImpl) DelByServiceID(sid string) error {
return t.DB.Where("service_id=?", sid).Delete(&model.TenantServicesPort{}).Error
}
// ListInnerPortsByServiceIDs -
func (t *TenantServicesPortDaoImpl) ListInnerPortsByServiceIDs(serviceIDs []string) ([]*model.TenantServicesPort, error) {
var ports []*model.TenantServicesPort
if err := t.DB.Where("service_id in (?) and is_inner_service=?", serviceIDs, true).Find(&ports).Error; err != nil {
return nil, err
}
return ports, nil
}
//TenantServiceRelationDaoImpl TenantServiceRelationDaoImpl
type TenantServiceRelationDaoImpl struct {
DB *gorm.DB

View File

@ -132,3 +132,87 @@ func TestTenantServicesDao_GetOpenedPort(t *testing.T) {
t.Errorf("Expected 3 for the length of ports, but return %d", len(ports))
}
}
func TestListInnerPorts(t *testing.T) {
dbname := "region"
rootpw := "rainbond"
ctx := context.Background()
req := testcontainers.ContainerRequest{
Image: "mariadb",
ExposedPorts: []string{"3306/tcp"},
Env: map[string]string{
"MYSQL_ROOT_PASSWORD": rootpw,
"MYSQL_DATABASE": dbname,
},
Cmd: "--character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci",
}
mariadb, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: req,
Started: true,
})
if err != nil {
t.Fatal(err)
}
defer mariadb.Terminate(ctx)
host, err := mariadb.Host(ctx)
if err != nil {
t.Error(err)
}
port, err := mariadb.MappedPort(ctx, "3306")
if err != nil {
t.Error(err)
}
connInfo := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", "root",
rootpw, host, port.Int(), dbname)
tryTimes := 3
for {
if err := CreateManager(dbconfig.Config{
DBType: "mysql",
MysqlConnectionInfo: connInfo,
}); err != nil {
if tryTimes == 0 {
t.Fatalf("Connect info: %s; error creating db manager: %v", connInfo, err)
} else {
tryTimes = tryTimes - 1
time.Sleep(10 * time.Second)
continue
}
}
break
}
sid := util.NewUUID()
trueVal := true
falseVal := false
err = GetManager().TenantServicesPortDao().AddModel(&model.TenantServicesPort{
ServiceID: sid,
ContainerPort: 1111,
MappingPort: 1111,
IsInnerService: &trueVal,
IsOuterService: &trueVal,
})
if err != nil {
t.Fatal(err)
}
err = GetManager().TenantServicesPortDao().AddModel(&model.TenantServicesPort{
ServiceID: sid,
ContainerPort: 2222,
MappingPort: 2222,
IsInnerService: &trueVal,
IsOuterService: &falseVal,
})
if err != nil {
t.Fatal(err)
}
ports, err := GetManager().TenantServicesPortDao().ListInnerPortsByServiceIDs([]string{sid})
if err != nil {
t.Fatal(err)
}
if len(ports) != 2 {
t.Errorf("Expocted %d for ports, but got %d", 2, len(ports))
}
}

View File

@ -178,30 +178,49 @@ func createEnv(as *v1.AppService, dbmanager db.Manager) (*[]corev1.EnvVar, error
}
//set service all dependces ids
as.Dependces = relationIDs
if len(relationIDs) > 0 {
es, err := dbmanager.TenantServiceEnvVarDao().GetDependServiceEnvs(relationIDs, []string{"outer", "both"})
if err != nil {
return nil, err
}
if es != nil {
envsAll = append(envsAll, es...)
}
serviceAliass, err := dbmanager.TenantServiceDao().GetServiceAliasByIDs(relationIDs)
if err != nil {
return nil, err
}
var Depend string
for _, sa := range serviceAliass {
if Depend != "" {
Depend += ","
}
Depend += fmt.Sprintf("%s:%s", sa.ServiceAlias, sa.ServiceID)
}
envs = append(envs, corev1.EnvVar{Name: "DEPEND_SERVICE", Value: Depend})
envs = append(envs, corev1.EnvVar{Name: "DEPEND_SERVICE_COUNT", Value: strconv.Itoa(len(serviceAliass))})
as.NeedProxy = true
es, err := dbmanager.TenantServiceEnvVarDao().GetDependServiceEnvs(relationIDs, []string{"outer", "both"})
if err != nil {
return nil, err
}
if es != nil {
envsAll = append(envsAll, es...)
}
serviceAliass, err := dbmanager.TenantServiceDao().GetServiceAliasByIDs(relationIDs)
if err != nil {
return nil, err
}
var Depend string
for _, sa := range serviceAliass {
if Depend != "" {
Depend += ","
}
Depend += fmt.Sprintf("%s:%s", sa.ServiceAlias, sa.ServiceID)
}
envs = append(envs, corev1.EnvVar{Name: "DEPEND_SERVICE", Value: Depend})
envs = append(envs, corev1.EnvVar{Name: "DEPEND_SERVICE_COUNT", Value: strconv.Itoa(len(serviceAliass))})
sid2alias := make(map[string]string, len(serviceAliass))
for _, alias := range serviceAliass {
sid2alias[alias.ServiceID] = alias.ServiceAlias
}
var clusterNames []string
ports, err := dbmanager.TenantServicesPortDao().ListInnerPortsByServiceIDs(relationIDs)
for _, port := range ports {
depServiceAlias, ok := sid2alias[port.ServiceID]
if !ok {
logrus.Warningf("service id: %s; service alias not found", port.ServiceID)
continue
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", as.TenantID, as.ServiceAlias, depServiceAlias, port.ContainerPort)
clusterNames = append(clusterNames, clusterName)
}
envs = append(envs, corev1.EnvVar{Name: "DEPEND_SERVICE_CLUSTER_NAMES", Value: strings.Join(clusterNames, ",")})
as.NeedProxy = true
}
//set app relation env
relations, err = dbmanager.TenantServiceRelationDao().GetTenantServiceRelationsByDependServiceID(as.ServiceID)
if err != nil {
@ -227,6 +246,7 @@ func createEnv(as *v1.AppService, dbmanager db.Manager) (*[]corev1.EnvVar, error
envs = append(envs, corev1.EnvVar{Name: "REVERSE_DEPEND_SERVICE", Value: Depend})
}
}
//set app port and net env
ports, err := dbmanager.TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {