fix some bugs

This commit is contained in:
凡羊羊 2019-09-19 21:08:01 +08:00
parent 632205ed4a
commit 8d8680872e
15 changed files with 263 additions and 199 deletions

View File

@ -30,7 +30,6 @@ import (
"github.com/Sirupsen/logrus"
"github.com/go-chi/chi"
"github.com/goodrain/rainbond/api/controller/validation"
"github.com/goodrain/rainbond/api/handler"
"github.com/goodrain/rainbond/api/middleware"
api_model "github.com/goodrain/rainbond/api/model"
@ -39,6 +38,7 @@ import (
"github.com/goodrain/rainbond/db/errors"
dbmodel "github.com/goodrain/rainbond/db/model"
mqclient "github.com/goodrain/rainbond/mq/client"
validation "github.com/goodrain/rainbond/util/endpoint"
httputil "github.com/goodrain/rainbond/util/http"
"github.com/goodrain/rainbond/worker/client"
"github.com/jinzhu/gorm"
@ -1434,7 +1434,8 @@ func (t *TenantStruct) PortOuterController(w http.ResponseWriter, r *http.Reques
return
}
for _, ep := range endpoints {
if errs := validation.ValidateEndpointIP(ep.IP); len(errs) > 0 {
address := validation.SplitEndpointAddress(ep.IP)
if errs := validation.ValidateEndpointIP(address); len(errs) > 0 {
httputil.ReturnError(r, w, 400, "do not allow operate outer port for thirdpart domain endpoints")
return
}

View File

@ -21,15 +21,14 @@ package controller
import (
"encoding/json"
"net/http"
"strings"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/api/controller/validation"
"github.com/goodrain/rainbond/api/handler"
"github.com/goodrain/rainbond/api/middleware"
"github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/errors"
validation "github.com/goodrain/rainbond/util/endpoint"
httputil "github.com/goodrain/rainbond/util/http"
)
@ -56,17 +55,19 @@ func (t *ThirdPartyServiceController) addEndpoints(w http.ResponseWriter, r *htt
return
}
// if address is not ip, and then it is domain
ipAddress := strings.Split(data.Address, ":")[0]
address := validation.SplitEndpointAddress(data.Address)
sid := r.Context().Value(middleware.ContextKey("service_id")).(string)
if err := validation.ValidateEndpointIP(ipAddress); len(err) > 0 {
if err := validation.ValidateEndpointIP(address); len(err) > 0 {
// handle domain, check can add new endpoint or not
if !canAddDomainEndpoint(sid) {
if !canAddDomainEndpoint(sid, true) {
logrus.Warningf("new endpoint addres[%s] is domian", address)
httputil.ReturnError(r, w, 400, "do not support multi domain endpoints")
return
}
}
if !canAddDomainEndpoint(sid) {
if !canAddDomainEndpoint(sid, false) {
// handle ip, check can add new endpoint or not
logrus.Warningf("new endpoint address[%s] is ip, but already has domain endpoint", address)
httputil.ReturnError(r, w, 400, "do not support multi domain endpoints")
return
}
@ -82,16 +83,24 @@ func (t *ThirdPartyServiceController) addEndpoints(w http.ResponseWriter, r *htt
httputil.ReturnSuccess(r, w, "success")
}
func canAddDomainEndpoint(sid string) bool {
func canAddDomainEndpoint(sid string, isDomain bool) bool {
endpoints, err := db.GetManager().EndpointsDao().List(sid)
if err != nil {
logrus.Errorf("find endpoints by sid[%s], error: %s", sid, err.Error())
return false
}
if len(endpoints) > 0 {
if len(endpoints) > 0 && isDomain {
return false
}
if !isDomain {
for _, ep := range endpoints {
address := validation.SplitEndpointAddress(ep.IP)
if err := validation.ValidateEndpointIP(address); len(err) > 0 {
return false
}
}
}
return true
}

View File

@ -27,8 +27,6 @@ import (
"strings"
"time"
"github.com/goodrain/rainbond/api/controller/validation"
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
api_model "github.com/goodrain/rainbond/api/model"
@ -673,38 +671,24 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error {
UUID: core_util.NewUUID(),
IsOnline: &trueValue,
}
if errs := validation.ValidateEndpointIP(o); len(errs) > 0 {
logrus.Debugf("parse domain address: %s", o)
ep.IP = o
address := o
if strings.HasPrefix(address, "https://") {
address = strings.Split(address, "https://")[1]
}
if strings.HasPrefix(address, "https://") {
address = strings.Split(address, "https://")[1]
}
s := strings.Split(address, ":")
if len(s) == 2 {
port, err := strconv.Atoi(s[1])
if err != nil {
logrus.Warningf("string:%s, error parsing string to int", s[1])
} else {
ep.Port = port
}
}
} else {
s := strings.Split(o, ":")
ep.IP = s[0]
if len(s) == 2 {
port, err := strconv.Atoi(s[1])
if err != nil {
logrus.Warningf("string:%s, error parsing string to int", s[1])
continue
} else {
ep.Port = port
}
}
address := o
port := 0
prefix := ""
if strings.HasPrefix(address, "https://") {
address = strings.Split(address, "https://")[1]
prefix = "https://"
}
if strings.HasPrefix(address, "http://") {
address = strings.Split(address, "http://")[1]
prefix = "http://"
}
if strings.Contains(address, ":") {
addressL := strings.Split(address, ":")
address = addressL[0]
port, _ = strconv.Atoi(addressL[1])
}
ep.IP = prefix + address
ep.Port = port
logrus.Debugf("add new endpoint: %v", ep)

View File

@ -24,11 +24,11 @@ import (
"strings"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/api/controller/validation"
"github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/db"
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/worker/client"
)
@ -88,27 +88,24 @@ func (t *ThirdPartyServiceHanlder) UpdEndpoints(d *model.UpdEndpiontsReq) error
}
func convertAddressPort(s string) (address string, port int) {
if errs := validation.ValidateEndpointIP(s); len(errs) > 0 {
// domain address
address = s
addressport := strings.Split(s, ":")
if len(addressport) == 3 {
port, _ = strconv.Atoi(addressport[2])
}
return address, port
prefix := ""
if strings.HasPrefix(s, "https://") {
s = strings.Split(s, "https://")[1]
prefix = "https://"
}
addressport := strings.Split(s, ":")
// compatible with ipv6
addressSli := addressport[:func() int {
if len(addressport) == 2 {
return len(addressport) - 1
}
return len(addressport)
}()]
address = strings.Join(addressSli, ":")
if len(addressport) == 2 {
port, _ = strconv.Atoi(addressport[1])
if strings.HasPrefix(s, "http://") {
s = strings.Split(s, "http://")[1]
prefix = "http://"
}
if strings.Contains(s, ":") {
sp := strings.Split(s, ":")
address = prefix + sp[0]
port, _ = strconv.Atoi(sp[1])
} else {
address = prefix + s
}
return address, port
}

View File

@ -20,26 +20,25 @@ package v2
import (
"fmt"
"strconv"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/envoyproxy/go-control-plane/envoy/api/v2"
apiv2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/auth"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/cluster"
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint"
listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener"
route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/listener"
"github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
http_rate_limit "github.com/envoyproxy/go-control-plane/envoy/config/filter/http/rate_limit/v2"
http_connection_manager "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
tcp_proxy "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2"
configratelimit "github.com/envoyproxy/go-control-plane/envoy/config/ratelimit/v2"
_type "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/envoyproxy/go-control-plane/pkg/util"
v1 "github.com/goodrain/rainbond/node/core/envoy/v1"
"github.com/goodrain/rainbond/node/core/envoy/v1"
corev1 "k8s.io/api/core/v1"
)
@ -459,74 +458,29 @@ func getEndpointsByLables(endpoints []*corev1.Endpoints, slabels map[string]stri
}
//CreateDNSLoadAssignment create dns loadAssignment
func CreateDNSLoadAssignment(serviceAlias, namespace string, endpoints []*corev1.Endpoints, service *corev1.Service) *v2.ClusterLoadAssignment {
func CreateDNSLoadAssignment(serviceAlias, namespace, domain string, service *corev1.Service) *v2.ClusterLoadAssignment {
destServiceAlias := GetServiceAliasByService(service)
if destServiceAlias == "" {
logrus.Errorf("service alias is empty in k8s service %s", service.Name)
return nil
}
var domain string
var ok bool
if domain, ok = service.Annotations["domain"]; !ok || domain == "" {
logrus.Warnf("service[sid: %s] is not domain endpoint", service.GetUID())
return nil
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, service.Spec.Ports[0].Port)
name := fmt.Sprintf("%sService", destServiceAlias)
if destServiceAlias == serviceAlias {
name = fmt.Sprintf("%sServiceOUT", destServiceAlias)
}
selectEndpoint := getEndpointsByLables(endpoints, map[string]string{"name": name})
var lendpoints []endpoint.LocalityLbEndpoints
logrus.Debugf("select endpoints : ", selectEndpoint)
if len(selectEndpoint) > 0 {
ep := selectEndpoint[0]
if len(ep.Subsets) > 0 {
subset := ep.Subsets[0]
toport := int(subset.Ports[0].Port)
if (len(service.Spec.Ports) == 0 || service.Spec.Ports[0].TargetPort.IntVal != int32(toport)) && ep.Labels["service_kind"] != "third_party" {
return nil
}
if serviceAlias == destServiceAlias {
if originPort, ok := service.Labels["origin_port"]; ok {
origin, err := strconv.Atoi(originPort)
if err == nil {
toport = origin
}
}
}
protocol := string(subset.Ports[0].Protocol)
addressList := subset.Addresses
var notready bool
if len(addressList) == 0 {
notready = true
addressList = subset.NotReadyAddresses
}
getHealty := func() *endpoint.Endpoint_HealthCheckConfig {
if notready {
return nil
}
return &endpoint.Endpoint_HealthCheckConfig{
PortValue: uint32(toport),
}
}
var lbe []endpoint.LbEndpoint
logrus.Debugf("len(ep.SubSets: %d, ", ep.Subsets[0], protocol)
envoyAddress := CreateSocketAddress(protocol, domain, uint32(toport))
logrus.Debugf("envoyAddress is : ", envoyAddress)
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &envoyAddress,
HealthCheckConfig: getHealty(),
},
},
})
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
}
}
protocol, _ := service.Labels["port_protocol"]
port := service.Spec.Ports[0].Port
var lbe []endpoint.LbEndpoint
envoyAddress := CreateSocketAddress(protocol, domain, uint32(port))
logrus.Debugf("envoyAddress is : ", envoyAddress)
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &envoyAddress,
HealthCheckConfig: &endpoint.Endpoint_HealthCheckConfig{PortValue:uint32(port)},
},
},
})
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
cla := &v2.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: lendpoints,

View File

@ -34,14 +34,14 @@ import (
)
//OneNodeCluster conver cluster of on envoy node
func OneNodeCluster(serviceAlias, namespace string, configs *corev1.ConfigMap, services []*corev1.Service, endpoints []*corev1.Endpoints) ([]cache.Resource, error) {
func OneNodeCluster(serviceAlias, namespace string, configs *corev1.ConfigMap, services []*corev1.Service) ([]cache.Resource, error) {
resources, _, err := GetPluginConfigs(configs)
if err != nil {
return nil, err
}
var clusters []cache.Resource
if resources.BaseServices != nil && len(resources.BaseServices) > 0 {
for _, cl := range upstreamClusters(serviceAlias, namespace, resources.BaseServices, services, endpoints) {
for _, cl := range upstreamClusters(serviceAlias, namespace, resources.BaseServices, services) {
if err := cl.Validate(); err != nil {
logrus.Errorf("cluster validate failure %s", err.Error())
} else {
@ -66,7 +66,7 @@ func OneNodeCluster(serviceAlias, namespace string, configs *corev1.ConfigMap, s
// upstreamClusters handle upstream app cluster
// handle kubernetes inner service
func upstreamClusters(serviceAlias, namespace string, dependsServices []*api_model.BaseService, services []*corev1.Service, endpoints []*corev1.Endpoints) (cdsClusters []*v2.Cluster) {
func upstreamClusters(serviceAlias, namespace string, dependsServices []*api_model.BaseService, services []*corev1.Service) (cdsClusters []*v2.Cluster) {
var clusterConfig = make(map[string]*api_model.BaseService, len(dependsServices))
for i, dService := range dependsServices {
depServiceIndex := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, dService.DependServiceAlias, dService.Port)
@ -99,7 +99,7 @@ func upstreamClusters(serviceAlias, namespace string, dependsServices []*api_mod
if domain, ok := service.Annotations["domain"]; ok && domain != "" {
logrus.Debugf("domain endpoint[%s], create logical_dns cluster: ", domain)
clusterOption.ClusterType = v2.Cluster_LOGICAL_DNS
clusterOption.LoadAssignment = envoyv2.CreateDNSLoadAssignment(serviceAlias, namespace, endpoints, service)
clusterOption.LoadAssignment = envoyv2.CreateDNSLoadAssignment(serviceAlias, namespace, domain, service)
if strings.HasPrefix(domain, "https://") {
splitDomain := strings.Split(domain, "https://")
if len(splitDomain) == 2 {

View File

@ -50,9 +50,11 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
name = fmt.Sprintf("%sServiceOUT", destServiceAlias)
}
selectEndpoint := getEndpointsByLables(endpoints, map[string]string{"name": name})
var lendpoints []endpoint.LocalityLbEndpoints
for _, en := range selectEndpoint {
for _, subset := range en.Subsets {
var lendpoints []endpoint.LocalityLbEndpoints // localityLbEndpoints just support only one content
if len(selectEndpoint) > 0 {
en := selectEndpoint[0]
if len(en.Subsets) > 0 {
subset := en.Subsets[0]
if len(subset.Ports) < 1 {
continue
}
@ -85,9 +87,10 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
PortValue: uint32(toport),
}
}
var lbe []endpoint.LbEndpoint
for _, address := range addressList {
envoyAddress := envoyv2.CreateSocketAddress(protocol, address.IP, uint32(toport))
var lbe []endpoint.LbEndpoint // just support one content
if len(addressList) > 0 {
envoyAddress := envoyv2.CreateSocketAddress(protocol, addressList[0].IP, uint32(toport))
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
@ -100,6 +103,7 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
}
}
cla := &v2.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: lendpoints,

View File

@ -204,7 +204,7 @@ func (d *DiscoverServerManager) UpdateNodeConfig(nc *NodeConfig) error {
} else {
nc.listeners = listeners
}
clusters, err := conver.OneNodeCluster(nc.serviceAlias, nc.namespace, nc.config, services, endpoint)
clusters, err := conver.OneNodeCluster(nc.serviceAlias, nc.namespace, nc.config, services)
if err != nil {
logrus.Errorf("create envoy clusters failure %s", err.Error())
} else {

View File

@ -0,0 +1,77 @@
package validation
import (
"fmt"
"net"
"strings"
k8svalidation "k8s.io/apimachinery/pkg/util/validation"
)
// ValidateDomain tests that the argument is a valid domain.
func ValidateDomain(domain string) []string {
if strings.TrimSpace(domain) == "" {
return nil
}
var errs []string
if strings.Contains(domain, "*") {
errs = k8svalidation.IsWildcardDNS1123Subdomain(domain)
} else {
errs = k8svalidation.IsDNS1123Subdomain(domain)
}
return errs
}
// ValidateEndpointAddress tests that the argument is a valid endpoint address.
func ValidateEndpointAddress(address string) []string {
ip := net.ParseIP(address)
if ip == nil {
return ValidateDomain(address)
}
return ValidateEndpointIP(address)
}
// SplitEndpointAddress split URL to domain or IP
func SplitEndpointAddress(resourceAddress string) (address string) {
if strings.HasPrefix(resourceAddress, "https://") {
resourceAddress = strings.Split(resourceAddress, "https://")[1]
}
if strings.HasPrefix(resourceAddress, "http://") {
resourceAddress = strings.Split(resourceAddress, "http://")[1]
}
if strings.Contains(resourceAddress, ":") {
sp := strings.Split(resourceAddress, ":")
address = sp[0]
} else {
address = resourceAddress
}
return address
}
// ValidateEndpointIP tests that the argument is a valid IP address.
func ValidateEndpointIP(ipAddress string) []string {
// We disallow some IPs as endpoints or external-ips. Specifically,
// unspecified and loopback addresses are nonsensical and link-local
// addresses tend to be used for node-centric purposes (e.g. metadata
// service).
err := []string{}
ip := net.ParseIP(ipAddress)
if ip == nil {
err = append(err, fmt.Sprintf("%s must be a valid IP address", ipAddress))
return err
}
if ip.IsUnspecified() {
err = append(err, fmt.Sprintf("%s may not be unspecified (0.0.0.0)", ipAddress))
}
if ip.IsLoopback() {
err = append(err, fmt.Sprintf("%s may not be in the loopback range (127.0.0.0/8)", ipAddress))
}
if ip.IsLinkLocalUnicast() {
err = append(err, fmt.Sprintf("%s may not be in the link-local range (169.254.0.0/16)", ipAddress))
}
if ip.IsLinkLocalMulticast() {
err = append(err, fmt.Sprintf("%s may not be in the link-local multicast range (224.0.0.0/24)", ipAddress))
}
return err
}

View File

@ -0,0 +1,15 @@
package validation
import (
"fmt"
"testing"
)
func TestSplitEndpointAddress(t *testing.T) {
address := SplitEndpointAddress("http://www.baidu.com")
fmt.Printf("endpoint: %s, addrss: %s, is IP? %t \n", "http://www.baidu.com", address, len(ValidateEndpointIP(address)) == 0)
address = SplitEndpointAddress("http://www.baidu.com:443")
fmt.Printf("endpoint: %s, addrss: %s, is IP? %t \n", "http://www.baidu.com:443", address, len(ValidateEndpointIP(address)) == 0)
address = SplitEndpointAddress("http://10.211.55.3:7070")
fmt.Printf("endpoint: %s, addrss: %s, is IP? %t \n", "http://10.211.55.3:7070", address, len(ValidateEndpointIP(address)) == 0)
}

View File

@ -193,6 +193,7 @@ func (p *probeManager) handleStatus() {
}
func (p *probeManager) updateServiceStatus(status *v1.HealthStatus) {
logrus.Debugf("who: %s, status; %s, info: %s", status.Name, status.Status, status.Info)
p.lock.Lock()
defer p.lock.Unlock()
exist, ok := p.status[status.Name]

View File

@ -56,6 +56,7 @@ func (h *TCPProbe) TCPCheck() {
func GetTCPHealth(address string) map[string]string {
conn, err := net.DialTimeout("tcp", address, 5*time.Second)
if err != nil {
logrus.Warningf("%s connection failure", address)
return map[string]string{"status": v1.StatDeath,
"info": fmt.Sprintf("Address: %s; Tcp connection error", address)}
}

View File

@ -20,9 +20,8 @@ package f
import (
"fmt"
"github.com/Sirupsen/logrus"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/goodrain/rainbond/worker/appm/types/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
@ -140,18 +139,23 @@ func ensureSecret(secret *corev1.Secret, clientSet kubernetes.Interface) {
logrus.Warningf("error updating secret %+v: %v", secret, err)
}
}
// EnsureEndpoints creates or updates endpoints.
func EnsureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
oldEP, err := clientSet.CoreV1().Endpoints(ep.Namespace).Get(ep.Name, metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep)
_, err = clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep)
if err != nil {
logrus.Warningf("error creating endpoints %+v: %v", ep, err)
logrus.Warningf("error createing endpoint %+v: %v", ep, err)
}
return
}
logrus.Errorf("error getting endpoint(%s): %v", fmt.Sprintf("%s:%s", ep.Namespace, ep.Name), err)
return
}
ep.ResourceVersion = oldEP.ResourceVersion
_, err = clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
logrus.Warningf("error updating endpoints %+v: %v", ep, err)
}
}
@ -276,6 +280,23 @@ func UpgradeEndpoints(clientset *kubernetes.Clientset,
}
for _, n := range new {
if o, ok := oldMap[n.Name]; ok {
oldEndpoint, err := clientset.CoreV1().Endpoints(n.Namespace).Get(n.Name,metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err){
_, err := clientset.CoreV1().Endpoints(n.Namespace).Create(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error creating endpoints: %+v: err: %v",
n, err), err); err != nil {
return err
}
continue
}
}
if e := handleErr(fmt.Sprintf("err get endpoint[%s:%s], err: %+v", n.Namespace, n.Name, err), err); err != nil{
return e
}
}
n.ResourceVersion = oldEndpoint.ResourceVersion
ep, err := clientset.CoreV1().Endpoints(n.Namespace).Update(n)
if err != nil {
if e := handleErr(fmt.Sprintf("error updating endpoints: %+v: err: %v",
@ -315,12 +336,3 @@ func UpgradeEndpoints(clientset *kubernetes.Clientset,
return nil
}
// UpdateEndpoints uses clientset to update the given Endpoints.
func UpdateEndpoints(ep *corev1.Endpoints, clientSet *kubernetes.Clientset) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
logrus.Warningf("error updating endpoints: %+v; err: %v", ep, err)
return
}
logrus.Debugf("Key: %s/%s; Successfully update endpoints", ep.GetNamespace(), ep.GetName())
}

View File

@ -21,13 +21,12 @@ package prober
import (
"context"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
uitlprober "github.com/goodrain/rainbond/util/prober"
v1 "github.com/goodrain/rainbond/util/prober/types/v1"
"github.com/goodrain/rainbond/util/prober/types/v1"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
appmv1 "github.com/goodrain/rainbond/worker/appm/types/v1"
@ -219,6 +218,21 @@ func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) {
}
func (t *tpProbe) createServices(probeInfo *store.ProbeInfo) (*v1.Service, *model.TenantServiceProbe) {
if probeInfo.IP == "8.8.8.8"{
app := t.store.GetAppService(probeInfo.Sid)
if len(app.GetServices()) >= 1 {
appService := app.GetServices()[0]
if appService.Annotations != nil && appService.Annotations["domain"] != ""{
probeInfo.IP = appService.Annotations["domain"]
logrus.Debugf("domain address is : %s", probeInfo.IP)
}
}
if probeInfo.IP == "8.8.8.8"{
logrus.Warningf("serviceID: %s, is a domain thirdpart endpoint, but do not found domain info", probeInfo.Sid)
return nil, nil
}
}
logrus.Debugf("create probe[sid: %s, address: %s, port: %d]", probeInfo.Sid, probeInfo.IP, probeInfo.Port)
tsp, err := t.GetProbeInfo(probeInfo.Sid)
if err != nil {
logrus.Warningf("ServiceID: %s; Unexpected error occurred, ignore the creation of "+

View File

@ -25,9 +25,9 @@ import (
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/api/controller/validation"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
validation "github.com/goodrain/rainbond/util/endpoint"
"github.com/goodrain/rainbond/worker/appm/f"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
@ -194,6 +194,24 @@ func deleteSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) {
ep.Subsets[idx] = ep.Subsets[len(ep.Subsets)-1]
ep.Subsets = ep.Subsets[:len(ep.Subsets)-1]
}
isDomain := false
for _, addr := range item.Addresses {
if addr.IP == "8.8.8.8" {
isDomain = true
}
}
for _, addr := range item.NotReadyAddresses {
if addr.IP == "8.8.8.8" {
isDomain = true
}
}
if isDomain {
for _, service := range as.GetServices() {
if service.Annotations != nil {
delete(service.Annotations, "domain")
}
}
}
}
}
}
@ -240,7 +258,8 @@ func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) (
var subsets []corev1.EndpointSubset
for _, epi := range epinfo {
logrus.Debugf("make endpoints[address: %s] subset", epi.IP)
if errs := validation.ValidateEndpointIP(epi.IP); len(errs) > 0 {
address := validation.SplitEndpointAddress(epi.IP)
if errs := validation.ValidateEndpointIP(address); len(errs) > 0 {
logrus.Debug("domain endpoint")
if len(as.GetServices()) > 0 {
annotations := as.GetServices()[0].Annotations
@ -271,12 +290,6 @@ func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) (
}}
for _, item := range res {
item.Subsets = subsets
annotations := item.Annotations
if annotations == nil {
annotations = make(map[string]string)
}
annotations["domain"] = epi.IP
item.Annotations = annotations
}
return res, nil
}
@ -319,7 +332,8 @@ func updateSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) error {
}
p := ports[0]
ipAddress := rbdep.IP
if errs := validation.ValidateEndpointIP(rbdep.IP); len(errs) > 0 {
address := validation.SplitEndpointAddress(rbdep.IP)
if errs := validation.ValidateEndpointIP(address); len(errs) > 0 {
ipAddress = "8.8.8.8"
if len(as.GetServices()) > 0 {
annotations := as.GetServices()[0].Annotations
@ -329,14 +343,6 @@ func updateSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) error {
annotations["domain"] = rbdep.IP
as.GetServices()[0].Annotations = annotations
}
if len(as.GetEndpoints()) > 0 {
annotations := as.GetEndpoints()[0].Annotations
if annotations == nil {
annotations = make(map[string]string)
}
annotations["domain"] = rbdep.IP
as.GetEndpoints()[0].Annotations = annotations
}
}
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
@ -384,25 +390,15 @@ func (t *thirdparty) runUpdate(event discovery.Event) {
for idx, subset := range ep.Subsets {
if subset.Ports[0].Name == rbdep.UUID && condition(subset) {
logrus.Debugf("Executed; health: %v; msg: %s", ready, msg)
if errs := validation.ValidateEndpointIP(rbdep.IP); len(errs) > 0 {
annotations := ep.Annotations
if annotations == nil {
annotations = make(map[string]string)
}
annotations["domain"] = rbdep.IP
ep.Annotations = annotations
if len(as.GetServices()) > 0 {
annotations := as.GetServices()[0].Annotations
if annotations == nil {
annotations = make(map[string]string)
}
annotations["domain"] = rbdep.IP
as.GetServices()[0].Annotations = annotations
}
address := validation.SplitEndpointAddress(rbdep.IP)
if errs := validation.ValidateEndpointIP(address); len(errs) > 0 {
rbdep.IP = "8.8.8.8"
}
ep.Subsets[idx] = createSubset(rbdep, ready)
f.UpdateEndpoints(ep, t.clientset)
f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), []*corev1.Endpoints{ep}, func(msg string, err error) error {
logrus.Errorf("update endpoint[%+v] failure: %+v",ep, err)
return err
})
}
}
}
@ -437,18 +433,17 @@ func (t *thirdparty) runUpdate(event discovery.Event) {
f.EnsureService(service, t.clientset)
}
newEndpoints, err := t.k8sEndpoints(as, []*v1.RbdEndpoint{rbdep})
if err != nil {
logrus.Warnf("update endpoint error: %v", err.Error())
return
}
for _, ep := range newEndpoints {
f.EnsureEndpoints(ep, t.clientset)
}
_ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), as.GetEndpoints(),
func(msg string, err error) error {
logrus.Warning(msg)
return nil
})
case discovery.DeleteEvent:
logrus.Debug(msg)
deleteSubset(as, rbdep)
for _, service := range as.GetServices() {
f.EnsureService(service, t.clientset)
}
eps := as.GetEndpoints()
_ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), eps,
func(msg string, err error) error {