From da0f653218091d0849d0164c1aa7fe3029d0c87e Mon Sep 17 00:00:00 2001 From: barnettZQG Date: Wed, 8 Jan 2020 12:20:14 +0800 Subject: [PATCH] fix service mesh not support multiple port issues --- node/nodem/envoy/conver/eds_conver.go | 100 ++++++++++++-------------- node/nodem/envoy/server_v2.go | 14 ++-- 2 files changed, 57 insertions(+), 57 deletions(-) diff --git a/node/nodem/envoy/conver/eds_conver.go b/node/nodem/envoy/conver/eds_conver.go index f622a037e..b0c6d14d2 100644 --- a/node/nodem/envoy/conver/eds_conver.go +++ b/node/nodem/envoy/conver/eds_conver.go @@ -45,68 +45,53 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c continue } clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, service.Spec.Ports[0].Port) - name := fmt.Sprintf("%sService", destServiceAlias) - if destServiceAlias == serviceAlias { - name = fmt.Sprintf("%sServiceOUT", destServiceAlias) - } - selectEndpoint := getEndpointsByLables(endpoints, map[string]string{"name": name}) + selectEndpoint := getEndpointsByServiceName(endpoints, service.Name) + logrus.Debugf("select endpoints %d for service %s", len(selectEndpoint), service.Name) var lendpoints []endpoint.LocalityLbEndpoints // localityLbEndpoints just support only one content - if len(selectEndpoint) > 0 { - for _, en := range selectEndpoint { - if len(en.Subsets) > 0 { - subset := en.Subsets[0] - if len(subset.Ports) < 1 { - continue + for _, en := range selectEndpoint { + for _, subset := range en.Subsets { + for _, port := range subset.Ports { + toport := int(port.Port) + if serviceAlias == destServiceAlias { + //use real port + if originPort, ok := service.Labels["origin_port"]; ok { + origin, err := strconv.Atoi(originPort) + if err == nil { + toport = origin + } + } } - for _, port := range subset.Ports { - toport := int(port.Port) - //if haven multiple port, will get other port endpoint - //so must ignore - if (len(service.Spec.Ports) == 0 || service.Spec.Ports[0].TargetPort.IntVal != int32(toport)) && en.Labels["service_kind"] != "third_party" { - continue + protocol := string(port.Protocol) + addressList := subset.Addresses + var notready bool + if len(addressList) == 0 && len(subset.NotReadyAddresses) > 0 { + notready = true + addressList = subset.NotReadyAddresses[:1] + } + getHealty := func() *endpoint.Endpoint_HealthCheckConfig { + if notready { + return nil } - if serviceAlias == destServiceAlias { - if originPort, ok := service.Labels["origin_port"]; ok { - origin, err := strconv.Atoi(originPort) - if err == nil { - toport = origin - } - } + return &endpoint.Endpoint_HealthCheckConfig{ + PortValue: uint32(toport), } - protocol := string(subset.Ports[0].Protocol) - addressList := subset.Addresses - var notready bool - if len(addressList) == 0 { - notready = true - addressList = subset.NotReadyAddresses - } - getHealty := func() *endpoint.Endpoint_HealthCheckConfig { - if notready { - return nil - } - return &endpoint.Endpoint_HealthCheckConfig{ - PortValue: uint32(toport), - } - } - var lbe []endpoint.LbEndpoint // just support one content - if len(addressList) > 0 { - envoyAddress := envoyv2.CreateSocketAddress(protocol, addressList[0].IP, uint32(toport)) - - lbe = append(lbe, endpoint.LbEndpoint{ - HostIdentifier: &endpoint.LbEndpoint_Endpoint{ - Endpoint: &endpoint.Endpoint{ - Address: &envoyAddress, - HealthCheckConfig: getHealty(), - }, + } + var lbe []endpoint.LbEndpoint // just support one content + for _, address := range addressList { + envoyAddress := envoyv2.CreateSocketAddress(protocol, address.IP, uint32(toport)) + lbe = append(lbe, endpoint.LbEndpoint{ + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &envoyAddress, + HealthCheckConfig: getHealty(), }, - }) - } - lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe}) + }, + }) } + lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe}) } } } - cla := &v2.ClusterLoadAssignment{ ClusterName: clusterName, Endpoints: lendpoints, @@ -138,3 +123,12 @@ func getEndpointsByLables(endpoints []*corev1.Endpoints, slabels map[string]stri } return } + +func getEndpointsByServiceName(endpoints []*corev1.Endpoints, serviceName string) (re []*corev1.Endpoints) { + for _, en := range endpoints { + if serviceName == en.Name { + re = append(re, en) + } + } + return +} diff --git a/node/nodem/envoy/server_v2.go b/node/nodem/envoy/server_v2.go index bf43141b8..f085402b6 100644 --- a/node/nodem/envoy/server_v2.go +++ b/node/nodem/envoy/server_v2.go @@ -23,6 +23,7 @@ import ( "fmt" "net" "reflect" + "strconv" "sync" "sync/atomic" "time" @@ -183,7 +184,14 @@ func (d *DiscoverServerManager) UpdateNodeConfig(nc *NodeConfig) error { if selector != nil { upServices, upEndpoints := d.GetServicesAndEndpoints(nc.namespace, selector) for i, service := range upServices { - if service.Spec.Ports[0].TargetPort.IntVal == int32(dep.Port) { + listenPort := service.Spec.Ports[0].Port + if value, ok := service.Labels["origin_port"]; ok { + origin, _ := strconv.Atoi(value) + if origin != 0 { + listenPort = int32(origin) + } + } + if listenPort == int32(dep.Port) { services = append(services, upServices[i]) } } @@ -191,9 +199,7 @@ func (d *DiscoverServerManager) UpdateNodeConfig(nc *NodeConfig) error { if len(end.Subsets) == 0 || len(end.Subsets[0].Ports) == 0 { continue } - if end.Subsets[0].Ports[0].Port == int32(dep.Port) { - endpoint = append(endpoint, upEndpoints[i]) - } + endpoint = append(endpoint, upEndpoints[i]) } } }