fix: build_in_service_mesh component failed to start (#1531)

This commit is contained in:
张启航 2023-01-30 16:28:24 +08:00 committed by GitHub
parent a40ba31bf0
commit 14075cda80
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 175 additions and 157 deletions

View File

@ -525,17 +525,18 @@ func GetServiceAliasByService(service *corev1.Service) string {
}
//CreateDNSLoadAssignment create dns loadAssignment
func CreateDNSLoadAssignment(serviceAlias, namespace, domain string, service *corev1.Service) *apiv2.ClusterLoadAssignment {
func CreateDNSLoadAssignment(serviceAlias, namespace, domain string, service *corev1.Service, p corev1.ServicePort) *apiv2.ClusterLoadAssignment {
destServiceAlias := GetServiceAliasByService(service)
if destServiceAlias == "" {
logrus.Errorf("service alias is empty in k8s service %s", service.Name)
return nil
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, service.Spec.Ports[0].Port)
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, p.Port)
var lendpoints []*endpoint.LocalityLbEndpoints
protocol := service.Labels["port_protocol"]
port := service.Spec.Ports[0].Port
port := p.Port
portProtocol := fmt.Sprintf("port_protocol_%v", port)
protocol := service.Labels[portProtocol]
var lbe []*endpoint.LbEndpoint
envoyAddress := CreateSocketAddress(protocol, domain, uint32(port))
lbe = append(lbe, &endpoint.LbEndpoint{

View File

@ -77,52 +77,53 @@ func upstreamClusters(serviceAlias, namespace string, dependsServices []*api_mod
for _, service := range services {
inner, ok := service.Labels["service_type"]
destServiceAlias := GetServiceAliasByService(service)
port := service.Spec.Ports[0]
if !ok || inner != "inner" {
continue
}
getOptions := func() (d envoyv2.RainbondPluginOptions) {
relPort, _ := strconv.Atoi(service.Labels["origin_port"])
if relPort == 0 {
relPort = int(port.TargetPort.IntVal)
}
depServiceIndex := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), relPort)
if _, ok := clusterConfig[depServiceIndex]; ok {
return envoyv2.GetOptionValues(clusterConfig[depServiceIndex].Options)
}
return envoyv2.GetOptionValues(nil)
}
var clusterOption envoyv2.ClusterOptions
clusterOption.Name = fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, GetServiceAliasByService(service), port.Port)
options := getOptions()
clusterOption.OutlierDetection = envoyv2.CreatOutlierDetection(options)
clusterOption.CircuitBreakers = envoyv2.CreateCircuitBreaker(options)
clusterOption.ServiceName = fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port)
if domain, ok := service.Annotations["domain"]; ok && domain != "" {
logrus.Debugf("domain endpoint[%s], create logical_dns cluster: ", domain)
clusterOption.ClusterType = v2.Cluster_LOGICAL_DNS
clusterOption.LoadAssignment = envoyv2.CreateDNSLoadAssignment(serviceAlias, namespace, domain, service)
if strings.HasPrefix(domain, "https://") {
splitDomain := strings.Split(domain, "https://")
if len(splitDomain) == 2 {
clusterOption.TransportSocket = transportSocket(clusterOption.Name, splitDomain[1])
for _, port := range service.Spec.Ports {
getOptions := func() (d envoyv2.RainbondPluginOptions) {
relPort, _ := strconv.Atoi(service.Labels["origin_port"])
if relPort == 0 {
relPort = int(port.TargetPort.IntVal)
}
depServiceIndex := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), relPort)
if _, ok := clusterConfig[depServiceIndex]; ok {
return envoyv2.GetOptionValues(clusterConfig[depServiceIndex].Options)
}
return envoyv2.GetOptionValues(nil)
}
var clusterOption envoyv2.ClusterOptions
clusterOption.Name = fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, GetServiceAliasByService(service), port.Port)
options := getOptions()
clusterOption.OutlierDetection = envoyv2.CreatOutlierDetection(options)
clusterOption.CircuitBreakers = envoyv2.CreateCircuitBreaker(options)
clusterOption.ServiceName = fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port)
if domain, ok := service.Annotations["domain"]; ok && domain != "" {
logrus.Debugf("domain endpoint[%s], create logical_dns cluster: ", domain)
clusterOption.ClusterType = v2.Cluster_LOGICAL_DNS
clusterOption.LoadAssignment = envoyv2.CreateDNSLoadAssignment(serviceAlias, namespace, domain, service, port)
if strings.HasPrefix(domain, "https://") {
splitDomain := strings.Split(domain, "https://")
if len(splitDomain) == 2 {
clusterOption.TransportSocket = transportSocket(clusterOption.Name, splitDomain[1])
}
}
} else {
clusterOption.ClusterType = v2.Cluster_EDS
}
clusterOption.HealthyPanicThreshold = options.HealthyPanicThreshold
clusterOption.ConnectionTimeout = envoyv2.ConverTimeDuration(options.ConnectionTimeout)
// set port realy protocol
portProtocol := service.Labels[fmt.Sprintf("port_protocol_%v", port)]
clusterOption.Protocol = portProtocol
clusterOption.GrpcHealthServiceName = options.GrpcHealthServiceName
clusterOption.HealthTimeout = options.HealthCheckTimeout
clusterOption.HealthInterval = options.HealthCheckInterval
cluster := envoyv2.CreateCluster(clusterOption)
if cluster != nil {
logrus.Debugf("cluster is : %v", cluster)
cdsClusters = append(cdsClusters, cluster)
}
} else {
clusterOption.ClusterType = v2.Cluster_EDS
}
clusterOption.HealthyPanicThreshold = options.HealthyPanicThreshold
clusterOption.ConnectionTimeout = envoyv2.ConverTimeDuration(options.ConnectionTimeout)
// set port realy protocol
portProtocol := service.Labels["port_protocol"]
clusterOption.Protocol = portProtocol
clusterOption.GrpcHealthServiceName = options.GrpcHealthServiceName
clusterOption.HealthTimeout = options.HealthCheckTimeout
clusterOption.HealthInterval = options.HealthCheckInterval
cluster := envoyv2.CreateCluster(clusterOption)
if cluster != nil {
logrus.Debugf("cluster is : %v", cluster)
cdsClusters = append(cdsClusters, cluster)
}
}
return

