Merge pull request #264 from GLYASAI/RAINBOND-837

[FIX] free endpoints were removed by controller-manager
This commit is contained in:
barnettZQG 2019-03-31 09:15:06 +08:00 committed by GitHub
commit b76d47b625
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 622 additions and 596 deletions

View File

@ -19,7 +19,6 @@
package handler
import (
"encoding/json"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/api/model"
@ -27,8 +26,6 @@ import (
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/worker/client"
"strconv"
"strings"
)
// ThirdPartyServiceHanlder handles business logic for all third-party services
@ -96,8 +93,6 @@ func (t *ThirdPartyServiceHanlder) DelEndpoints(epid, sid string) error {
// ListEndpoints lists third-party service endpoints.
func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointResp, error) {
endpoints, err := t.dbmanager.EndpointsDao().List(sid)
b, _ := json.Marshal(endpoints)
logrus.Debugf("Endpoints from db: %s", string(b))
if err != nil {
logrus.Warningf("ServiceID: %s; error listing endpoints from db; %v", sid, err)
}
@ -105,9 +100,9 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
for _, item := range endpoints {
m[item.UUID] = &model.EndpointResp{
EpID: item.UUID,
IP: func(ip string, port int) string {
if port != 0 {
return fmt.Sprintf("%s:%d", ip, port)
IP: func(ip string, p int) string {
if p != 0 {
return fmt.Sprintf("%s:%d", ip, p)
}
return ip
}(item.IP, item.Port),
@ -123,8 +118,6 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
return nil, err
}
if thirdPartyEndpoints != nil && thirdPartyEndpoints.Obj != nil {
b, _ = json.Marshal(thirdPartyEndpoints)
logrus.Debugf("Endpoints from rpc: %s", string(b))
for _, item := range thirdPartyEndpoints.Obj {
ep := m[item.Uuid]
if ep != nil {
@ -133,13 +126,8 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
continue
}
m[item.Uuid] = &model.EndpointResp{
EpID: item.Uuid,
IP: func(ip string, port int32) string {
if port != 0 {
return fmt.Sprintf("%s:%d", ip, port)
}
return ip
}(item.Ip, item.Port),
EpID: item.Uuid,
IP: item.Ip,
Status: item.Status,
IsOnline: true,
IsStatic: false,
@ -154,18 +142,3 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
return res, nil
}
func splitIP(input string) (string, int) {
sli := strings.Split(input, ":")
if len(sli) == 2 {
return sli[0], func(port string) int {
p, err := strconv.Atoi(port)
if err != nil {
logrus.Warningf("String: %s; error converting string to int", port)
return 0
}
return p
}(sli[1])
}
return input, 0
}

View File

@ -29,9 +29,6 @@ import (
"strings"
"sync"
"github.com/goodrain/rainbond/db/model"
"k8s.io/apimachinery/pkg/labels"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/cmd/gateway/option"
@ -40,7 +37,7 @@ import (
"github.com/goodrain/rainbond/gateway/controller/config"
"github.com/goodrain/rainbond/gateway/defaults"
"github.com/goodrain/rainbond/gateway/util"
v1 "github.com/goodrain/rainbond/gateway/v1"
"github.com/goodrain/rainbond/gateway/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -377,97 +374,74 @@ func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) {
func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) {
var httpPools []*v1.Pool
var tcpPools []*v1.Pool
l7Pools := make(map[string]*v1.Pool)
l4Pools := make(map[string]*v1.Pool)
for _, item := range s.listers.Endpoint.List() {
ep := item.(*corev1.Endpoints)
f := func(backendMap map[string][]backend, poolMap map[string]struct{}) map[string]*v1.Pool {
pools := make(map[string]*v1.Pool)
for k, v := range backendMap {
service, err := s.listers.Service.ByKey(k)
if err != nil {
logrus.Warningf("Key: %s;error getting service: %v", k, err)
continue
}
originPort := service.GetLabels()["origin_port"]
var pluginPort int32
if originPort != "" {
port, err := strconv.Atoi(originPort)
if err != nil {
logrus.Warningf("Origin port: %s; error converting to type int: %v", err)
pluginPort = 0
} else {
pluginPort = int32(port)
if ep.Subsets != nil || len(ep.Subsets) != 0 {
epn := ep.ObjectMeta.Name
// l7
backends := l7PoolBackendMap[ep.ObjectMeta.Name]
for _, backend := range backends {
pool := l7Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
l7Pools[backend.name] = pool
}
}
// list related endpoints
labelname, ok := service.GetLabels()["name"]
if !ok {
logrus.Warningf("Key: %s;label 'name' not found", k)
continue
}
selector, err := labels.Parse(fmt.Sprintf("name=%s", labelname))
if err != nil {
logrus.Warningf("Label: %s; error parsing labels: %s",
fmt.Sprintf("name=%s", labelname), err.Error())
continue
}
endpoints, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("Label: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
continue
}
for _, ep := range endpoints {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
continue
}
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() {
if ep.Subsets[0].Ports[0].Port != service.Spec.Ports[0].TargetPort.IntVal {
continue
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := l7PoolMap[epn]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
}
}
for _, backend := range v {
pool := pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
pools[backend.name] = pool
}
// l4
backends = l4PoolBackendMap[ep.ObjectMeta.Name]
for _, backend := range backends {
pool := l4Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := poolMap[k]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: func(pluginPort, addressPort int32) int32 {
if pluginPort != 0 {
return pluginPort
}
return addressPort
}(pluginPort, ss.Ports[0].Port),
Weight: backend.weight,
})
}
pool.Name = backend.name
l4Pools[backend.name] = pool
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := l4PoolMap[epn]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
}
}
}
}
return pools
}
l7Pools := f(l7PoolBackendMap, l7PoolMap)
l4Pools := f(l4PoolBackendMap, l4PoolMap)
// change map to slice TODO: use map directly
for _, pool := range l7Pools {
httpPools = append(httpPools, pool)
@ -491,20 +465,20 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
if !s.ingressIsValid(ing) {
continue
}
ingKey := k8s.MetaNamespaceKey(ing)
anns, err := s.GetIngressAnnotations(ingKey)
if err != nil {
logrus.Errorf("Error getting Ingress annotations %q: %v", ingKey, err)
}
if anns.L4.L4Enable && anns.L4.L4Port != 0 {
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
// region l4
host := strings.Replace(anns.L4.L4Host, " ", "", -1)
if host == "" {
host = s.conf.IP
}
host = s.conf.IP
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
protocol := s.GetServiceProtocol(svcKey, ing.Spec.Backend.ServicePort.IntVal)
listening := fmt.Sprintf("%s:%v", host, anns.L4.L4Port)
if string(protocol) == string(v1.ProtocolUDP) {
@ -521,11 +495,12 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
vs.Namespace = anns.Namespace
vs.ServiceID = anns.Labels["service_id"]
}
l4PoolMap[svcKey] = struct{}{}
l4PoolMap[ing.Spec.Backend.ServiceName] = struct{}{}
l4vsMap[listening] = vs
l4vs = append(l4vs, vs)
backend := backend{name: backendName, weight: anns.Weight.Weight}
l4PoolBackendMap[svcKey] = append(l4PoolBackendMap[svcKey], backend)
l4PoolBackendMap[ing.Spec.Backend.ServiceName] = append(l4PoolBackendMap[ing.Spec.Backend.ServiceName], backend)
// endregion
} else {
// region l7
@ -577,14 +552,15 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
vs.SSLCert = hostSSLMap[DefVirSrvName]
}
}
l7vsMap[virSrvName] = vs
l7vs = append(l7vs, vs)
}
for _, path := range rule.IngressRuleValue.HTTP.Paths {
svckey := fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
locKey := fmt.Sprintf("%s_%s", virSrvName, path.Path)
location := srvLocMap[locKey]
l7PoolMap[svckey] = struct{}{}
l7PoolMap[path.Backend.ServiceName] = struct{}{}
// if location do not exists, then creates a new one
if location == nil {
location = &v1.Location{
@ -593,10 +569,9 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
}
srvLocMap[locKey] = location
vs.Locations = append(vs.Locations, location)
// the first ingress proxy takes effect
location.Proxy = anns.Proxy
}
location.Proxy = anns.Proxy
// If their ServiceName is the same, then the new one will overwrite the old one.
nameCondition := &v1.Condition{}
var backendName string
@ -619,7 +594,7 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
if anns.UpstreamHashBy != "" {
backend.hashBy = anns.UpstreamHashBy
}
l7PoolBackendMap[svckey] = append(l7PoolBackendMap[svckey], backend)
l7PoolBackendMap[path.Backend.ServiceName] = append(l7PoolBackendMap[path.Backend.ServiceName], backend)
}
}
// endregion
@ -630,57 +605,47 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
// ingressIsValid checks if the specified ingress is valid
func (s *k8sStore) ingressIsValid(ing *extensions.Ingress) bool {
var svcKey string
var endpointKey string
if ing.Spec.Backend != nil { // stream
svcKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName)
endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName)
} else { // http
Loop:
for _, rule := range ing.Spec.Rules {
for _, path := range rule.IngressRuleValue.HTTP.Paths {
svcKey = fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
if svcKey != "" {
endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
if endpointKey != "" {
break Loop
}
}
}
}
service, err := s.listers.Service.ByKey(svcKey)
item, exists, err := s.listers.Endpoint.GetByKey(endpointKey)
if err != nil {
logrus.Warningf("Key: %s; error getting key: %v", err)
logrus.Errorf("Can not get endpoint by key(%s): %v", endpointKey, err)
return false
}
labelname := fmt.Sprintf("name=%s", service.GetLabels()["name"])
selector, err := labels.Parse(labelname)
if err != nil {
logrus.Warningf("Label: %s;error parsing labels: %v", labelname, err)
if !exists {
logrus.Warningf("Endpoint \"%s\" does not exist.", endpointKey)
return false
}
eps, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("selector: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
endpoint, ok := item.(*corev1.Endpoints)
if !ok {
logrus.Errorf("Cant not convert %v to %v", reflect.TypeOf(item), reflect.TypeOf(endpoint))
return false
}
if eps == nil || len(eps) == 0 {
logrus.Warningf("Selector: %s; empty endpoints", fmt.Sprintf("name=%s", labelname))
if endpoint.Subsets == nil || len(endpoint.Subsets) == 0 {
logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey)
return false
}
result := false
for _, ep := range eps {
if ep.Subsets != nil && len(ep.Subsets) > 0 {
e := ep.Subsets[0]
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() &&
e.Ports[0].Port != service.Spec.Ports[0].Port {
continue
}
if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) {
result = true
break
}
for _, ep := range endpoint.Subsets {
if (ep.Addresses == nil || len(ep.Addresses) == 0) && (ep.NotReadyAddresses == nil || len(ep.NotReadyAddresses) == 0) {
logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey)
return false
}
}
return result
return true
}
// GetIngress returns the Ingress matching key.
@ -701,20 +666,6 @@ func (s *k8sStore) ListIngresses() []*extensions.Ingress {
return ingresses
}
// GetServiceNameLabelByKey returns name in the labels of corev1.Service
// matching key(name/namespace).
func (s *k8sStore) GetServiceNameLabelByKey(key string) (string, error) {
svc, err := s.listers.Service.ByKey(key)
if err != nil {
return "", err
}
name, ok := svc.Labels["name"]
if !ok {
return "", fmt.Errorf("label \"name\" not found")
}
return name, nil
}
// GetServiceProtocol returns the Service matching key and port.
func (s *k8sStore) GetServiceProtocol(key string, port int32) corev1.Protocol {
svcs, err := s.listers.Service.ByKey(key)

View File

@ -20,7 +20,6 @@ package prober
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
@ -39,12 +38,13 @@ type Prober interface {
CloseWatch(serviceName string, id string) error
Start()
AddServices(in []*v1.Service)
CheckAndAddService(in *v1.Service) bool
CheckIfExist(in *v1.Service) bool
SetServices([]*v1.Service)
GetServices() []*v1.Service
GetServiceHealth() map[string]*v1.HealthStatus
SetAndUpdateServices([]*v1.Service) error
AddAndUpdateServices([]*v1.Service) error
UpdateServiceProbe(service *v1.Service)
UpdateServicesProbe(services []*v1.Service)
Stop() error
DisableWatcher(serviceName, watcherID string)
@ -276,18 +276,13 @@ func (p *probeManager) AddServices(in []*v1.Service) {
}
// CheckAndAddService checks if the input service exists, if it does not exist, add it.
func (p *probeManager) CheckAndAddService(in *v1.Service) bool {
func (p *probeManager) CheckIfExist(in *v1.Service) bool {
exist := false
for _, svc := range p.services {
if svc.Name == in.Name {
exist = true
}
}
if !exist {
b, _ := json.Marshal(in)
logrus.Debugf("add service: %s", string(b))
p.services = append(p.services, in)
}
return exist
}
@ -300,9 +295,9 @@ func (p *probeManager) isNeedUpdate(new *v1.Service) bool {
}
if old == nil {
// not found old one
return false
return true
}
return new.Equal(old)
return !new.Equal(old)
}
func (p *probeManager) GetServiceHealth() map[string]*v1.HealthStatus {
@ -374,16 +369,35 @@ func (p *probeManager) updateAllServicesProbe() {
}
}
// UpdateServiceProbe updates and runs one service probe.
func (p *probeManager) UpdateServiceProbe(service *v1.Service) {
if service.ServiceHealth == nil || service.Disable || !p.isNeedUpdate(service) {
return
}
logrus.Debugf("Probe: %s; update probe.", service.Name)
// stop old probe
old := p.serviceProbe[service.Name]
if old != nil {
old.Stop()
}
// create new probe
serviceProbe := probe.CreateProbe(p.ctx, p.statusChan, service)
if serviceProbe != nil {
p.serviceProbe[service.Name] = serviceProbe
serviceProbe.Check()
}
}
// UpdateServicesProbe updates and runs services probe.
func (p *probeManager) UpdateServicesProbe(services []*v1.Service) {
if services == nil || len(services) == 0 {
logrus.Debug("Empty services")
return
}
oldSvcs := p.ListServicesBySid(services[0].Sid)
for _, v := range services {
if v.ServiceHealth == nil {
continue
}
if v.Disable {
continue
}
if !p.isNeedUpdate(v) {
delete(oldSvcs, v.Name)
if v.ServiceHealth == nil || v.Disable || !p.isNeedUpdate(v) {
continue
}
logrus.Debugf("Probe: %s; update probe.", v.Name)
@ -392,6 +406,9 @@ func (p *probeManager) UpdateServicesProbe(services []*v1.Service) {
if old != nil {
old.Stop()
}
if !p.CheckIfExist(v) {
p.services = append(p.services, v)
}
// create new probe
serviceProbe := probe.CreateProbe(p.ctx, p.statusChan, v)
if serviceProbe != nil {
@ -399,6 +416,20 @@ func (p *probeManager) UpdateServicesProbe(services []*v1.Service) {
serviceProbe.Check()
}
}
for _, svc := range oldSvcs {
p.StopProbes([]string{svc.Name})
}
}
// ListServicesBySid lists services corresponding to sid.
func (p *probeManager) ListServicesBySid(sid string) map[string]*v1.Service {
res := make(map[string]*v1.Service)
for _, svc := range p.services {
if svc.Sid == sid {
res[svc.Name] = svc
}
}
return res
}
// GetProbe returns a probe associated with name.
@ -413,6 +444,7 @@ func (p *probeManager) StopProbes(names []string) {
logrus.Debugf("Name: %s; Probe not found.", name)
continue
}
logrus.Debugf("Name: %s; Probe stopped", name)
probe.Stop()
delete(p.serviceProbe, name)
for idx, svc := range p.services {

View File

@ -20,7 +20,6 @@ package probe
import (
"context"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/util/prober/types/v1"
)
@ -34,7 +33,6 @@ type Probe interface {
func CreateProbe(ctx context.Context, statusChan chan *v1.HealthStatus, v *v1.Service) Probe {
ctx, cancel := context.WithCancel(ctx)
if v.ServiceHealth.Model == "tcp" {
logrus.Debug("creat tcp probe...")
t := &TCPProbe{
Name: v.ServiceHealth.Name,
Address: v.ServiceHealth.Address,
@ -47,7 +45,6 @@ func CreateProbe(ctx context.Context, statusChan chan *v1.HealthStatus, v *v1.Se
return t
}
if v.ServiceHealth.Model == "http" {
logrus.Debug("creat http probe...")
t := &HTTPProbe{
Name: v.ServiceHealth.Name,
Address: v.ServiceHealth.Address,

View File

@ -32,7 +32,7 @@ func (h *TCPProbe) Stop() {
// TCPCheck -
func (h *TCPProbe) TCPCheck() {
logrus.Debug("tcp check...")
logrus.Debugf("TCP check; Name: %s; Address: %s", h.Name, h.Address)
timer := time.NewTimer(time.Second * time.Duration(h.TimeInterval))
defer timer.Stop()
for {

View File

@ -25,12 +25,13 @@ const (
StatHealthy string = "healthy"
// StatUnhealthy -
StatUnhealthy string = "unhealthy"
// StaTDeath -
// StatDeath -
StatDeath string = "death"
)
//Service Service
// Service Service
type Service struct {
Sid string `json:"service_id"`
Name string `json:"name"`
ServiceHealth *Health `json:"health"`
Disable bool `json:"disable"`
@ -41,13 +42,16 @@ func (l *Service) Equal(r *Service) bool {
if l == r {
return true
}
if l.Sid != r.Sid {
return false
}
if l.Name != r.Name {
return false
}
if l.Disable != r.Disable {
return false
}
if l.ServiceHealth != r.ServiceHealth {
if !l.ServiceHealth.Equal(r.ServiceHealth) {
return false
}
return true
@ -57,6 +61,7 @@ func (l *Service) Equal(r *Service) bool {
type Health struct {
Name string `json:"name"`
Model string `json:"model"`
IP string `json:"ip"`
Port int `json:"port"`
Address string `json:"address"`
TimeInterval int `json:"time_interval"`
@ -74,6 +79,9 @@ func (l *Health) Equal(r *Health) bool {
if l.Model != r.Model {
return false
}
if l.IP != r.IP {
return false
}
if l.Port != r.Port {
return false
}

View File

@ -53,7 +53,7 @@ func ApplyOne(clientset *kubernetes.Clientset, app *v1.AppService) error {
}
// update endpoints
for _, ep := range app.GetEndpoints() {
ensureEndpoints(ep, clientset)
EnsureEndpoints(ep, clientset)
}
// update ingress
for _, ing := range app.GetIngress() {
@ -140,9 +140,9 @@ func ensureSecret(secret *corev1.Secret, clientSet kubernetes.Interface) {
}
}
func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
// EnsureEndpoints creates or updates endpoints.
func EnsureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep)
@ -259,3 +259,62 @@ func UpgradeSecrets(clientset *kubernetes.Clientset,
}
return nil
}
// UpgradeEndpoints is used to update *corev1.Endpoints.
func UpgradeEndpoints(clientset *kubernetes.Clientset,
as *v1.AppService, old, new []*corev1.Endpoints,
handleErr func(msg string, err error) error) error {
var oldMap = make(map[string]*corev1.Endpoints, len(old))
for i, item := range old {
oldMap[item.Name] = old[i]
}
for _, n := range new {
if o, ok := oldMap[n.Name]; ok {
ep, err := clientset.CoreV1().Endpoints(n.Namespace).Update(n)
if err != nil {
if e := handleErr(fmt.Sprintf("error updating endpoints: %+v: err: %v",
ep, err), err); e != nil {
return e
}
continue
}
as.AddEndpoints(ep)
delete(oldMap, o.Name)
logrus.Debugf("ServiceID: %s; successfully update endpoints: %s", as.ServiceID, ep.Name)
} else {
_, err := clientset.CoreV1().Endpoints(n.Namespace).Create(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error creating endpoints: %+v: err: %v",
n, err), err); err != nil {
return err
}
continue
}
as.AddEndpoints(n)
logrus.Debugf("ServiceID: %s; successfully create endpoints: %s", as.ServiceID, n.Name)
}
}
for _, sec := range oldMap {
if sec != nil {
if err := clientset.CoreV1().Endpoints(sec.Namespace).Delete(sec.Name, &metav1.DeleteOptions{}); err != nil {
if err := handleErr(fmt.Sprintf("error deleting endpoints: %+v: err: %v",
sec, err), err); err != nil {
return err
}
continue
}
logrus.Debugf("ServiceID: %s; successfully delete endpoints: %s", as.ServiceID, sec.Name)
}
}
return nil
}
// UpdateEndpoints uses clientset to update the given Endpoints.
func UpdateEndpoints(ep *corev1.Endpoints, clientSet *kubernetes.Clientset) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
logrus.Warningf("error updating endpoints: %+v; err: %v", ep, err)
return
}
logrus.Debugf("Key: %s/%s; Successfully update endpoints", ep.GetNamespace(), ep.GetName())
}

View File

@ -21,8 +21,6 @@ package prober
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
@ -41,9 +39,8 @@ import (
type Prober interface {
Start()
Stop()
AddProbe(ep *corev1.Endpoints)
UpdateProbe(ep *corev1.Endpoints)
StopProbe(ep *corev1.Endpoints)
UpdateProbes(info []*store.ProbeInfo)
StopProbe(uuids []string)
}
// NewProber creates a new third-party service prober.
@ -101,14 +98,14 @@ func (t *tpProbe) Start() {
evt := event.(store.Event)
switch evt.Type {
case store.CreateEvent:
ep := evt.Obj.(*corev1.Endpoints)
t.AddProbe(ep)
infos := evt.Obj.([]*store.ProbeInfo)
t.UpdateProbes(infos)
case store.UpdateEvent:
new := evt.Obj.(*corev1.Endpoints)
t.UpdateProbe(new)
infos := evt.Obj.([]*store.ProbeInfo)
t.UpdateProbes(infos)
case store.DeleteEvent:
ep := evt.Obj.(*corev1.Endpoints)
t.StopProbe(ep)
uuids := evt.Obj.([]string)
t.StopProbe(uuids)
}
case <-t.ctx.Done():
return
@ -122,93 +119,83 @@ func (t *tpProbe) Stop() {
t.cancel()
}
func (t *tpProbe) AddProbe(ep *corev1.Endpoints) {
service, probeInfo, sid := t.createServices(ep)
if service == nil {
logrus.Debugf("Empty service, stop creating probe")
return
}
ip := ep.GetLabels()["ip"]
port, _ := strconv.Atoi(ep.GetLabels()["port"])
// watch
if t.utilprober.CheckAndAddService(service) {
logrus.Debugf("Service: %+v; Exists", service)
return
}
go func(service *v1.Service) {
watcher := t.utilprober.WatchServiceHealthy(service.Name)
t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID())
defer watcher.Close()
defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID())
for {
select {
case event := <-watcher.Watch():
if event == nil {
return
}
switch event.Status {
case v1.StatHealthy:
obj := &appmv1.RbdEndpoint{
UUID: service.Name,
IP: ip,
Port: port,
Sid: sid,
func (t *tpProbe) UpdateProbes(infos []*store.ProbeInfo) {
var services []*v1.Service
for _, info := range infos {
service, probeInfo := t.createServices(info)
if service == nil {
logrus.Debugf("Empty service, stop creating probe")
continue
}
services = append(services, service)
// watch
if t.utilprober.CheckIfExist(service) {
continue
}
go func(service *v1.Service, info *store.ProbeInfo) {
watcher := t.utilprober.WatchServiceHealthy(service.Name)
t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID())
defer watcher.Close()
defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID())
for {
select {
case event := <-watcher.Watch():
if event == nil {
return
}
t.updateCh.In() <- discovery.Event{
Type: discovery.HealthEvent,
Obj: obj,
}
case v1.StatDeath, v1.StatUnhealthy:
if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum {
if probeInfo.Mode == model.OfflineFailureAction.String() {
obj := &appmv1.RbdEndpoint{
UUID: service.Name,
IP: ip,
Port: port,
Sid: sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.DeleteEvent,
Obj: obj,
}
} else {
obj := &appmv1.RbdEndpoint{
UUID: service.Name,
IP: ip,
Port: port,
Sid: sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.UnhealthyEvent,
Obj: obj,
switch event.Status {
case v1.StatHealthy:
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.HealthEvent,
Obj: obj,
}
case v1.StatDeath, v1.StatUnhealthy:
if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum {
if probeInfo.Mode == model.OfflineFailureAction.String() {
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.DeleteEvent,
Obj: obj,
}
} else {
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.UnhealthyEvent,
Obj: obj,
}
}
}
}
case <-t.ctx.Done():
// TODO: should stop for one service, not all services.
return
}
case <-t.ctx.Done():
// TODO: should stop for one service, not all services.
return
}
}
}(service)
// start
t.utilprober.UpdateServicesProbe([]*v1.Service{service})
}
func (t *tpProbe) UpdateProbe(ep *corev1.Endpoints) {
service, _, _ := t.createServices(ep)
if service == nil {
logrus.Debugf("Empty service, stop updating probe")
return
}(service, info)
}
t.utilprober.UpdateServicesProbe([]*v1.Service{service})
t.utilprober.UpdateServicesProbe(services)
}
func (t *tpProbe) StopProbe(ep *corev1.Endpoints) {
name := t.createServiceNames(ep)
logrus.Debugf("Name: %s; Stop probe.", name)
t.utilprober.StopProbes([]string{name})
func (t *tpProbe) StopProbe(uuids []string) {
for _, name := range uuids {
t.utilprober.StopProbes([]string{name})
}
}
// GetProbeInfo returns probe info associated with sid.
@ -231,31 +218,23 @@ func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) {
return probes[0], nil
}
func (t *tpProbe) createServices(ep *corev1.Endpoints) (*v1.Service, *model.TenantServiceProbe, string) {
sid := ep.GetLabels()["service_id"]
ip := ep.GetLabels()["ip"]
port := ep.GetLabels()["port"]
if strings.TrimSpace(sid) == "" {
logrus.Warningf("Endpoints key: %s; ServiceID not found, stop creating probe",
fmt.Sprintf("%s/%s", ep.Namespace, ep.Name))
return nil, nil, ""
}
probeInfo, err := t.GetProbeInfo(sid)
func (t *tpProbe) createServices(probeInfo *store.ProbeInfo) (*v1.Service, *model.TenantServiceProbe) {
tsp, err := t.GetProbeInfo(probeInfo.Sid)
if err != nil {
logrus.Warningf("ServiceID: %s; Unexpected error occurred, ignore the creation of "+
"probes: %s", sid, err.Error())
return nil, nil, ""
"probes: %s", probeInfo.Sid, err.Error())
return nil, nil
}
if probeInfo.Mode == "liveness" {
probeInfo.Mode = model.IgnoreFailureAction.String()
if tsp.Mode == "liveness" {
tsp.Mode = model.IgnoreFailureAction.String()
}
service := createService(probeInfo)
service.Name = ep.GetLabels()["uuid"]
portint, _ := strconv.Atoi(port)
service.ServiceHealth.Port = portint
service.ServiceHealth.Name = service.Name // TODO: no need?
service.ServiceHealth.Address = fmt.Sprintf("%s:%s", ip, port)
return service, probeInfo, sid
service := createService(tsp)
service.Sid = probeInfo.Sid
service.Name = probeInfo.UUID
service.ServiceHealth.Port = int(probeInfo.Port)
service.ServiceHealth.Name = service.Name
service.ServiceHealth.Address = fmt.Sprintf("%s:%d", probeInfo.IP, probeInfo.Port)
return service, tsp
}
func (t *tpProbe) createServiceNames(ep *corev1.Endpoints) string {

View File

@ -20,6 +20,7 @@ package store
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@ -78,7 +79,14 @@ const (
type Event struct {
Type EventType
Obj interface{}
Old interface{}
}
// ProbeInfo holds the context of a probe.
type ProbeInfo struct {
Sid string `json:"sid"`
UUID string `json:"uuid"`
IP string `json:"ip"`
Port int32 `json:"port"`
}
//appRuntimeStore app runtime store
@ -154,7 +162,6 @@ func NewStore(clientset *kubernetes.Clientset,
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ep := obj.(*corev1.Endpoints)
logrus.Debugf("received add endpoints: %+v", ep)
serviceID := ep.Labels["service_id"]
version := ep.Labels["version"]
createrID := ep.Labels["creater_id"]
@ -165,10 +172,12 @@ func NewStore(clientset *kubernetes.Clientset,
}
if appservice != nil {
appservice.AddEndpoints(ep)
if isThirdParty(ep) {
if isThirdParty(ep) && ep.Subsets != nil && len(ep.Subsets) > 0 {
logrus.Debugf("received add endpoints: %+v", ep)
probeInfos := listProbeInfos(ep, serviceID)
probeCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
Obj: probeInfos,
}
}
return
@ -177,7 +186,6 @@ func NewStore(clientset *kubernetes.Clientset,
},
DeleteFunc: func(obj interface{}) {
ep := obj.(*corev1.Endpoints)
logrus.Debugf("received delete endpoints: %+v", ep)
serviceID := ep.Labels["service_id"]
version := ep.Labels["version"]
createrID := ep.Labels["creater_id"]
@ -190,9 +198,14 @@ func NewStore(clientset *kubernetes.Clientset,
store.DeleteAppService(appservice)
}
if isThirdParty(ep) {
logrus.Debugf("received delete endpoints: %+v", ep)
var uuids []string
for _, item := range ep.Subsets {
uuids = append(uuids, item.Ports[0].Name)
}
probeCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
Obj: uuids,
}
}
}
@ -212,10 +225,10 @@ func NewStore(clientset *kubernetes.Clientset,
if appservice != nil {
appservice.AddEndpoints(cep)
if isThirdParty(cep) {
curInfos := listProbeInfos(cep, serviceID)
probeCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
Old: old,
Obj: curInfos,
}
}
}
@ -235,6 +248,67 @@ func NewStore(clientset *kubernetes.Clientset,
return store
}
func listProbeInfos(ep *corev1.Endpoints, sid string) []*ProbeInfo {
var probeInfos []*ProbeInfo
for _, subset := range ep.Subsets {
uuid := subset.Ports[0].Name
port := subset.Ports[0].Port
for _, address := range subset.NotReadyAddresses {
info := &ProbeInfo{
Sid: sid,
UUID: uuid,
IP: address.IP,
Port: port,
}
probeInfos = append(probeInfos, info)
}
for _, address := range subset.Addresses {
info := &ProbeInfo{
Sid: sid,
UUID: uuid,
IP: address.IP,
Port: port,
}
probeInfos = append(probeInfos, info)
}
}
return probeInfos
}
func upgradeProbe(ch chan<- interface{}, old, cur []*ProbeInfo) {
ob, _ := json.Marshal(old)
cb, _ := json.Marshal(cur)
logrus.Debugf("Old probe infos: %s", string(ob))
logrus.Debugf("Current probe infos: %s", string(cb))
oldMap := make(map[string]*ProbeInfo, len(old))
for i := 0; i < len(old); i++ {
oldMap[old[i].UUID] = old[i]
}
for _, c := range cur {
if info := oldMap[c.UUID]; info != nil {
delete(oldMap, c.UUID)
logrus.Debugf("UUID: %s; update probe", c.UUID)
ch <- Event{
Type: UpdateEvent,
Obj: c,
}
} else {
logrus.Debugf("UUID: %s; create probe", c.UUID)
ch <- Event{
Type: CreateEvent,
Obj: []*ProbeInfo{c},
}
}
}
for _, info := range oldMap {
logrus.Debugf("UUID: %s; delete probe", info.UUID)
ch <- Event{
Type: DeleteEvent,
Obj: info,
}
}
}
func (a *appRuntimeStore) init() error {
//init leader namespace
leaderNamespace := a.conf.LeaderElectionNamespace

View File

@ -35,9 +35,11 @@ const (
// UpdateEvent event associated with an object update in a service discovery center
UpdateEvent EventType = "UPDATE"
// DeleteEvent event associated when an object is removed from a service discovery center
DeleteEvent EventType = "DELETE"
DeleteEvent EventType = "DELETE"
// UnhealthyEvent -
UnhealthyEvent EventType = "UNHEALTHY"
HealthEvent EventType = "HEALTH"
// HealthEvent -
HealthEvent EventType = "HEALTH"
)
// Event holds the context of an event.

View File

@ -21,19 +21,18 @@ package thirdparty
import (
"encoding/json"
"fmt"
"github.com/goodrain/rainbond/worker/util"
"strconv"
"sync"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/worker/appm/f"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
"github.com/goodrain/rainbond/worker/appm/types/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
@ -63,7 +62,7 @@ func NewThirdPartier(clientset *kubernetes.Clientset,
}
type thirdparty struct {
clientset kubernetes.Interface
clientset *kubernetes.Clientset
store store.Storer
// a collection of stop channel for every service.
@ -87,13 +86,14 @@ func (t *thirdparty) Start() {
}
logrus.Debugf("Received event: %+v", evt)
if evt.Type == v1.StartEvent { // no need to distinguish between event types
needWatch := false
stopCh := t.svcStopCh[evt.Sid]
if stopCh != nil {
if stopCh == nil {
logrus.Debugf("ServiceID: %s; already started.", evt.Sid)
continue
needWatch = true
t.svcStopCh[evt.Sid] = make(chan struct{})
}
t.svcStopCh[evt.Sid] = make(chan struct{})
go t.runStart(evt.Sid)
go t.runStart(evt.Sid, needWatch)
}
if evt.Type == v1.StopEvent {
stopCh := t.svcStopCh[evt.Sid]
@ -122,37 +122,33 @@ func (t *thirdparty) Start() {
}()
}
func (t *thirdparty) runStart(sid string) {
logrus.Debugf("ServiceID: %s; run start...", sid)
func (t *thirdparty) runStart(sid string, needWatch bool) {
as := t.store.GetAppService(sid)
// TODO: when an error occurs, consider retrying.
rbdeps, d := t.ListRbdEndpoints(sid)
b, _ := json.Marshal(rbdeps)
logrus.Debugf("ServiceID: %s; rbd endpoints: %+v", sid, string(b))
if rbdeps == nil || len(rbdeps) == 0 {
logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid)
var err error
for i := 3; i > 0; i-- { // retry 3 times
rbdeps, ir := t.ListRbdEndpoints(sid)
if rbdeps == nil || len(rbdeps) == 0 {
logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid)
continue
}
var eps []*corev1.Endpoints
eps, err = t.k8sEndpoints(as, rbdeps)
if err != nil {
logrus.Warningf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error())
continue
}
for _, ep := range eps {
f.EnsureEndpoints(ep, t.clientset)
}
if needWatch && ir != nil {
ir.Watch()
}
logrus.Infof("ServiceID: %s; successfully running start task", sid)
return
}
eps, err := t.createK8sEndpoints(as, rbdeps)
if err != nil {
logrus.Errorf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error())
return
}
// find out old endpoints, and delete it.
old := as.GetEndpoints()
// TODO: can do better
del := findDeletedEndpoints(old, eps)
for _, ep := range del {
deleteEndpoints(ep, t.clientset)
}
for _, ep := range eps {
ensureEndpoints(ep, t.clientset)
}
if d != nil {
d.Watch()
}
logrus.Errorf("ServiceID: %s; error running start task: %v", sid, err)
}
// ListRbdEndpoints lists all rbd endpoints, include static and dynamic.
@ -180,28 +176,20 @@ func (t *thirdparty) ListRbdEndpoints(sid string) ([]*v1.RbdEndpoint, Interacter
return res, d
}
func findDeletedEndpoints(old, new []*corev1.Endpoints) []*corev1.Endpoints {
if old == nil {
logrus.Debugf("empty old endpoints.")
return nil
}
var res []*corev1.Endpoints
for _, o := range old {
del := true
for _, n := range new {
if o.Name == n.Name {
del = false
break
func deleteSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) {
eps := as.GetEndpoints()
for _, ep := range eps {
for idx, item := range ep.Subsets {
if item.Ports[0].Name == rbdep.UUID {
logrus.Debugf("UUID: %s; subset deleted", rbdep.UUID)
ep.Subsets[idx] = ep.Subsets[len(ep.Subsets)-1]
ep.Subsets = ep.Subsets[:len(ep.Subsets)-1]
}
}
if del {
res = append(res, o)
}
}
return res
}
func (t *thirdparty) createK8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) {
func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) {
ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {
return nil, err
@ -212,258 +200,201 @@ func (t *thirdparty) createK8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpo
}
p := ports[0]
logrus.Debugf("create outer third-party service")
f := func() []*corev1.Endpoints {
var eps []*corev1.Endpoints
for _, epi := range epinfo {
port, realport := func(targetPort int, realPort int) (int32, bool) { // final port
if realPort == 0 {
return int32(targetPort), false
}
return int32(realPort), true
}(p.ContainerPort, epi.Port)
ep := corev1.Endpoints{}
ep.Namespace = as.TenantID
if p.IsInnerService {
ep.Name = util.CreateEndpointsName(as.TenantName, as.ServiceAlias, epi.UUID)
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "Service",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
if p.IsOuterService {
ep.Name = util.CreateEndpointsName(as.TenantName, as.ServiceAlias, epi.UUID) + "out"
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "ServiceOUT",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
ep.Labels["uuid"] = epi.UUID
ep.Labels["ip"] = epi.IP
ep.Labels["port"] = strconv.Itoa(int(port))
ep.Labels["real-port"] = strconv.FormatBool(realport)
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Port: port,
},
},
Addresses: []corev1.EndpointAddress{
{
IP: epi.IP,
},
},
}
ep.Subsets = append(ep.Subsets, subset)
eps = append(eps, &ep)
}
return eps
}
var res []*corev1.Endpoints
if p.IsInnerService {
res = append(res, f()...)
ep := &corev1.Endpoints{}
ep.Namespace = as.TenantID
// inner or outer
if p.IsInnerService {
ep.Name = fmt.Sprintf("service-%d-%d", p.ID, p.ContainerPort)
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "Service",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
res = append(res, ep)
}
if p.IsOuterService {
res = append(res, f()...)
ep := &corev1.Endpoints{}
ep.Namespace = as.TenantID
// inner or outer
if p.IsOuterService {
ep.Name = fmt.Sprintf("service-%d-%dout", p.ID, p.ContainerPort)
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "ServiceOUT",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
res = append(res, ep)
}
var subsets []corev1.EndpointSubset
for _, epi := range epinfo {
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Name: epi.UUID,
Port: func(targetPort int, realPort int) int32 {
if realPort == 0 {
return int32(targetPort)
}
return int32(realPort)
}(p.ContainerPort, epi.Port),
Protocol: corev1.ProtocolTCP,
},
},
Addresses: []corev1.EndpointAddress{
{
IP: epi.IP,
},
},
}
subsets = append(subsets, subset)
}
for _, item := range res {
item.Subsets = subsets
}
return res, nil
}
func deleteEndpoints(ep *corev1.Endpoints, clientset kubernetes.Interface) {
err := clientset.CoreV1().Endpoints(ep.Namespace).Delete(ep.Name, &metav1.DeleteOptions{})
func updateSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) error {
ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {
logrus.Debugf("Ignore; error deleting endpoints%+v: %v", ep, err)
return err
}
logrus.Debugf("Delete endpoints: %+v", ep)
}
func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
logrus.Debugf("ensure endpoints: %+v", ep)
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep)
if err != nil {
logrus.Warningf("error creating endpoints %+v: %v", ep, err)
// third-party service can only have one port
if ports == nil || len(ports) == 0 {
return fmt.Errorf("Port not found")
}
p := ports[0]
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Name: rbdep.UUID,
Port: func(targetPort int, realPort int) int32 {
if realPort == 0 {
return int32(targetPort)
}
return int32(realPort)
}(p.ContainerPort, rbdep.Port),
Protocol: corev1.ProtocolTCP,
},
},
Addresses: []corev1.EndpointAddress{
{
IP: rbdep.IP,
},
},
}
for _, ep := range as.GetEndpoints() {
exist := false
for idx, item := range ep.Subsets {
if item.Ports[0].Name == subset.Ports[0].Name {
ep.Subsets[idx] = item
exist = true
break
}
return
}
logrus.Warningf("error updating endpoints %+v: %v", ep, err)
}
}
func ensureConfigMap(cm *corev1.ConfigMap, clientSet kubernetes.Interface) {
_, err := clientSet.CoreV1().ConfigMaps(cm.Namespace).Update(cm)
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := clientSet.CoreV1().ConfigMaps(cm.Namespace).Create(cm)
if err != nil {
logrus.Warningf("error creating ConfigMaps %+v: %v", cm, err)
}
return
if !exist {
ep.Subsets = append(ep.Subsets, subset)
}
logrus.Warningf("error updating ConfigMaps %+v: %v", cm, err)
}
return nil
}
func (t *thirdparty) runUpdate(event discovery.Event) {
ep := event.Obj.(*v1.RbdEndpoint)
as := t.store.GetAppService(ep.Sid)
b, _ := json.Marshal(ep)
switch event.Type {
case discovery.CreateEvent:
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
endpoints, err := t.createK8sEndpoints(as, []*v1.RbdEndpoint{ep})
if err != nil {
logrus.Warningf("ServiceID: %s; error creating k8s endpoints struct: %s",
ep.Sid, err.Error())
return
}
for _, ep := range endpoints {
ensureEndpoints(ep, t.clientset)
}
case discovery.UpdateEvent:
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
// TODO: Compare old and new endpoints
// TODO: delete old endpoints
if !ep.IsOnline {
eps := ListOldEndpoints(as, ep)
for _, item := range eps {
deleteEndpoints(item, t.clientset)
fc := func(as *v1.AppService, rbdep *v1.RbdEndpoint, ready bool, msg string, condition func(subset corev1.EndpointSubset) bool) {
var wait sync.WaitGroup
go func() {
wait.Add(1)
defer wait.Done()
for _, ep := range as.GetEndpoints() {
for idx, subset := range ep.Subsets {
if subset.Ports[0].Name == rbdep.UUID && condition(subset) {
logrus.Debugf("Executed; health: %v; msg: %s", ready, msg)
ep.Subsets[idx] = createSubset(rbdep, ready)
f.UpdateEndpoints(ep, t.clientset)
}
}
}
return
}
endpoints, err := t.createK8sEndpoints(as, []*v1.RbdEndpoint{ep})
}()
wait.Wait()
}
rbdep := event.Obj.(*v1.RbdEndpoint)
as := t.store.GetAppService(rbdep.Sid)
b, _ := json.Marshal(rbdep)
msg := fmt.Sprintf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
switch event.Type {
case discovery.CreateEvent, discovery.UpdateEvent:
logrus.Debug(msg)
err := updateSubset(as, rbdep)
if err != nil {
logrus.Warningf("ServiceID: %s; error creating k8s endpoints struct: %s",
ep.Sid, err.Error())
logrus.Warningf("ServiceID: %s; error adding subset: %s",
rbdep.Sid, err.Error())
return
}
for _, ep := range endpoints {
ensureEndpoints(ep, t.clientset)
}
_ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), as.GetEndpoints(),
func(msg string, err error) error {
logrus.Warning(msg)
return nil
})
case discovery.DeleteEvent:
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
eps := ListOldEndpoints(as, ep)
for _, item := range eps {
deleteEndpoints(item, t.clientset)
}
logrus.Debug(msg)
deleteSubset(as, rbdep)
eps := as.GetEndpoints()
_ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), eps,
func(msg string, err error) error {
logrus.Warning(msg)
return nil
})
case discovery.HealthEvent:
subset := createSubset(ep, false)
eps := ListOldEndpoints(as, ep)
for _, ep := range eps {
if !isHealth(ep) {
ep.Subsets = []corev1.EndpointSubset{
subset,
}
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
ensureEndpoints(ep, t.clientset)
}
isUnhealthy := func(subset corev1.EndpointSubset) bool {
return !isHealthy(subset)
}
fc(as, rbdep, true, msg, isUnhealthy)
case discovery.UnhealthyEvent:
subset := createSubset(ep, true)
eps := ListOldEndpoints(as, ep)
for _, ep := range eps {
if isHealth(ep) {
ep.Subsets = []corev1.EndpointSubset{
subset,
}
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
ensureEndpoints(ep, t.clientset)
}
}
fc(as, rbdep, false, msg, isHealthy)
}
}
func (t *thirdparty) runDelete(sid string) {
as := t.store.GetAppService(sid) // TODO: need to delete?
if services := as.GetServices(); services != nil {
for _, service := range services {
err := t.clientset.CoreV1().Services(as.TenantID).Delete(service.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting service: %v", err)
}
}
}
if secrets := as.GetSecrets(); secrets != nil {
for _, secret := range secrets {
if secret != nil {
err := t.clientset.CoreV1().Secrets(as.TenantID).Delete(secret.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting secrets: %v", err)
}
t.store.OnDelete(secret)
}
}
}
if ingresses := as.GetIngress(); ingresses != nil {
for _, ingress := range ingresses {
err := t.clientset.ExtensionsV1beta1().Ingresses(as.TenantID).Delete(ingress.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting ingress: %v", err)
}
t.store.OnDelete(ingress)
}
}
if configs := as.GetConfigMaps(); configs != nil {
for _, config := range configs {
err := t.clientset.CoreV1().ConfigMaps(as.TenantID).Delete(config.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting config map: %v", err)
}
t.store.OnDelete(config)
}
}
if eps := as.GetEndpoints(); eps != nil {
for _, ep := range eps {
logrus.Debugf("Endpoints delete: %+v", ep)
err := t.clientset.CoreV1().Endpoints(as.TenantID).Delete(ep.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting endpoint empty old app servicets: %v", err)
logrus.Warningf("error deleting endpoint empty old app endpoints: %v", err)
}
t.store.OnDelete(ep)
}
}
}
func ListOldEndpoints(as *v1.AppService, ep *v1.RbdEndpoint) []*corev1.Endpoints {
var res []*corev1.Endpoints
for _, item := range as.GetEndpoints() {
if item.GetLabels()["uuid"] == ep.UUID {
res = append(res, item)
}
}
return res
}
func createSubset(ep *v1.RbdEndpoint, notReady bool) corev1.EndpointSubset {
func createSubset(ep *v1.RbdEndpoint, ready bool) corev1.EndpointSubset {
address := corev1.EndpointAddress{
IP: ep.IP,
}
port := corev1.EndpointPort{
Port: int32(ep.Port),
Name: ep.UUID,
Port: int32(ep.Port),
Protocol: corev1.ProtocolTCP,
}
subset := corev1.EndpointSubset{}
if notReady {
subset.NotReadyAddresses = append(subset.Addresses, address)
} else {
if ready {
subset.Addresses = append(subset.Addresses, address)
} else {
subset.NotReadyAddresses = append(subset.NotReadyAddresses, address)
}
subset.Ports = append(subset.Ports, port)
return subset
}
func isHealth(ep *corev1.Endpoints) bool {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
return false
}
if ep.Subsets[0].Addresses != nil && len(ep.Subsets[0].Addresses) > 0 {
func isHealthy(subset corev1.EndpointSubset) bool {
if subset.Addresses != nil && len(subset.Addresses) > 0 {
return true
}
return false

View File

@ -615,7 +615,8 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
if err := ctrl.syncClaimHandler(key); err != nil {
if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
logrus.Warningf("Retrying syncing claim %q because failures %v < threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
logrus.Warningf("Retrying syncing claim %q because failures %v < threshold %v",
key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
ctrl.claimQueue.AddRateLimited(obj)
} else {
logrus.Errorf("Giving up syncing claim %q because failures %v >= threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)

View File

@ -21,9 +21,7 @@ package server
import (
"context"
"fmt"
corev1 "k8s.io/api/core/v1"
"net"
"strconv"
"strings"
"time"
@ -38,6 +36,7 @@ import (
"github.com/goodrain/rainbond/worker/server/pb"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
corev1 "k8s.io/api/core/v1"
)
//RuntimeServer app runtime grpc server
@ -242,36 +241,49 @@ func (r *RuntimeServer) ListThirdPartyEndpoints(ctx context.Context, re *pb.Serv
return new(pb.ThirdPartyEndpoints), nil
}
var pbeps []*pb.ThirdPartyEndpoint
// The same IP may correspond to two endpoints, which are internal and external endpoints.
// So it is need to filter the same IP.
exists := make(map[string]bool)
for _, ep := range as.GetEndpoints() {
if exists[ep.GetLabels()["uuid"]] {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
logrus.Debugf("Key: %s; empty subsets", fmt.Sprintf("%s/%s", ep.Namespace, ep.Name))
continue
}
exists[ep.GetLabels()["uuid"]] = true
pbep := &pb.ThirdPartyEndpoint{
Uuid: ep.GetLabels()["uuid"],
Sid: ep.GetLabels()["service_id"],
Ip: ep.GetLabels()["ip"],
Port: func(item *corev1.Endpoints) int32 {
realport, _ := strconv.ParseBool(item.GetLabels()["realport"])
if realport {
portstr := item.GetLabels()["port"]
port, _ := strconv.Atoi(portstr)
return int32(port)
for idx, subset := range ep.Subsets {
if exists[subset.Ports[0].Name] {
continue
}
ip := func(subset corev1.EndpointSubset) string {
if subset.Addresses != nil && len(subset.Addresses) > 0 {
return subset.Addresses[0].IP
}
return 0
}(ep),
Status: func(item *corev1.Endpoints) string {
if item.Subsets == nil || len(item.Subsets) == 0 {
if subset.NotReadyAddresses != nil && len(subset.NotReadyAddresses) > 0 {
return subset.NotReadyAddresses[0].IP
}
return ""
}(subset)
if strings.TrimSpace(ip) == "" {
logrus.Debugf("Key: %s; Index: %d; IP not found", fmt.Sprintf("%s/%s", ep.Namespace, ep.Name), idx)
continue
}
exists[subset.Ports[0].Name] = true
pbep := &pb.ThirdPartyEndpoint{
Uuid: subset.Ports[0].Name,
Sid: ep.GetLabels()["service_id"],
Ip: ip,
Port: subset.Ports[0].Port,
Status: func(item *corev1.Endpoints) string {
if subset.Addresses != nil && len(subset.Addresses) > 0 {
return "healthy"
}
if subset.NotReadyAddresses != nil && len(subset.NotReadyAddresses) > 0 {
return "unhealthy"
}
return "unknown"
}
if item.Subsets[0].Addresses == nil || len(item.Subsets[0].Addresses) == 0 {
return "unhealthy"
}
return "healthy"
}(ep),
}(ep),
}
pbeps = append(pbeps, pbep)
}
pbeps = append(pbeps, pbep)
}
return &pb.ThirdPartyEndpoints{
Obj: pbeps,
@ -310,9 +322,16 @@ func (r *RuntimeServer) UpdThirdPartyEndpoint(ctx context.Context, re *pb.UpdThi
Port: int(re.Port),
IsOnline: re.IsOnline,
}
r.updateCh.In() <- discovery.Event{
Type: discovery.UpdateEvent,
Obj: rbdep,
if re.IsOnline == false {
r.updateCh.In() <- discovery.Event{
Type: discovery.DeleteEvent,
Obj: rbdep,
}
} else {
r.updateCh.In() <- discovery.Event{
Type: discovery.UpdateEvent,
Obj: rbdep,
}
}
return new(pb.Empty), nil
}