[REV] find the associated endpoints by the name of the service

This commit is contained in:
huangrh 2019-03-31 02:17:07 +08:00
parent 3636855e13
commit 2320e1d962

View File

@ -29,9 +29,6 @@ import (
"strings"
"sync"
"github.com/goodrain/rainbond/db/model"
"k8s.io/apimachinery/pkg/labels"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/cmd/gateway/option"
@ -40,7 +37,7 @@ import (
"github.com/goodrain/rainbond/gateway/controller/config"
"github.com/goodrain/rainbond/gateway/defaults"
"github.com/goodrain/rainbond/gateway/util"
v1 "github.com/goodrain/rainbond/gateway/v1"
"github.com/goodrain/rainbond/gateway/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -377,97 +374,74 @@ func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) {
func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) {
var httpPools []*v1.Pool
var tcpPools []*v1.Pool
l7Pools := make(map[string]*v1.Pool)
l4Pools := make(map[string]*v1.Pool)
for _, item := range s.listers.Endpoint.List() {
ep := item.(*corev1.Endpoints)
f := func(backendMap map[string][]backend, poolMap map[string]struct{}) map[string]*v1.Pool {
pools := make(map[string]*v1.Pool)
for k, v := range backendMap {
service, err := s.listers.Service.ByKey(k)
if err != nil {
logrus.Warningf("Key: %s;error getting service: %v", k, err)
continue
}
originPort := service.GetLabels()["origin_port"]
var pluginPort int32
if originPort != "" {
port, err := strconv.Atoi(originPort)
if err != nil {
logrus.Warningf("Origin port: %s; error converting to type int: %v", err)
pluginPort = 0
} else {
pluginPort = int32(port)
if ep.Subsets != nil || len(ep.Subsets) != 0 {
epn := ep.ObjectMeta.Name
// l7
backends := l7PoolBackendMap[ep.ObjectMeta.Name]
for _, backend := range backends {
pool := l7Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
l7Pools[backend.name] = pool
}
}
// list related endpoints
labelname, ok := service.GetLabels()["name"]
if !ok {
logrus.Warningf("Key: %s;label 'name' not found", k)
continue
}
selector, err := labels.Parse(fmt.Sprintf("name=%s", labelname))
if err != nil {
logrus.Warningf("Label: %s; error parsing labels: %s",
fmt.Sprintf("name=%s", labelname), err.Error())
continue
}
endpoints, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("Label: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
continue
}
for _, ep := range endpoints {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
continue
}
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() {
if ep.Subsets[0].Ports[0].Port != service.Spec.Ports[0].TargetPort.IntVal {
continue
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := l7PoolMap[epn]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
}
}
for _, backend := range v {
pool := pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
pools[backend.name] = pool
}
// l4
backends = l4PoolBackendMap[ep.ObjectMeta.Name]
for _, backend := range backends {
pool := l4Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := poolMap[k]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: func(pluginPort, addressPort int32) int32 {
if pluginPort != 0 {
return pluginPort
}
return addressPort
}(pluginPort, ss.Ports[0].Port),
Weight: backend.weight,
})
}
pool.Name = backend.name
l4Pools[backend.name] = pool
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := l4PoolMap[epn]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
}
}
}
}
return pools
}
l7Pools := f(l7PoolBackendMap, l7PoolMap)
l4Pools := f(l4PoolBackendMap, l4PoolMap)
// change map to slice TODO: use map directly
for _, pool := range l7Pools {
httpPools = append(httpPools, pool)
@ -491,20 +465,20 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
if !s.ingressIsValid(ing) {
continue
}
ingKey := k8s.MetaNamespaceKey(ing)
anns, err := s.GetIngressAnnotations(ingKey)
if err != nil {
logrus.Errorf("Error getting Ingress annotations %q: %v", ingKey, err)
}
if anns.L4.L4Enable && anns.L4.L4Port != 0 {
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
// region l4
host := strings.Replace(anns.L4.L4Host, " ", "", -1)
if host == "" {
host = s.conf.IP
}
host = s.conf.IP
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
protocol := s.GetServiceProtocol(svcKey, ing.Spec.Backend.ServicePort.IntVal)
listening := fmt.Sprintf("%s:%v", host, anns.L4.L4Port)
if string(protocol) == string(v1.ProtocolUDP) {
@ -521,11 +495,12 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
vs.Namespace = anns.Namespace
vs.ServiceID = anns.Labels["service_id"]
}
l4PoolMap[svcKey] = struct{}{}
l4PoolMap[ing.Spec.Backend.ServiceName] = struct{}{}
l4vsMap[listening] = vs
l4vs = append(l4vs, vs)
backend := backend{name: backendName, weight: anns.Weight.Weight}
l4PoolBackendMap[svcKey] = append(l4PoolBackendMap[svcKey], backend)
l4PoolBackendMap[ing.Spec.Backend.ServiceName] = append(l4PoolBackendMap[ing.Spec.Backend.ServiceName], backend)
// endregion
} else {
// region l7
@ -577,14 +552,15 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
vs.SSLCert = hostSSLMap[DefVirSrvName]
}
}
l7vsMap[virSrvName] = vs
l7vs = append(l7vs, vs)
}
for _, path := range rule.IngressRuleValue.HTTP.Paths {
svckey := fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
locKey := fmt.Sprintf("%s_%s", virSrvName, path.Path)
location := srvLocMap[locKey]
l7PoolMap[svckey] = struct{}{}
l7PoolMap[path.Backend.ServiceName] = struct{}{}
// if location do not exists, then creates a new one
if location == nil {
location = &v1.Location{
@ -593,10 +569,9 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
}
srvLocMap[locKey] = location
vs.Locations = append(vs.Locations, location)
// the first ingress proxy takes effect
location.Proxy = anns.Proxy
}
location.Proxy = anns.Proxy
// If their ServiceName is the same, then the new one will overwrite the old one.
nameCondition := &v1.Condition{}
var backendName string
@ -619,7 +594,7 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
if anns.UpstreamHashBy != "" {
backend.hashBy = anns.UpstreamHashBy
}
l7PoolBackendMap[svckey] = append(l7PoolBackendMap[svckey], backend)
l7PoolBackendMap[path.Backend.ServiceName] = append(l7PoolBackendMap[path.Backend.ServiceName], backend)
}
}
// endregion
@ -630,57 +605,47 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
// ingressIsValid checks if the specified ingress is valid
func (s *k8sStore) ingressIsValid(ing *extensions.Ingress) bool {
var svcKey string
var endpointKey string
if ing.Spec.Backend != nil { // stream
svcKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName)
endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName)
} else { // http
Loop:
for _, rule := range ing.Spec.Rules {
for _, path := range rule.IngressRuleValue.HTTP.Paths {
svcKey = fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
if svcKey != "" {
endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
if endpointKey != "" {
break Loop
}
}
}
}
service, err := s.listers.Service.ByKey(svcKey)
item, exists, err := s.listers.Endpoint.GetByKey(endpointKey)
if err != nil {
logrus.Warningf("Key: %s; error getting key: %v", err)
logrus.Errorf("Can not get endpoint by key(%s): %v", endpointKey, err)
return false
}
labelname := fmt.Sprintf("name=%s", service.GetLabels()["name"])
selector, err := labels.Parse(labelname)
if err != nil {
logrus.Warningf("Label: %s;error parsing labels: %v", labelname, err)
if !exists {
logrus.Warningf("Endpoint \"%s\" does not exist.", endpointKey)
return false
}
eps, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("selector: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
endpoint, ok := item.(*corev1.Endpoints)
if !ok {
logrus.Errorf("Cant not convert %v to %v", reflect.TypeOf(item), reflect.TypeOf(endpoint))
return false
}
if eps == nil || len(eps) == 0 {
logrus.Warningf("Selector: %s; empty endpoints", fmt.Sprintf("name=%s", labelname))
if endpoint.Subsets == nil || len(endpoint.Subsets) == 0 {
logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey)
return false
}
result := false
for _, ep := range eps {
if ep.Subsets != nil && len(ep.Subsets) > 0 {
e := ep.Subsets[0]
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() &&
e.Ports[0].Port != service.Spec.Ports[0].Port {
continue
}
if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) {
result = true
break
}
for _, ep := range endpoint.Subsets {
if (ep.Addresses == nil || len(ep.Addresses) == 0) && (ep.NotReadyAddresses == nil || len(ep.NotReadyAddresses) == 0) {
logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey)
return false
}
}
return result
return true
}
// GetIngress returns the Ingress matching key.
@ -701,20 +666,6 @@ func (s *k8sStore) ListIngresses() []*extensions.Ingress {
return ingresses
}
// GetServiceNameLabelByKey returns name in the labels of corev1.Service
// matching key(name/namespace).
func (s *k8sStore) GetServiceNameLabelByKey(key string) (string, error) {
svc, err := s.listers.Service.ByKey(key)
if err != nil {
return "", err
}
name, ok := svc.Labels["name"]
if !ok {
return "", fmt.Errorf("label \"name\" not found")
}
return name, nil
}
// GetServiceProtocol returns the Service matching key and port.
func (s *k8sStore) GetServiceProtocol(key string, port int32) corev1.Protocol {
svcs, err := s.listers.Service.ByKey(key)