View File

@ -44,7 +44,6 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
logrus.Errorf("service alias is empty in k8s service %s", service.Name)
continue
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, service.Spec.Ports[0].Port)
selectEndpoint := getEndpointsByServiceName(endpoints, service.Name)
logrus.Debugf("select endpoints %d for service %s", len(selectEndpoint), service.Name)
var lendpoints []*endpoint.LocalityLbEndpoints // localityLbEndpoints just support only one content
@ -107,14 +106,17 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
lendpoints = append(lendpoints, &endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
}
}
cla := &v2.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: lendpoints,
}
if err := cla.Validate(); err != nil {
logrus.Errorf("endpoints discover validate failure %s", err.Error())
} else {
clusterLoadAssignment = append(clusterLoadAssignment, cla)
for _, p := range service.Spec.Ports {
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, p.Port)
cla := &v2.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: lendpoints,
}
if err := cla.Validate(); err != nil {
logrus.Errorf("endpoints discover validate failure %s", err.Error())
} else {
clusterLoadAssignment = append(clusterLoadAssignment, cla)
}
}
}
if len(clusterLoadAssignment) == 0 {

View File

@ -113,111 +113,113 @@ func upstreamListener(serviceAlias, namespace string, dependsServices []*api_mod
if !ok || inner != "inner" {
continue
}
port := service.Spec.Ports[0].Port
protocol := service.Spec.Ports[0].Protocol
var ListenPort = port
//listener real port
if value, ok := service.Labels["origin_port"]; ok {
origin, _ := strconv.Atoi(value)
if origin != 0 {
ListenPort = int32(origin)
}
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), port)
listennerName := fmt.Sprintf("%s_%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), strings.ToLower(string(protocol)), ListenPort)
destService := ListennerConfig[listennerName]
statPrefix := fmt.Sprintf("%s_%s", serviceAlias, GetServiceAliasByService(service))
var options envoyv2.RainbondPluginOptions
if destService != nil {
options = envoyv2.GetOptionValues(destService.Options)
} else {
logrus.Warningf("destService is nil for service %s listenner name %s", serviceAlias, listennerName)
}
// Unique by listen port
if _, ok := portMap[ListenPort]; !ok {
//listener name depend listner port
listenerName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, ListenPort)
var listener *v2.Listener
protocol := service.Labels["port_protocol"]
if domain, ok := service.Annotations["domain"]; ok && domain != "" && (protocol == "https" || protocol == "http" || protocol == "grpc") {
route := envoyv2.CreateRouteWithHostRewrite(domain, clusterName, "/", nil, 0)
if route != nil {
pvh := envoyv2.CreateRouteVirtualHost(
fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), port),
[]string{"*"},
nil,
route,
)
if pvh != nil {
listener = envoyv2.CreateHTTPListener(fmt.Sprintf("%s_%s_http_%d", namespace, serviceAlias, port), envoyv2.DefaultLocalhostListenerAddress, fmt.Sprintf("%s_%d", serviceAlias, port), uint32(port), nil, pvh)
} else {
logrus.Warnf("create route virtual host of domain listener %s failure", fmt.Sprintf("%s_%s_http_%d", namespace, serviceAlias, port))
}
for _, p := range service.Spec.Ports {
port := p.Port
protocol := p.Protocol
var ListenPort = port
//listener real port
if value, ok := service.Labels["origin_port"]; ok {
origin, _ := strconv.Atoi(value)
if origin != 0 {
ListenPort = int32(origin)
}
} else if protocol == "udp" {
listener = envoyv2.CreateUDPListener(listenerName, clusterName, envoyv2.DefaultLocalhostListenerAddress, statPrefix, uint32(ListenPort))
} else {
listener = envoyv2.CreateTCPListener(listenerName, clusterName, envoyv2.DefaultLocalhostListenerAddress, statPrefix, uint32(ListenPort), options.TCPIdleTimeout)
}
if listener != nil {
ldsL = append(ldsL, listener)
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), port)
listennerName := fmt.Sprintf("%s_%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), strings.ToLower(string(protocol)), ListenPort)
destService := ListennerConfig[listennerName]
statPrefix := fmt.Sprintf("%s_%s", serviceAlias, GetServiceAliasByService(service))
var options envoyv2.RainbondPluginOptions
if destService != nil {
options = envoyv2.GetOptionValues(destService.Options)
} else {
logrus.Warningf("create tcp listenner %s failure", listenerName)
continue
logrus.Warningf("destService is nil for service %s listenner name %s", serviceAlias, listennerName)
}
portMap[ListenPort] = len(ldsL) - 1
}
portProtocol, _ := service.Labels["port_protocol"]
if destService != nil && destService.Protocol != "" {
portProtocol = destService.Protocol
}
if portProtocol != "" {
//TODO: support more protocol
switch portProtocol {
case "http", "https", "grpc":
hashKey := options.RouteBasicHash()
if oldroute, ok := uniqRoute[hashKey]; ok {
oldrr := oldroute.Action.(*route.Route_Route)
if oldrrwc, ok := oldrr.Route.ClusterSpecifier.(*route.RouteAction_WeightedClusters); ok {
weight := envoyv2.CheckWeightSum(oldrrwc.WeightedClusters.Clusters, options.Weight)
oldrrwc.WeightedClusters.Clusters = append(oldrrwc.WeightedClusters.Clusters, &route.WeightedCluster_ClusterWeight{
Name: clusterName,
Weight: envoyv2.ConversionUInt32(weight),
})
}
} else {
var headerMatchers []*route.HeaderMatcher
for _, header := range options.Headers {
headerMatcher := envoyv2.CreateHeaderMatcher(header)
if headerMatcher != nil {
headerMatchers = append(headerMatchers, headerMatcher)
// Unique by listen port
if _, ok := portMap[ListenPort]; !ok {
//listener name depend listner port
listenerName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, ListenPort)
var listener *v2.Listener
portProtocol := fmt.Sprintf("port_protocol_%v", port)
protocol := service.Labels[portProtocol]
if domain, ok := service.Annotations["domain"]; ok && domain != "" && (protocol == "https" || protocol == "http" || protocol == "grpc") {
route := envoyv2.CreateRouteWithHostRewrite(domain, clusterName, "/", nil, 0)
if route != nil {
pvh := envoyv2.CreateRouteVirtualHost(
fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), port),
[]string{"*"},
nil,
route,
)
if pvh != nil {
listener = envoyv2.CreateHTTPListener(fmt.Sprintf("%s_%s_http_%d", namespace, serviceAlias, port), envoyv2.DefaultLocalhostListenerAddress, fmt.Sprintf("%s_%d", serviceAlias, port), uint32(port), nil, pvh)
} else {
logrus.Warnf("create route virtual host of domain listener %s failure", fmt.Sprintf("%s_%s_http_%d", namespace, serviceAlias, port))
}
}
var route *route.Route
if domain, ok := service.Annotations["domain"]; ok && domain != "" {
route = envoyv2.CreateRouteWithHostRewrite(domain, clusterName, options.Prefix, headerMatchers, options.Weight)
} else {
route = envoyv2.CreateRoute(clusterName, options.Prefix, headerMatchers, options.Weight)
}
} else if protocol == "udp" {
listener = envoyv2.CreateUDPListener(listenerName, clusterName, envoyv2.DefaultLocalhostListenerAddress, statPrefix, uint32(ListenPort))
} else {
listener = envoyv2.CreateTCPListener(listenerName, clusterName, envoyv2.DefaultLocalhostListenerAddress, statPrefix, uint32(ListenPort), options.TCPIdleTimeout)
}
if listener != nil {
ldsL = append(ldsL, listener)
} else {
logrus.Warningf("create tcp listenner %s failure", listenerName)
continue
}
portMap[ListenPort] = len(ldsL) - 1
}
portProtocol := service.Labels[fmt.Sprintf("port_protocol_%v", port)]
if destService != nil && destService.Protocol != "" {
portProtocol = destService.Protocol
}
if route != nil {
if pvh := VHLDomainMap[strings.Join(options.Domains, "")]; pvh != nil {
pvh.Routes = append(pvh.Routes, route)
if portProtocol != "" {
//TODO: support more protocol
switch portProtocol {
case "http", "https", "grpc":
hashKey := options.RouteBasicHash()
if oldroute, ok := uniqRoute[hashKey]; ok {
oldrr := oldroute.Action.(*route.Route_Route)
if oldrrwc, ok := oldrr.Route.ClusterSpecifier.(*route.RouteAction_WeightedClusters); ok {
weight := envoyv2.CheckWeightSum(oldrrwc.WeightedClusters.Clusters, options.Weight)
oldrrwc.WeightedClusters.Clusters = append(oldrrwc.WeightedClusters.Clusters, &route.WeightedCluster_ClusterWeight{
Name: clusterName,
Weight: envoyv2.ConversionUInt32(weight),
})
}
} else {
var headerMatchers []*route.HeaderMatcher
for _, header := range options.Headers {
headerMatcher := envoyv2.CreateHeaderMatcher(header)
if headerMatcher != nil {
headerMatchers = append(headerMatchers, headerMatcher)
}
}
var route *route.Route
if domain, ok := service.Annotations["domain"]; ok && domain != "" {
route = envoyv2.CreateRouteWithHostRewrite(domain, clusterName, options.Prefix, headerMatchers, options.Weight)
} else {
pvh := envoyv2.CreateRouteVirtualHost(fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias,
GetServiceAliasByService(service), port), envoyv2.CheckDomain(options.Domains, portProtocol), nil, route)
if pvh != nil {
newVHL = append(newVHL, pvh)
uniqRoute[hashKey] = route
VHLDomainMap[strings.Join(options.Domains, "")] = pvh
route = envoyv2.CreateRoute(clusterName, options.Prefix, headerMatchers, options.Weight)
}
if route != nil {
if pvh := VHLDomainMap[strings.Join(options.Domains, "")]; pvh != nil {
pvh.Routes = append(pvh.Routes, route)
} else {
pvh := envoyv2.CreateRouteVirtualHost(fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias,
GetServiceAliasByService(service), port), envoyv2.CheckDomain(options.Domains, portProtocol), nil, route)
if pvh != nil {
newVHL = append(newVHL, pvh)
uniqRoute[hashKey] = route
VHLDomainMap[strings.Join(options.Domains, "")] = pvh
}
}
}
}
default:
continue
}
default:
continue
}
}
}

View File

@ -183,15 +183,17 @@ func (d *DiscoverServerManager) UpdateNodeConfig(nc *NodeConfig) error {
if selector != nil {
upServices, upEndpoints := d.GetServicesAndEndpoints(nc.namespace, selector)
for i, service := range upServices {
listenPort := service.Spec.Ports[0].Port
if value, ok := service.Labels["origin_port"]; ok {
origin, _ := strconv.Atoi(value)
if origin != 0 {
listenPort = int32(origin)
for _, p := range service.Spec.Ports {
listenPort := p.Port
if value, ok := service.Labels["origin_port"]; ok {
origin, _ := strconv.Atoi(value)
if origin != 0 {
listenPort = int32(origin)
}
}
if listenPort == int32(dep.Port) {
services = append(services, upServices[i])
}
}
if listenPort == int32(dep.Port) {
services = append(services, upServices[i])
}
}
for i, end := range upEndpoints {

View File

@ -428,6 +428,8 @@ func (a *AppServiceBuild) createInnerService(ports []*model.TenantServicesPort)
if servicePort.Port == 0 {
servicePort.Port = int32(port.ContainerPort)
}
portProtocol := fmt.Sprintf("port_protocol_%v", servicePort.Port)
service.Labels[portProtocol] = port.Protocol
servicePorts = append(servicePorts, servicePort)
}
spec := corev1.ServiceSpec{

View File

@ -1,6 +1,8 @@
package conversion
import (
"fmt"
"strconv"
"time"
"github.com/goodrain/rainbond/db"
@ -71,14 +73,20 @@ func createServiceMonitor(as *v1.AppService, dbmanager db.Manager) []*mv1.Servic
sm.Name = tsm.Name
sm.Labels = as.GetCommonLabels()
sm.Namespace = as.GetNamespace()
var portProtocol string
for _, p := range service.Spec.Ports {
if int(p.Port) == tsm.Port {
portProtocol = service.Labels[fmt.Sprintf("port_protocol_%v", p.Port)]
}
}
sm.Spec = mv1.ServiceMonitorSpec{
// service label app_name
JobLabel: "app_name",
NamespaceSelector: mv1.NamespaceSelector{Any: true},
Selector: metav1.LabelSelector{
MatchLabels: map[string]string{
"service_port": service.Labels["service_port"],
"port_protocol": service.Labels["port_protocol"],
"service_port": strconv.Itoa(tsm.Port),
"port_protocol": portProtocol,
"name": service.Labels["name"],
"service_type": service.Labels["service_type"],
},