Merge branch 'V5.1' of https://github.com/goodrain/rainbond into V5.1

This commit is contained in:
barnett 2019-03-20 14:15:51 +08:00
commit 4aead9ace1
4 changed files with 116 additions and 114 deletions

View File

@ -21,7 +21,9 @@ package store
import (
"bytes"
"fmt"
"github.com/goodrain/rainbond/db/model"
"io/ioutil"
"k8s.io/apimachinery/pkg/labels"
"net"
"os"
"reflect"
@ -37,7 +39,7 @@ import (
"github.com/goodrain/rainbond/gateway/controller/config"
"github.com/goodrain/rainbond/gateway/defaults"
"github.com/goodrain/rainbond/gateway/util"
v1 "github.com/goodrain/rainbond/gateway/v1"
"github.com/goodrain/rainbond/gateway/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -118,7 +120,8 @@ type k8sStore struct {
conf *option.Config
client kubernetes.Interface
// informer contains the cache Informers
informers *Informer
informers *Informer
sharedInformer informers.SharedInformerFactory
// Lister contains object listers (stores).
listers *Lister
secretIngressMap *secretIngressMap
@ -158,21 +161,21 @@ func New(client kubernetes.Interface,
store.listers.IngressAnnotation.Store = cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)
// create informers factory, enable and assign required informers
infFactory := informers.NewFilteredSharedInformerFactory(client, conf.ResyncPeriod, corev1.NamespaceAll,
store.sharedInformer = informers.NewFilteredSharedInformerFactory(client, conf.ResyncPeriod, corev1.NamespaceAll,
func(options *metav1.ListOptions) {
options.LabelSelector = "creater=Rainbond"
})
store.informers.Ingress = infFactory.Extensions().V1beta1().Ingresses().Informer()
store.informers.Ingress = store.sharedInformer.Extensions().V1beta1().Ingresses().Informer()
store.listers.Ingress.Store = store.informers.Ingress.GetStore()
store.informers.Service = infFactory.Core().V1().Services().Informer()
store.informers.Service = store.sharedInformer.Core().V1().Services().Informer()
store.listers.Service.Store = store.informers.Service.GetStore()
store.informers.Endpoint = infFactory.Core().V1().Endpoints().Informer()
store.informers.Endpoint = store.sharedInformer.Core().V1().Endpoints().Informer()
store.listers.Endpoint.Store = store.informers.Endpoint.GetStore()
store.informers.Secret = infFactory.Core().V1().Secrets().Informer()
store.informers.Secret = store.sharedInformer.Core().V1().Secrets().Informer()
store.listers.Secret.Store = store.informers.Secret.GetStore()
ingEventHandler := cache.ResourceEventHandlerFuncs{
@ -373,86 +376,101 @@ func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) {
func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) {
var httpPools []*v1.Pool
var tcpPools []*v1.Pool
l7Pools := make(map[string]*v1.Pool)
l4Pools := make(map[string]*v1.Pool)
for _, item := range s.listers.Endpoint.List() {
ep := item.(*corev1.Endpoints)
labels := ep.GetLabels()
name, ok := labels["name"]
if !ok {
logrus.Warningf("there is no name in the labels of corev1.Endpoints(%s/%s)",
ep.Namespace, ep.Name)
continue
}
if ep.Subsets != nil || len(ep.Subsets) != 0 {
// l7
backends := l7PoolBackendMap[name]
for _, backend := range backends {
pool := l7Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
l7Pools[backend.name] = pool
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := l7PoolMap[name]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
}
f := func(backendMap map[string][]backend, poolMap map[string]struct{}) map[string]*v1.Pool {
pools := make(map[string]*v1.Pool)
for k, v := range backendMap {
service, err := s.listers.Service.ByKey(k)
if err != nil {
logrus.Warningf("Key: %s;error getting service: %v", k, err)
continue
}
originPort := service.GetLabels()["origin_port"]
var pluginPort int32
if originPort != "" {
port, err := strconv.Atoi(originPort)
if err != nil {
logrus.Warningf("Origin port: %s; error converting to type int: %v", err)
pluginPort = 0
} else {
pluginPort = int32(port)
}
}
// l4
backends = l4PoolBackendMap[name]
for _, backend := range backends {
pool := l4Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
l4Pools[backend.name] = pool
// list related endpoints
labelname, ok := service.GetLabels()["name"]
if !ok {
logrus.Warningf("Key: %s;label 'name' not found", k)
continue
}
selector, err := labels.Parse(fmt.Sprintf("name=%s", labelname))
if err != nil {
logrus.Warningf("Label: %s; error parsing labels: %s",
fmt.Sprintf("name=%s", labelname), err.Error())
continue
}
endpoints, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("Label: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
continue
}
for _, ep := range endpoints {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
continue
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() {
if ep.Subsets[0].Ports[0].Port != service.Spec.Ports[0].Port {
continue
}
for _, address := range addresses {
if _, ok := l4PoolMap[name]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
for _, backend := range v {
pool := pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
pools[backend.name] = pool
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := poolMap[k]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: func(pluginPort, addressPort int32) int32 {
if pluginPort != 0 {
return pluginPort
}
return ss.Ports[0].Port
}(pluginPort, ss.Ports[0].Port),
Weight: backend.weight,
})
}
}
}
}
}
}
return pools
}
l7Pools := f(l7PoolBackendMap, l7PoolMap)
l4Pools := f(l4PoolBackendMap, l4PoolMap)
// change map to slice TODO: use map directly
for _, pool := range l7Pools {
httpPools = append(httpPools, pool)
}
for _, pool := range l4Pools {
tcpPools = append(tcpPools, pool)
}
@ -479,11 +497,6 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
}
if anns.L4.L4Enable && anns.L4.L4Port != 0 {
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
name, err := s.GetServiceNameLabelByKey(svcKey)
if err != nil {
logrus.Warningf("key: %s; error getting service name label: %v", svcKey, err)
continue
}
// region l4
host := strings.Replace(anns.L4.L4Host, " ", "", -1)
@ -507,11 +520,11 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
vs.Namespace = anns.Namespace
vs.ServiceID = anns.Labels["service_id"]
}
l4PoolMap[name] = struct{}{}
l4PoolMap[svcKey] = struct{}{}
l4vsMap[listening] = vs
l4vs = append(l4vs, vs)
backend := backend{name: backendName, weight: anns.Weight.Weight}
l4PoolBackendMap[name] = append(l4PoolBackendMap[name], backend)
l4PoolBackendMap[svcKey] = append(l4PoolBackendMap[svcKey], backend)
// endregion
} else {
// region l7
@ -568,14 +581,9 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
}
for _, path := range rule.IngressRuleValue.HTTP.Paths {
svckey := fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
name, err := s.GetServiceNameLabelByKey(svckey)
if err != nil {
logrus.Warningf("key: %s; error getting service name label: %v", svckey, err)
continue
}
locKey := fmt.Sprintf("%s_%s", virSrvName, path.Path)
location := srvLocMap[locKey]
l7PoolMap[name] = struct{}{}
l7PoolMap[svckey] = struct{}{}
// if location do not exists, then creates a new one
if location == nil {
location = &v1.Location{
@ -610,7 +618,7 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
if anns.UpstreamHashBy != "" {
backend.hashBy = anns.UpstreamHashBy
}
l7PoolBackendMap[name] = append(l7PoolBackendMap[name], backend)
l7PoolBackendMap[svckey] = append(l7PoolBackendMap[svckey], backend)
}
}
// endregion
@ -635,38 +643,38 @@ func (s *k8sStore) ingressIsValid(ing *extensions.Ingress) bool {
}
}
}
labelname, err := s.GetServiceNameLabelByKey(svcKey)
service, err := s.listers.Service.ByKey(svcKey)
if err != nil {
logrus.Warningf("label: %s; error parsing label: %v", labelname, err)
logrus.Warningf("Key: %s; error getting key: %v", err)
return false
}
endpointsList, err := s.client.CoreV1().Endpoints(ing.Namespace).List(metav1.ListOptions{
LabelSelector: fmt.Sprintf("name=%s", labelname),
})
labelname := fmt.Sprintf("name=%s", service.GetLabels()["name"])
selector, err := labels.Parse(labelname)
if err != nil {
logrus.Warningf("selector: %s; error list endpoints: %v",
logrus.Warningf("Label: %s;error parsing labels: %v", labelname, err)
return false
}
eps, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("selector: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
return false
}
if endpointsList == nil || endpointsList.Items == nil || len(endpointsList.Items) == 0 {
logrus.Warningf("selector: %s; can't find any endpoints",
fmt.Sprintf("name=%s", labelname))
return false
if eps == nil || len(eps) == 0 {
logrus.Warningf("Selector: %s; empty endpoints", fmt.Sprintf("name=%s", labelname))
}
result := false
RESULT:
for _, ep := range endpointsList.Items {
for _, ep := range eps {
if ep.Subsets != nil && len(ep.Subsets) > 0 {
//logrus.Warningf("selector: %s; empty endpoints subsets; endpoints: %V",
//fmt.Sprintf("name=%s", labelname), ep)
for _, e := range ep.Subsets {
if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) {
//logrus.Warningf("selector: %s; empty endpoints addresses; endpoints: %V",
// fmt.Sprintf("name=%s", labelname), ep)
result = true
break RESULT
}
e := ep.Subsets[0]
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() &&
e.Ports[0].Port != service.Spec.Ports[0].Port {
continue
}
if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) {
result = true
break
}
}
}

View File

@ -81,9 +81,7 @@ server {
proxy_cookie_domain {{ $loc.Proxy.CookieDomain }};
proxy_cookie_path {{ $loc.Proxy.CookiePath }};
{{ if isValidByteSize $loc.Proxy.BodySize true }}
client_max_body_size {{ $loc.Proxy.BodySize }}m;
{{ end }}
{{ if $loc.DisableAccessLog }}
access_log off;

View File

@ -198,11 +198,7 @@ func NewStore(clientset *kubernetes.Clientset,
}
},
UpdateFunc: func(old, cur interface{}) {
//oep := old.(*corev1.Endpoints)
cep := cur.(*corev1.Endpoints)
logrus.Debugf("received update endpoints; name: %s", cep.GetName())
//if cep.ResourceVersion != oep.ResourceVersion &&
// !reflect.DeepEqual(cep.Subsets, oep.Subsets) {
serviceID := cep.Labels["service_id"]
version := cep.Labels["version"]

View File

@ -274,7 +274,7 @@ func (m *Manager) verticalScalingExec(task *model.Task) error {
}
newAppService.Logger = logger
appService.SetUpgradePatch(newAppService)
err = m.controllerManager.StartController(controller.TypeUpgradeController, *appService)
err = m.controllerManager.StartController(controller.TypeUpgradeController, *newAppService)
if err != nil {
logrus.Errorf("Application run vertical scaling(upgrade) controller failure:%s", err.Error())
logger.Info("Application run vertical scaling(upgrade) controller failure", controller.GetCallbackLoggerOption())