mirror of
https://gitee.com/rainbond/Rainbond.git
synced 2024-11-29 18:27:58 +08:00
change third app health check code
This commit is contained in:
parent
7221fe1bc8
commit
6ab0b926c2
@ -420,11 +420,11 @@ type TenantServiceProbe struct {
|
||||
//http请求头,key=value,key2=value2
|
||||
HTTPHeader string `gorm:"column:http_header;size:300" json:"http_header" validate:"http_header"`
|
||||
//初始化等候时间
|
||||
InitialDelaySecond int `gorm:"column:initial_delay_second;size:2;default:1" json:"initial_delay_second" validate:"initial_delay_second"`
|
||||
InitialDelaySecond int `gorm:"column:initial_delay_second;size:2;default:4" json:"initial_delay_second" validate:"initial_delay_second"`
|
||||
//检测间隔时间
|
||||
PeriodSecond int `gorm:"column:period_second;size:2;default:3" json:"period_second" validate:"period_second"`
|
||||
//检测超时时间
|
||||
TimeoutSecond int `gorm:"column:timeout_second;size:3;default:30" json:"timeout_second" validate:"timeout_second"`
|
||||
TimeoutSecond int `gorm:"column:timeout_second;size:3;default:5" json:"timeout_second" validate:"timeout_second"`
|
||||
//是否启用
|
||||
IsUsed *int `gorm:"column:is_used;size:1;default:1" json:"is_used" validate:"is_used"`
|
||||
//标志为失败的检测次数
|
||||
|
@ -20,7 +20,6 @@ package prober
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -33,7 +32,6 @@ import (
|
||||
//Prober Prober
|
||||
type Prober interface {
|
||||
GetServiceHealthy(serviceName string) (*v1.HealthStatus, bool)
|
||||
GetCurrentServiceHealthy(serviceName string) (*v1.HealthStatus, error)
|
||||
WatchServiceHealthy(serviceName string) Watcher
|
||||
CloseWatch(serviceName string, id string) error
|
||||
Start()
|
||||
@ -112,35 +110,6 @@ func (p *probeManager) GetServiceHealthy(serviceName string) (*v1.HealthStatus,
|
||||
return v, ok
|
||||
}
|
||||
|
||||
func (p *probeManager) GetCurrentServiceHealthy(serviceName string) (*v1.HealthStatus, error) {
|
||||
if len(p.services) == 0 {
|
||||
return nil, errors.New("services list is empty")
|
||||
}
|
||||
for _, v := range p.services {
|
||||
if v.Name == serviceName {
|
||||
if v.ServiceHealth.Model == "tcp" {
|
||||
statusMap := probe.GetTCPHealth(v.ServiceHealth.Address)
|
||||
result := &v1.HealthStatus{
|
||||
Name: v.Name,
|
||||
Status: statusMap["status"],
|
||||
Info: statusMap["info"],
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
if v.ServiceHealth.Model == "http" {
|
||||
statusMap := probe.GetHTTPHealth(v.ServiceHealth.Address)
|
||||
result := &v1.HealthStatus{
|
||||
Name: v.Name,
|
||||
Status: statusMap["status"],
|
||||
Info: statusMap["info"],
|
||||
}
|
||||
return result, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
return nil, errors.New("the service does not exist")
|
||||
}
|
||||
|
||||
func (p *probeManager) WatchServiceHealthy(serviceName string) Watcher {
|
||||
logrus.Debugf("service name: %s;watch service healthy...", serviceName)
|
||||
healthCh := make(chan *v1.HealthStatus, 10)
|
||||
@ -392,13 +361,16 @@ func (p *probeManager) UpdateServiceProbe(service *v1.Service) {
|
||||
// UpdateServicesProbe updates and runs services probe.
|
||||
func (p *probeManager) UpdateServicesProbe(services []*v1.Service) {
|
||||
if services == nil || len(services) == 0 {
|
||||
logrus.Debug("Empty services")
|
||||
return
|
||||
}
|
||||
oldSvcs := p.ListServicesBySid(services[0].Sid)
|
||||
for _, v := range services {
|
||||
delete(oldSvcs, v.Name)
|
||||
if v.ServiceHealth == nil || v.Disable || !p.isNeedUpdate(v) {
|
||||
if v.ServiceHealth == nil || !p.isNeedUpdate(v) {
|
||||
continue
|
||||
}
|
||||
if v.Disable {
|
||||
p.StopProbes([]string{v.Name})
|
||||
continue
|
||||
}
|
||||
logrus.Debugf("Probe: %s; update probe.", v.Name)
|
||||
@ -443,10 +415,8 @@ func (p *probeManager) StopProbes(names []string) {
|
||||
for _, name := range names {
|
||||
probe := p.serviceProbe[name]
|
||||
if probe == nil {
|
||||
logrus.Debugf("Name: %s; Probe not found.", name)
|
||||
continue
|
||||
}
|
||||
logrus.Debugf("Name: %s; Probe stopped", name)
|
||||
probe.Stop()
|
||||
delete(p.serviceProbe, name)
|
||||
for idx, svc := range p.services {
|
||||
@ -454,5 +424,6 @@ func (p *probeManager) StopProbes(names []string) {
|
||||
p.services = append(p.services[:idx], p.services[idx+1:]...)
|
||||
}
|
||||
}
|
||||
logrus.Debugf("Name: %s; Probe stopped", name)
|
||||
}
|
||||
}
|
||||
|
@ -34,13 +34,14 @@ import (
|
||||
|
||||
// HTTPProbe probes through the http protocol
|
||||
type HTTPProbe struct {
|
||||
Name string
|
||||
Address string
|
||||
ResultsChan chan *v1.HealthStatus
|
||||
Ctx context.Context
|
||||
Cancel context.CancelFunc
|
||||
TimeInterval int
|
||||
MaxErrorsNum int
|
||||
Name string
|
||||
Address string
|
||||
ResultsChan chan *v1.HealthStatus
|
||||
Ctx context.Context
|
||||
Cancel context.CancelFunc
|
||||
TimeInterval int
|
||||
MaxErrorsNum int
|
||||
TimeoutSecond int
|
||||
}
|
||||
|
||||
//Check starts http probe.
|
||||
@ -61,7 +62,7 @@ func (h *HTTPProbe) HTTPCheck() {
|
||||
timer := time.NewTimer(time.Second * time.Duration(h.TimeInterval))
|
||||
defer timer.Stop()
|
||||
for {
|
||||
HealthMap := GetHTTPHealth(h.Address)
|
||||
HealthMap := h.GetHTTPHealth()
|
||||
result := &v1.HealthStatus{
|
||||
Name: h.Name,
|
||||
Status: HealthMap["status"],
|
||||
@ -90,9 +91,10 @@ func isClientTimeout(err error) bool {
|
||||
}
|
||||
|
||||
//GetHTTPHealth get http health
|
||||
func GetHTTPHealth(address string) map[string]string {
|
||||
func (h *HTTPProbe) GetHTTPHealth() map[string]string {
|
||||
address := h.Address
|
||||
c := &http.Client{
|
||||
Timeout: 10 * time.Second,
|
||||
Timeout: time.Duration(h.TimeoutSecond) * time.Second,
|
||||
}
|
||||
if strings.HasPrefix(address, "https://") {
|
||||
c.Transport = &http.Transport{
|
||||
@ -113,6 +115,9 @@ func GetHTTPHealth(address string) map[string]string {
|
||||
}
|
||||
logrus.Debugf("http probe check address; %s", address)
|
||||
resp, err := c.Get(addr.String())
|
||||
if resp.Body != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
if err != nil {
|
||||
if isClientTimeout(err) {
|
||||
return map[string]string{"status": service.Stat_death, "info": "Request service timeout"}
|
||||
@ -120,9 +125,6 @@ func GetHTTPHealth(address string) map[string]string {
|
||||
logrus.Debugf("http probe request error %s", err.Error())
|
||||
return map[string]string{"status": service.Stat_unhealthy, "info": err.Error()}
|
||||
}
|
||||
if resp.Body != nil {
|
||||
defer resp.Body.Close()
|
||||
}
|
||||
if resp.StatusCode >= 400 {
|
||||
logrus.Debugf("http probe check address %s return code %d", address, resp.StatusCode)
|
||||
return map[string]string{"status": service.Stat_unhealthy, "info": "Service unhealthy"}
|
||||
|
@ -20,7 +20,8 @@ package probe
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/goodrain/rainbond/util/prober/types/v1"
|
||||
|
||||
v1 "github.com/goodrain/rainbond/util/prober/types/v1"
|
||||
)
|
||||
|
||||
//Probe probe
|
||||
@ -31,28 +32,38 @@ type Probe interface {
|
||||
|
||||
//CreateProbe create probe
|
||||
func CreateProbe(ctx context.Context, statusChan chan *v1.HealthStatus, v *v1.Service) Probe {
|
||||
timeoutSecond := v.ServiceHealth.MaxTimeoutSecond
|
||||
if timeoutSecond == 0 {
|
||||
timeoutSecond = 5
|
||||
}
|
||||
interval := v.ServiceHealth.TimeInterval
|
||||
if interval == 0 {
|
||||
interval = 5
|
||||
}
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
if v.ServiceHealth.Model == "tcp" {
|
||||
t := &TCPProbe{
|
||||
Name: v.ServiceHealth.Name,
|
||||
Address: v.ServiceHealth.Address,
|
||||
Ctx: ctx,
|
||||
Cancel: cancel,
|
||||
ResultsChan: statusChan,
|
||||
TimeInterval: v.ServiceHealth.TimeInterval,
|
||||
MaxErrorsNum: v.ServiceHealth.MaxErrorsNum,
|
||||
Name: v.ServiceHealth.Name,
|
||||
Address: v.ServiceHealth.Address,
|
||||
Ctx: ctx,
|
||||
Cancel: cancel,
|
||||
ResultsChan: statusChan,
|
||||
TimeInterval: interval,
|
||||
MaxErrorsNum: v.ServiceHealth.MaxErrorsNum,
|
||||
TimeoutSecond: timeoutSecond,
|
||||
}
|
||||
return t
|
||||
}
|
||||
if v.ServiceHealth.Model == "http" {
|
||||
t := &HTTPProbe{
|
||||
Name: v.ServiceHealth.Name,
|
||||
Address: v.ServiceHealth.Address,
|
||||
Ctx: ctx,
|
||||
Cancel: cancel,
|
||||
ResultsChan: statusChan,
|
||||
TimeInterval: v.ServiceHealth.TimeInterval,
|
||||
MaxErrorsNum: v.ServiceHealth.MaxErrorsNum,
|
||||
Name: v.ServiceHealth.Name,
|
||||
Address: v.ServiceHealth.Address,
|
||||
Ctx: ctx,
|
||||
Cancel: cancel,
|
||||
ResultsChan: statusChan,
|
||||
TimeInterval: v.ServiceHealth.TimeInterval,
|
||||
MaxErrorsNum: v.ServiceHealth.MaxErrorsNum,
|
||||
TimeoutSecond: v.ServiceHealth.MaxTimeoutSecond,
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
@ -12,13 +12,14 @@ import (
|
||||
|
||||
// TCPProbe probes through the tcp protocol
|
||||
type TCPProbe struct {
|
||||
Name string
|
||||
Address string
|
||||
ResultsChan chan *v1.HealthStatus
|
||||
Ctx context.Context
|
||||
Cancel context.CancelFunc
|
||||
TimeInterval int
|
||||
MaxErrorsNum int
|
||||
Name string
|
||||
Address string
|
||||
ResultsChan chan *v1.HealthStatus
|
||||
Ctx context.Context
|
||||
Cancel context.CancelFunc
|
||||
TimeoutSecond int
|
||||
TimeInterval int
|
||||
MaxErrorsNum int
|
||||
}
|
||||
|
||||
// Check starts tcp probe.
|
||||
@ -33,11 +34,11 @@ func (h *TCPProbe) Stop() {
|
||||
|
||||
// TCPCheck -
|
||||
func (h *TCPProbe) TCPCheck() {
|
||||
logrus.Debugf("TCP check; Name: %s; Address: %s", h.Name, h.Address)
|
||||
logrus.Debugf("TCP check; Name: %s; Address: %s Interval %d", h.Name, h.Address, h.TimeInterval)
|
||||
timer := time.NewTimer(time.Second * time.Duration(h.TimeInterval))
|
||||
defer timer.Stop()
|
||||
for {
|
||||
HealthMap := GetTCPHealth(h.Address)
|
||||
HealthMap := h.GetTCPHealth()
|
||||
result := &v1.HealthStatus{
|
||||
Name: h.Name,
|
||||
Status: HealthMap["status"],
|
||||
@ -54,13 +55,16 @@ func (h *TCPProbe) TCPCheck() {
|
||||
}
|
||||
|
||||
//GetTCPHealth get tcp health
|
||||
func GetTCPHealth(address string) map[string]string {
|
||||
conn, err := net.DialTimeout("tcp", address, 5*time.Second)
|
||||
func (h *TCPProbe) GetTCPHealth() map[string]string {
|
||||
address := h.Address
|
||||
conn, err := net.DialTimeout("tcp", address, time.Duration(h.TimeoutSecond)*time.Second)
|
||||
if conn != nil {
|
||||
defer conn.Close()
|
||||
}
|
||||
if err != nil {
|
||||
logrus.Warningf("probe health check, %s connection failure", address)
|
||||
logrus.Debugf("probe health check, %s connection failure", address)
|
||||
return map[string]string{"status": v1.StatDeath,
|
||||
"info": fmt.Sprintf("Address: %s; Tcp connection error", address)}
|
||||
}
|
||||
defer conn.Close()
|
||||
return map[string]string{"status": v1.StatHealthy, "info": "service health"}
|
||||
}
|
||||
|
@ -59,13 +59,14 @@ func (l *Service) Equal(r *Service) bool {
|
||||
|
||||
//Health ServiceHealth
|
||||
type Health struct {
|
||||
Name string `json:"name"`
|
||||
Model string `json:"model"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
Address string `json:"address"`
|
||||
TimeInterval int `json:"time_interval"`
|
||||
MaxErrorsNum int `json:"max_errors_num"`
|
||||
Name string `json:"name"`
|
||||
Model string `json:"model"`
|
||||
IP string `json:"ip"`
|
||||
Port int `json:"port"`
|
||||
Address string `json:"address"`
|
||||
TimeInterval int `json:"time_interval"`
|
||||
MaxErrorsNum int `json:"max_errors_num"`
|
||||
MaxTimeoutSecond int `json:"max_timeout"`
|
||||
}
|
||||
|
||||
// Equal check if the left health(l) is equal to the right health(r)
|
||||
|
@ -43,7 +43,7 @@ func NewAPPMController(clientset *kubernetes.Clientset,
|
||||
}
|
||||
// create prober first, then thirdparty
|
||||
c.prober = prober.NewProber(c.store, c.probeCh, c.updateCh)
|
||||
c.thirdparty = thirdparty.NewThirdPartier(clientset, c.store, c.startCh, c.updateCh, c.stopCh)
|
||||
c.thirdparty = thirdparty.NewThirdPartier(clientset, c.store, c.startCh, c.updateCh, c.stopCh, c.prober)
|
||||
return c
|
||||
}
|
||||
|
||||
|
@ -23,6 +23,8 @@ import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/Sirupsen/logrus"
|
||||
"github.com/eapache/channels"
|
||||
@ -43,6 +45,7 @@ type Prober interface {
|
||||
Stop()
|
||||
UpdateProbes(info []*store.ProbeInfo)
|
||||
StopProbe(uuids []string)
|
||||
IsUsedProbe(sid string) bool
|
||||
}
|
||||
|
||||
// NewProber creates a new third-party service prober.
|
||||
@ -57,9 +60,9 @@ func NewProber(store store.Storer,
|
||||
|
||||
updateCh: updateCh,
|
||||
probeCh: probeCh,
|
||||
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
watcher: make(map[string]map[string]uitlprober.Watcher),
|
||||
ctx: ctx,
|
||||
cancel: cancel,
|
||||
}
|
||||
}
|
||||
|
||||
@ -68,21 +71,22 @@ type tpProbe struct {
|
||||
utilprober uitlprober.Prober
|
||||
dbm db.Manager
|
||||
store store.Storer
|
||||
|
||||
probeCh *channels.RingChannel
|
||||
updateCh *channels.RingChannel
|
||||
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
probeCh *channels.RingChannel
|
||||
updateCh *channels.RingChannel
|
||||
ctx context.Context
|
||||
cancel context.CancelFunc
|
||||
watcher map[string]map[string]uitlprober.Watcher
|
||||
lock sync.Mutex
|
||||
}
|
||||
|
||||
func createService(probe *model.TenantServiceProbe) *v1.Service {
|
||||
return &v1.Service{
|
||||
Disable: false,
|
||||
Disable: probe.IsUsed == nil || *probe.IsUsed != 1,
|
||||
ServiceHealth: &v1.Health{
|
||||
Model: probe.Scheme,
|
||||
TimeInterval: probe.InitialDelaySecond,
|
||||
MaxErrorsNum: probe.FailureThreshold,
|
||||
Model: probe.Scheme,
|
||||
TimeInterval: probe.PeriodSecond,
|
||||
MaxErrorsNum: probe.FailureThreshold,
|
||||
MaxTimeoutSecond: probe.TimeoutSecond,
|
||||
},
|
||||
}
|
||||
}
|
||||
@ -122,30 +126,42 @@ func (t *tpProbe) Stop() {
|
||||
}
|
||||
|
||||
func (t *tpProbe) UpdateProbes(infos []*store.ProbeInfo) {
|
||||
t.lock.Lock()
|
||||
defer t.lock.Unlock()
|
||||
var services []*v1.Service
|
||||
for _, info := range infos {
|
||||
service, probeInfo := t.createServices(info)
|
||||
if service == nil {
|
||||
logrus.Debugf("Empty service, stop creating probe")
|
||||
t.utilprober.StopProbes([]string{info.UUID})
|
||||
continue
|
||||
}
|
||||
services = append(services, service)
|
||||
// watch
|
||||
if t.utilprober.CheckIfExist(service) {
|
||||
continue
|
||||
if swatchers, exist := t.watcher[service.Sid]; exist && swatchers != nil {
|
||||
if watcher, exist := swatchers[service.Name]; exist && watcher != nil {
|
||||
continue
|
||||
}
|
||||
} else {
|
||||
t.watcher[service.Sid] = make(map[string]uitlprober.Watcher)
|
||||
}
|
||||
logrus.Infof("create probe[sid: %s, address: %s, port: %d]", service.Sid, service.ServiceHealth.IP, service.ServiceHealth.Port)
|
||||
go func(service *v1.Service, info *store.ProbeInfo) {
|
||||
watcher := t.utilprober.WatchServiceHealthy(service.Name)
|
||||
t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID())
|
||||
watcher := t.utilprober.WatchServiceHealthy(service.Name)
|
||||
t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID())
|
||||
t.watcher[service.Sid][service.Name] = watcher
|
||||
go func(watcher uitlprober.Watcher, info *store.ProbeInfo) {
|
||||
defer watcher.Close()
|
||||
defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID())
|
||||
defer delete(t.watcher[service.Sid], service.Name)
|
||||
for {
|
||||
select {
|
||||
case event := <-watcher.Watch():
|
||||
if event == nil {
|
||||
case event, ok := <-watcher.Watch():
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if event == nil {
|
||||
logrus.Errorf("get nil event from prober status chan, will retry")
|
||||
time.Sleep(time.Second * 3)
|
||||
}
|
||||
switch event.Status {
|
||||
case v1.StatHealthy:
|
||||
obj := &appmv1.RbdEndpoint{
|
||||
@ -180,7 +196,7 @@ func (t *tpProbe) UpdateProbes(infos []*store.ProbeInfo) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}(service, info)
|
||||
}(watcher, info)
|
||||
}
|
||||
//Method internally to determine if the configuration has changed
|
||||
//remove old address probe
|
||||
@ -207,6 +223,13 @@ func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) {
|
||||
return probes[0], nil
|
||||
}
|
||||
|
||||
func (t *tpProbe) IsUsedProbe(sid string) bool {
|
||||
if p, _ := t.GetProbeInfo(sid); p != nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (t *tpProbe) createServices(probeInfo *store.ProbeInfo) (*v1.Service, *model.TenantServiceProbe) {
|
||||
if probeInfo.IP == "1.1.1.1" {
|
||||
app := t.store.GetAppService(probeInfo.Sid)
|
||||
@ -252,7 +275,6 @@ func (t *tpProbe) createServiceNames(ep *corev1.Endpoints) string {
|
||||
}
|
||||
|
||||
func parseTCPHostAddress(address string, port int32) string {
|
||||
logrus.Debugf("tcp probe address=%s, port=%d", address, port)
|
||||
if strings.HasPrefix(address, "https://") {
|
||||
address = strings.Split(address, "https://")[1]
|
||||
}
|
||||
@ -262,15 +284,11 @@ func parseTCPHostAddress(address string, port int32) string {
|
||||
if strings.Contains(address, ":") {
|
||||
address = strings.Split(address, ":")[0]
|
||||
}
|
||||
|
||||
ns, err := net.LookupHost(address)
|
||||
if err != nil || len(ns) == 0 {
|
||||
return address
|
||||
}
|
||||
|
||||
address = ns[0]
|
||||
|
||||
address = fmt.Sprintf("%s:%d", address, port)
|
||||
logrus.Debugf("parse tcp probe address = %s", address)
|
||||
return address
|
||||
}
|
||||
|
@ -593,7 +593,7 @@ func (a *appRuntimeStore) getAppService(serviceID, version, createrID string, cr
|
||||
var err error
|
||||
appservice, err = conversion.InitCacheAppService(a.dbmanager, serviceID, createrID)
|
||||
if err != nil {
|
||||
logrus.Errorf("init cache app service failure:%s", err.Error())
|
||||
logrus.Debugf("init cache app service %s failure:%s ", serviceID, err.Error())
|
||||
return nil, err
|
||||
}
|
||||
a.RegistAppService(appservice)
|
||||
|
137
worker/appm/thirdparty/thirdparty.go
vendored
137
worker/appm/thirdparty/thirdparty.go
vendored
@ -27,6 +27,7 @@ import (
|
||||
"github.com/goodrain/rainbond/db/model"
|
||||
validation "github.com/goodrain/rainbond/util/endpoint"
|
||||
"github.com/goodrain/rainbond/worker/appm/f"
|
||||
"github.com/goodrain/rainbond/worker/appm/prober"
|
||||
"github.com/goodrain/rainbond/worker/appm/store"
|
||||
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
|
||||
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
|
||||
@ -47,15 +48,16 @@ func NewThirdPartier(clientset *kubernetes.Clientset,
|
||||
store store.Storer,
|
||||
startCh *channels.RingChannel,
|
||||
updateCh *channels.RingChannel,
|
||||
stopCh chan struct{}) ThirdPartier {
|
||||
stopCh chan struct{},
|
||||
prober prober.Prober) ThirdPartier {
|
||||
t := &thirdparty{
|
||||
clientset: clientset,
|
||||
store: store,
|
||||
|
||||
svcStopCh: make(map[string]chan struct{}),
|
||||
startCh: startCh,
|
||||
updateCh: updateCh,
|
||||
stopCh: stopCh,
|
||||
prober: prober,
|
||||
}
|
||||
return t
|
||||
}
|
||||
@ -63,7 +65,7 @@ func NewThirdPartier(clientset *kubernetes.Clientset,
|
||||
type thirdparty struct {
|
||||
clientset *kubernetes.Clientset
|
||||
store store.Storer
|
||||
|
||||
prober prober.Prober
|
||||
// a collection of stop channel for every service.
|
||||
svcStopCh map[string]chan struct{}
|
||||
|
||||
@ -261,41 +263,6 @@ func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) (
|
||||
var subsets []corev1.EndpointSubset
|
||||
for _, epi := range epinfo {
|
||||
logrus.Debugf("make endpoints[address: %s] subset", epi.IP)
|
||||
address := validation.SplitEndpointAddress(epi.IP)
|
||||
if validation.IsDomainNotIP(address) {
|
||||
if len(as.GetServices()) > 0 {
|
||||
annotations := as.GetServices()[0].Annotations
|
||||
if annotations == nil {
|
||||
annotations = make(map[string]string)
|
||||
}
|
||||
annotations["domain"] = epi.IP
|
||||
as.GetServices()[0].Annotations = annotations
|
||||
}
|
||||
subsets = []corev1.EndpointSubset{corev1.EndpointSubset{
|
||||
Ports: []corev1.EndpointPort{
|
||||
{
|
||||
Name: epi.UUID,
|
||||
Port: func(targetPort int, realPort int) int32 {
|
||||
if realPort == 0 {
|
||||
return int32(targetPort)
|
||||
}
|
||||
return int32(realPort)
|
||||
}(p.ContainerPort, epi.Port),
|
||||
Protocol: corev1.ProtocolUDP,
|
||||
},
|
||||
},
|
||||
// init set not ready address
|
||||
NotReadyAddresses: []corev1.EndpointAddress{
|
||||
{
|
||||
IP: "1.1.1.1",
|
||||
},
|
||||
},
|
||||
}}
|
||||
for _, item := range res {
|
||||
item.Subsets = subsets
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
subset := corev1.EndpointSubset{
|
||||
Ports: []corev1.EndpointPort{
|
||||
{
|
||||
@ -309,21 +276,41 @@ func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) (
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
NotReadyAddresses: []corev1.EndpointAddress{
|
||||
{
|
||||
IP: epi.IP,
|
||||
},
|
||||
}
|
||||
eaddressIP := epi.IP
|
||||
address := validation.SplitEndpointAddress(epi.IP)
|
||||
if validation.IsDomainNotIP(address) {
|
||||
if len(as.GetServices()) > 0 {
|
||||
annotations := as.GetServices()[0].Annotations
|
||||
if annotations == nil {
|
||||
annotations = make(map[string]string)
|
||||
}
|
||||
annotations["domain"] = epi.IP
|
||||
as.GetServices()[0].Annotations = annotations
|
||||
}
|
||||
eaddressIP = "1.1.1.1"
|
||||
}
|
||||
eaddress := []corev1.EndpointAddress{
|
||||
{
|
||||
IP: eaddressIP,
|
||||
},
|
||||
}
|
||||
useProbe := t.prober.IsUsedProbe(as.ServiceID)
|
||||
if useProbe {
|
||||
subset.NotReadyAddresses = eaddress
|
||||
} else {
|
||||
subset.Addresses = eaddress
|
||||
}
|
||||
subsets = append(subsets, subset)
|
||||
}
|
||||
//all endpoint for one third app is same
|
||||
for _, item := range res {
|
||||
item.Subsets = subsets
|
||||
}
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func createSubsetForAllEndpoint(as *v1.AppService, rbdep *v1.RbdEndpoint) error {
|
||||
func (t *thirdparty) createSubsetForAllEndpoint(as *v1.AppService, rbdep *v1.RbdEndpoint) error {
|
||||
port, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -346,6 +333,7 @@ func createSubsetForAllEndpoint(as *v1.AppService, rbdep *v1.RbdEndpoint) error
|
||||
as.GetServices()[0].Annotations = annotations
|
||||
}
|
||||
}
|
||||
|
||||
subset := corev1.EndpointSubset{
|
||||
Ports: []corev1.EndpointPort{
|
||||
{
|
||||
@ -361,44 +349,60 @@ func createSubsetForAllEndpoint(as *v1.AppService, rbdep *v1.RbdEndpoint) error
|
||||
Protocol: corev1.ProtocolTCP,
|
||||
},
|
||||
},
|
||||
NotReadyAddresses: []corev1.EndpointAddress{
|
||||
{
|
||||
IP: ipAddress,
|
||||
},
|
||||
}
|
||||
eaddress := []corev1.EndpointAddress{
|
||||
{
|
||||
IP: ipAddress,
|
||||
},
|
||||
}
|
||||
useProbe := t.prober.IsUsedProbe(as.ServiceID)
|
||||
if useProbe {
|
||||
subset.NotReadyAddresses = eaddress
|
||||
} else {
|
||||
subset.Addresses = eaddress
|
||||
}
|
||||
|
||||
for _, ep := range as.GetEndpoints() {
|
||||
existPort := false
|
||||
existAddress := false
|
||||
for i, item := range ep.Subsets {
|
||||
if len(item.Ports) > 0 && item.Ports[0].Port == int32(subset.Ports[0].Port) {
|
||||
for _, a := range item.Addresses {
|
||||
if a.IP == ipAddress {
|
||||
existAddress = true
|
||||
break
|
||||
for _, port := range item.Ports {
|
||||
if port.Port == int32(subset.Ports[0].Port) {
|
||||
for _, a := range item.Addresses {
|
||||
if a.IP == ipAddress {
|
||||
existAddress = true
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, a := range item.NotReadyAddresses {
|
||||
if a.IP == ipAddress {
|
||||
existAddress = true
|
||||
break
|
||||
for _, a := range item.NotReadyAddresses {
|
||||
if a.IP == ipAddress {
|
||||
existAddress = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !existAddress {
|
||||
if useProbe {
|
||||
ep.Subsets[i].NotReadyAddresses = append(ep.Subsets[i].NotReadyAddresses, subset.Addresses...)
|
||||
} else {
|
||||
ep.Subsets[i].Addresses = append(ep.Subsets[i].NotReadyAddresses, subset.Addresses...)
|
||||
}
|
||||
}
|
||||
existPort = true
|
||||
}
|
||||
if !existAddress {
|
||||
ep.Subsets[i].NotReadyAddresses = append(ep.Subsets[i].NotReadyAddresses, subset.Addresses...)
|
||||
}
|
||||
existPort = true
|
||||
}
|
||||
}
|
||||
if !existPort {
|
||||
ep.Subsets = append(ep.Subsets, subset)
|
||||
}
|
||||
if err := f.EnsureEndpoints(ep, t.clientset); err != nil {
|
||||
logrus.Errorf("update endpoint %s failure %s", ep.Name, err.Error())
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (t *thirdparty) runUpdate(event discovery.Event) {
|
||||
|
||||
updateAddress := func(as *v1.AppService, rbdep *v1.RbdEndpoint, ready bool) {
|
||||
for _, ep := range as.GetEndpoints() {
|
||||
var needUpdate bool
|
||||
@ -458,7 +462,7 @@ func (t *thirdparty) runUpdate(event discovery.Event) {
|
||||
showEndpointIP := rbdep.IP
|
||||
switch event.Type {
|
||||
case discovery.UpdateEvent, discovery.CreateEvent:
|
||||
err := createSubsetForAllEndpoint(as, rbdep)
|
||||
err := t.createSubsetForAllEndpoint(as, rbdep)
|
||||
if err != nil {
|
||||
logrus.Warningf("ServiceID: %s; error adding subset: %s",
|
||||
rbdep.Sid, err.Error())
|
||||
@ -469,18 +473,13 @@ func (t *thirdparty) runUpdate(event discovery.Event) {
|
||||
logrus.Errorf("create or update service %s failure %s", service.Name, err.Error())
|
||||
}
|
||||
}
|
||||
for _, e := range as.GetEndpoints() {
|
||||
if err := f.EnsureEndpoints(e, t.clientset); err != nil {
|
||||
logrus.Errorf("create or update endpoint %s failure %s", e.Name, err.Error())
|
||||
}
|
||||
}
|
||||
logrus.Debugf("upgrade endpoints and service for third app %s", as.ServiceAlias)
|
||||
case discovery.DeleteEvent:
|
||||
removeAddress(as, rbdep)
|
||||
logrus.Infof("third endpoint %s ip %s is deleted", rbdep.UUID, showEndpointIP)
|
||||
logrus.Debugf("third endpoint %s ip %s is deleted", rbdep.UUID, showEndpointIP)
|
||||
case discovery.HealthEvent:
|
||||
updateAddress(as, rbdep, true)
|
||||
logrus.Infof("third endpoint %s ip %s is onlined", rbdep.UUID, showEndpointIP)
|
||||
logrus.Debugf("third endpoint %s ip %s is onlined", rbdep.UUID, showEndpointIP)
|
||||
case discovery.UnhealthyEvent:
|
||||
logrus.Debugf("third endpoint %s ip %s is offlined", rbdep.UUID, showEndpointIP)
|
||||
updateAddress(as, rbdep, false)
|
||||
|
@ -334,11 +334,13 @@ func (r *RuntimeServer) ListThirdPartyEndpoints(ctx context.Context, re *pb.Serv
|
||||
ip = as.GetServices()[0].Annotations["domain"]
|
||||
}
|
||||
addEndpoint(&pb.ThirdPartyEndpoint{
|
||||
Uuid: port.Name,
|
||||
Sid: ep.GetLabels()["service_id"],
|
||||
Ip: ip,
|
||||
Port: port.Port,
|
||||
Status: "healthy",
|
||||
Uuid: port.Name,
|
||||
Sid: ep.GetLabels()["service_id"],
|
||||
Ip: ip,
|
||||
Port: port.Port,
|
||||
Status: func() string {
|
||||
return "healthy"
|
||||
}(),
|
||||
})
|
||||
}
|
||||
for _, address := range subset.NotReadyAddresses {
|
||||
@ -347,11 +349,13 @@ func (r *RuntimeServer) ListThirdPartyEndpoints(ctx context.Context, re *pb.Serv
|
||||
ip = as.GetServices()[0].Annotations["domain"]
|
||||
}
|
||||
addEndpoint(&pb.ThirdPartyEndpoint{
|
||||
Uuid: port.Name,
|
||||
Sid: ep.GetLabels()["service_id"],
|
||||
Ip: ip,
|
||||
Port: port.Port,
|
||||
Status: "unhealthy",
|
||||
Uuid: port.Name,
|
||||
Sid: ep.GetLabels()["service_id"],
|
||||
Ip: ip,
|
||||
Port: port.Port,
|
||||
Status: func() string {
|
||||
return "unhealthy"
|
||||
}(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user