fix service mesh not support multiple port issues

This commit is contained in:
barnettZQG 2020-01-08 12:20:14 +08:00
parent 14187d84c9
commit da0f653218
2 changed files with 57 additions and 57 deletions

View File

@ -45,68 +45,53 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
continue
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, service.Spec.Ports[0].Port)
name := fmt.Sprintf("%sService", destServiceAlias)
if destServiceAlias == serviceAlias {
name = fmt.Sprintf("%sServiceOUT", destServiceAlias)
}
selectEndpoint := getEndpointsByLables(endpoints, map[string]string{"name": name})
selectEndpoint := getEndpointsByServiceName(endpoints, service.Name)
logrus.Debugf("select endpoints %d for service %s", len(selectEndpoint), service.Name)
var lendpoints []endpoint.LocalityLbEndpoints // localityLbEndpoints just support only one content
if len(selectEndpoint) > 0 {
for _, en := range selectEndpoint {
if len(en.Subsets) > 0 {
subset := en.Subsets[0]
if len(subset.Ports) < 1 {
continue
for _, en := range selectEndpoint {
for _, subset := range en.Subsets {
for _, port := range subset.Ports {
toport := int(port.Port)
if serviceAlias == destServiceAlias {
//use real port
if originPort, ok := service.Labels["origin_port"]; ok {
origin, err := strconv.Atoi(originPort)
if err == nil {
toport = origin
}
}
}
for _, port := range subset.Ports {
toport := int(port.Port)
//if haven multiple port, will get other port endpoint
//so must ignore
if (len(service.Spec.Ports) == 0 || service.Spec.Ports[0].TargetPort.IntVal != int32(toport)) && en.Labels["service_kind"] != "third_party" {
continue
protocol := string(port.Protocol)
addressList := subset.Addresses
var notready bool
if len(addressList) == 0 && len(subset.NotReadyAddresses) > 0 {
notready = true
addressList = subset.NotReadyAddresses[:1]
}
getHealty := func() *endpoint.Endpoint_HealthCheckConfig {
if notready {
return nil
}
if serviceAlias == destServiceAlias {
if originPort, ok := service.Labels["origin_port"]; ok {
origin, err := strconv.Atoi(originPort)
if err == nil {
toport = origin
}
}
return &endpoint.Endpoint_HealthCheckConfig{
PortValue: uint32(toport),
}
protocol := string(subset.Ports[0].Protocol)
addressList := subset.Addresses
var notready bool
if len(addressList) == 0 {
notready = true
addressList = subset.NotReadyAddresses
}
getHealty := func() *endpoint.Endpoint_HealthCheckConfig {
if notready {
return nil
}
return &endpoint.Endpoint_HealthCheckConfig{
PortValue: uint32(toport),
}
}
var lbe []endpoint.LbEndpoint // just support one content
if len(addressList) > 0 {
envoyAddress := envoyv2.CreateSocketAddress(protocol, addressList[0].IP, uint32(toport))
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &envoyAddress,
HealthCheckConfig: getHealty(),
},
}
var lbe []endpoint.LbEndpoint // just support one content
for _, address := range addressList {
envoyAddress := envoyv2.CreateSocketAddress(protocol, address.IP, uint32(toport))
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &envoyAddress,
HealthCheckConfig: getHealty(),
},
})
}
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
},
})
}
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
}
}
}
cla := &v2.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: lendpoints,
@ -138,3 +123,12 @@ func getEndpointsByLables(endpoints []*corev1.Endpoints, slabels map[string]stri
}
return
}
func getEndpointsByServiceName(endpoints []*corev1.Endpoints, serviceName string) (re []*corev1.Endpoints) {
for _, en := range endpoints {
if serviceName == en.Name {
re = append(re, en)
}
}
return
}

View File

@ -23,6 +23,7 @@ import (
"fmt"
"net"
"reflect"
"strconv"
"sync"
"sync/atomic"
"time"
@ -183,7 +184,14 @@ func (d *DiscoverServerManager) UpdateNodeConfig(nc *NodeConfig) error {
if selector != nil {
upServices, upEndpoints := d.GetServicesAndEndpoints(nc.namespace, selector)
for i, service := range upServices {
if service.Spec.Ports[0].TargetPort.IntVal == int32(dep.Port) {
listenPort := service.Spec.Ports[0].Port
if value, ok := service.Labels["origin_port"]; ok {
origin, _ := strconv.Atoi(value)
if origin != 0 {
listenPort = int32(origin)
}
}
if listenPort == int32(dep.Port) {
services = append(services, upServices[i])
}
}
@ -191,9 +199,7 @@ func (d *DiscoverServerManager) UpdateNodeConfig(nc *NodeConfig) error {
if len(end.Subsets) == 0 || len(end.Subsets[0].Ports) == 0 {
continue
}
if end.Subsets[0].Ports[0].Port == int32(dep.Port) {
endpoint = append(endpoint, upEndpoints[i])
}
endpoint = append(endpoint, upEndpoints[i])
}
}
}