delete deprecated thirdcomponent

GLYASAI 2021-08-05 13:35:22 +08:00
parent 19fec5d0b0
commit cc4f3d5c49
8 changed files with 9 additions and 1175 deletions

View File

@ -32,7 +32,6 @@ import (
"github.com/goodrain/rainbond/pkg/generated/clientset/versioned"
etcdutil "github.com/goodrain/rainbond/util/etcd"
k8sutil "github.com/goodrain/rainbond/util/k8s"
"github.com/goodrain/rainbond/worker/appm"
"github.com/goodrain/rainbond/worker/appm/componentdefinition"
"github.com/goodrain/rainbond/worker/appm/controller"
"github.com/goodrain/rainbond/worker/appm/store"
@ -98,15 +97,8 @@ func Run(s *option.Worker) error {
componentdefinition.NewComponentDefinitionBuilder(s.Config.RBDNamespace)
//step 4: create component resource store
startCh := channels.NewRingChannel(1024)
updateCh := channels.NewRingChannel(1024)
probeCh := channels.NewRingChannel(1024)
cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config, startCh, probeCh)
appmController := appm.NewAPPMController(clientset, cachestore, startCh, updateCh, probeCh)
if err := appmController.Start(); err != nil {
logrus.Errorf("error starting appm controller: %v", err)
}
defer appmController.Stop()
cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config)
if err := cachestore.Start(); err != nil {
logrus.Error("start kube cache store error", err)
return err
@ -128,7 +120,7 @@ func Run(s *option.Worker) error {
//step 7 : create discover module
garbageCollector := gc.NewGarbageCollector(clientset)
taskManager := discover.NewTaskManager(s.Config, cachestore, controllerManager, garbageCollector, startCh)
taskManager := discover.NewTaskManager(s.Config, cachestore, controllerManager, garbageCollector)
if err := taskManager.Start(); err != nil {
return err
}
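Taken together, the hunks above leave the worker start-up without the appm controller and the probe/update channels. A rough sketch of the resulting wiring, abridged from the lines kept in this file (error handling as shown in the hunk):

	cachestore := store.NewStore(restConfig, clientset, rainbondClient, db.GetManager(), s.Config)
	if err := cachestore.Start(); err != nil {
		logrus.Error("start kube cache store error", err)
		return err
	}
	garbageCollector := gc.NewGarbageCollector(clientset)
	taskManager := discover.NewTaskManager(s.Config, cachestore, controllerManager, garbageCollector)
	if err := taskManager.Start(); err != nil {
		return err
	}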

View File

@ -1,73 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package appm
import (
"github.com/eapache/channels"
"github.com/goodrain/rainbond/worker/appm/prober"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"
)
// NewAPPMController creates a new appm controller.
func NewAPPMController(clientset kubernetes.Interface,
store store.Storer,
startCh *channels.RingChannel,
updateCh *channels.RingChannel,
probeCh *channels.RingChannel) *Controller {
c := &Controller{
store: store,
updateCh: updateCh,
startCh: startCh,
probeCh: probeCh,
stopCh: make(chan struct{}),
}
// create prober first, then thirdparty
c.prober = prober.NewProber(c.store, c.probeCh, c.updateCh)
c.thirdparty = thirdparty.NewThirdPartier(clientset, c.store, c.startCh, c.updateCh, c.stopCh, c.prober)
return c
}
// Controller describes a new appm controller.
type Controller struct {
store store.Storer
thirdparty thirdparty.ThirdPartier
prober prober.Prober
startCh *channels.RingChannel
updateCh *channels.RingChannel
probeCh *channels.RingChannel
stopCh chan struct{}
}
// Start starts appm controller
func (c *Controller) Start() error {
c.thirdparty.Start()
c.prober.Start()
logrus.Debugf("start thirdparty appm manager success")
return nil
}
// Stop stops appm controller.
func (c *Controller) Stop() {
close(c.stopCh)
c.prober.Stop()
}
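For background, the deleted controller shuttles events between the store, the prober, and the third-party manager through eapache/channels ring channels (startCh, updateCh, probeCh). A minimal, self-contained sketch of that producer/consumer pattern, illustrative only and with a stand-in payload:

	package main

	import (
		"fmt"

		"github.com/eapache/channels"
	)

	func main() {
		// a ring channel never blocks the writer; when the buffer is full,
		// the oldest value is dropped
		ch := channels.NewRingChannel(1024)

		// producer side, e.g. an informer event handler
		go func() {
			ch.In() <- "probe-info-for-service-a" // stand-in payload
			ch.Close()
		}()

		// consumer side, e.g. the prober's event loop
		for event := range ch.Out() {
			fmt.Println("received:", event)
		}
	}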

View File

@ -1,294 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package prober
import (
"context"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
uitlprober "github.com/goodrain/rainbond/util/prober"
v1 "github.com/goodrain/rainbond/util/prober/types/v1"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
appmv1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
)
// Prober is the interface that wraps the required methods to maintain status
// about upstream servers(Endpoints) associated with a third-party service.
type Prober interface {
Start()
Stop()
UpdateProbes(info []*store.ProbeInfo)
StopProbe(uuids []string)
IsUsedProbe(sid string) bool
}
// NewProber creates a new third-party service prober.
func NewProber(store store.Storer,
probeCh *channels.RingChannel,
updateCh *channels.RingChannel) Prober {
ctx, cancel := context.WithCancel(context.Background())
return &tpProbe{
utilprober: uitlprober.NewProber(ctx, cancel),
dbm: db.GetManager(),
store: store,
updateCh: updateCh,
probeCh: probeCh,
watcher: make(map[string]map[string]uitlprober.Watcher),
ctx: ctx,
cancel: cancel,
}
}
// third-party service probe
type tpProbe struct {
utilprober uitlprober.Prober
dbm db.Manager
store store.Storer
probeCh *channels.RingChannel
updateCh *channels.RingChannel
ctx context.Context
cancel context.CancelFunc
watcher map[string]map[string]uitlprober.Watcher
lock sync.Mutex
}
func createService(probe *model.TenantServiceProbe) *v1.Service {
return &v1.Service{
Disable: probe.IsUsed == nil || *probe.IsUsed != 1,
ServiceHealth: &v1.Health{
Model: probe.Scheme,
TimeInterval: probe.PeriodSecond,
MaxErrorsNum: probe.FailureThreshold,
MaxTimeoutSecond: probe.TimeoutSecond,
},
}
}
func (t *tpProbe) Start() {
t.utilprober.Start()
go func() {
for {
select {
case event := <-t.probeCh.Out():
if event == nil {
return
}
evt := event.(store.Event)
switch evt.Type {
case store.CreateEvent:
infos := evt.Obj.([]*store.ProbeInfo)
t.UpdateProbes(infos)
case store.UpdateEvent:
infos := evt.Obj.([]*store.ProbeInfo)
t.UpdateProbes(infos)
case store.DeleteEvent:
uuids := evt.Obj.([]string)
t.StopProbe(uuids)
}
case <-t.ctx.Done():
return
}
}
}()
}
// Stop stops prober.
func (t *tpProbe) Stop() {
t.cancel()
}
func (t *tpProbe) UpdateProbes(infos []*store.ProbeInfo) {
t.lock.Lock()
defer t.lock.Unlock()
var services []*v1.Service
for _, info := range infos {
service, probeInfo := t.createServices(info)
if service == nil {
t.utilprober.StopProbes([]string{info.UUID})
continue
}
services = append(services, service)
// watch
if swatchers, exist := t.watcher[service.Sid]; exist && swatchers != nil {
if watcher, exist := swatchers[service.Name]; exist && watcher != nil {
continue
}
} else {
t.watcher[service.Sid] = make(map[string]uitlprober.Watcher)
}
logrus.Infof("create probe[sid: %s, address: %s, port: %d]", service.Sid, service.ServiceHealth.Address, service.ServiceHealth.Port)
watcher := t.utilprober.WatchServiceHealthy(service.Name)
t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID())
t.watcher[service.Sid][service.Name] = watcher
go func(watcher uitlprober.Watcher, info *store.ProbeInfo) {
defer watcher.Close()
defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID())
defer delete(t.watcher[service.Sid], service.Name)
for {
select {
case event, ok := <-watcher.Watch():
if !ok {
return
}
if event == nil {
logrus.Errorf("get nil event from prober status chan, will retry")
time.Sleep(time.Second * 3)
continue
}
switch event.Status {
case v1.StatHealthy:
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.HealthEvent,
Obj: obj,
}
case v1.StatDeath, v1.StatUnhealthy:
if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum {
if probeInfo.Mode == model.OfflineFailureAction.String() {
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.UnhealthyEvent,
Obj: obj,
}
}
}
}
case <-t.ctx.Done():
// TODO: should stop for one service, not all services.
logrus.Infof("third app %s probe watcher exist", service.Name)
return
}
}
}(watcher, info)
}
// UpdateServicesProbe determines internally whether the configuration has changed
// and removes probes for old addresses.
t.utilprober.UpdateServicesProbe(services)
}
func (t *tpProbe) StopProbe(uuids []string) {
for _, name := range uuids {
t.utilprober.StopProbes([]string{name})
}
}
// GetProbeInfo returns the probe associated with sid.
// If an enabled probe exists in the database, it is returned directly;
// otherwise nil is returned.
func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) {
probes, err := t.dbm.ServiceProbeDao().GetServiceProbes(sid)
if err != nil || len(probes) == 0 || probes[0].IsUsed == nil || *probes[0].IsUsed == 0 {
if err != nil {
logrus.Warningf("ServiceID: %s; error getting probes: %v", sid, err)
}
return nil, nil
}
return probes[0], nil
}
func (t *tpProbe) IsUsedProbe(sid string) bool {
if p, _ := t.GetProbeInfo(sid); p != nil {
return true
}
return false
}
func (t *tpProbe) createServices(probeInfo *store.ProbeInfo) (*v1.Service, *model.TenantServiceProbe) {
if probeInfo.IP == "1.1.1.1" {
app := t.store.GetAppService(probeInfo.Sid)
if len(app.GetServices(true)) >= 1 {
appService := app.GetServices(true)[0]
if appService.Annotations != nil && appService.Annotations["domain"] != "" {
probeInfo.IP = appService.Annotations["domain"]
logrus.Debugf("domain address is : %s", probeInfo.IP)
}
}
if probeInfo.IP == "1.1.1.1" {
logrus.Warningf("serviceID: %s, is a domain thirdpart endpoint, but do not found domain info", probeInfo.Sid)
return nil, nil
}
}
tsp, err := t.GetProbeInfo(probeInfo.Sid)
if err != nil {
logrus.Warningf("ServiceID: %s; Unexpected error occurred, ignore the creation of "+
"probes: %s", probeInfo.Sid, err.Error())
return nil, nil
}
if tsp == nil {
return nil, nil
}
if tsp.Mode == "liveness" {
tsp.Mode = model.IgnoreFailureAction.String()
}
service := createService(tsp)
service.Sid = probeInfo.Sid
service.Name = probeInfo.UUID
service.ServiceHealth.Port = int(probeInfo.Port)
service.ServiceHealth.Name = service.Name
address := fmt.Sprintf("%s:%d", probeInfo.IP, probeInfo.Port)
if service.ServiceHealth.Model == "tcp" {
address = parseTCPHostAddress(probeInfo.IP, probeInfo.Port)
}
service.ServiceHealth.Address = address
return service, tsp
}
func (t *tpProbe) createServiceNames(ep *corev1.Endpoints) string {
return ep.GetLabels()["uuid"]
}
func parseTCPHostAddress(address string, port int32) string {
if strings.HasPrefix(address, "https://") {
address = strings.Split(address, "https://")[1]
}
if strings.HasPrefix(address, "http://") {
address = strings.Split(address, "http://")[1]
}
if strings.Contains(address, ":") {
address = strings.Split(address, ":")[0]
}
ns, err := net.LookupHost(address)
if err != nil || len(ns) == 0 {
return address
}
address = ns[0]
address = fmt.Sprintf("%s:%d", address, port)
return address
}

View File

@ -1,8 +0,0 @@
package prober
import "testing"
func TestParseTCPHostAddress(t *testing.T) {
re := parseTCPHostAddress("rm-2ze0xlsi14xz6q6sz.mysql.rds.aliyuncs.com", 3306)
t.Log(re)
}
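The deleted test only logs the result. A slightly more explicit variant of the same check could look like the sketch below; it assumes parseTCPHostAddress is in scope and that the lookup of localhost succeeds, falling back to a log when it does not:

	package prober

	import (
		"strings"
		"testing"
	)

	func TestParseTCPHostAddressStripsSchemeAndPort(t *testing.T) {
		// the scheme and original port are stripped, the host is resolved,
		// and the given port is appended as "ip:port"
		re := parseTCPHostAddress("http://localhost:8080", 3306)
		if !strings.HasSuffix(re, ":3306") {
			// on lookup failure parseTCPHostAddress returns the bare host
			t.Logf("lookup may have failed, got %q", re)
			return
		}
		t.Log(re)
	}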

View File

@ -25,7 +25,6 @@ import (
"sync"
"time"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/api/util/bcode"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/db"
@ -37,7 +36,6 @@ import (
k8sutil "github.com/goodrain/rainbond/util/k8s"
"github.com/goodrain/rainbond/worker/appm/componentdefinition"
"github.com/goodrain/rainbond/worker/appm/conversion"
"github.com/goodrain/rainbond/worker/appm/f"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/goodrain/rainbond/worker/server/pb"
workerutil "github.com/goodrain/rainbond/worker/util"
@ -92,7 +90,6 @@ type Storer interface {
UnRegistPodUpdateListener(string)
RegisterVolumeTypeListener(string, chan<- *model.TenantServiceVolumeType)
UnRegisterVolumeTypeListener(string)
InitOneThirdPartService(service *model.TenantServices) error
GetCrds() ([]*apiextensions.CustomResourceDefinition, error)
GetCrd(name string) (*apiextensions.CustomResourceDefinition, error)
GetServiceMonitorClient() (*versioned.Clientset, error)
@ -125,14 +122,6 @@ type Event struct {
Obj interface{}
}
// ProbeInfo holds the context of a probe.
type ProbeInfo struct {
Sid string `json:"sid"`
UUID string `json:"uuid"`
IP string `json:"ip"`
Port int32 `json:"port"`
}
// appRuntimeStore is the app runtime store;
// it caches all kubernetes objects and app services.
type appRuntimeStore struct {
@ -150,7 +139,6 @@ type appRuntimeStore struct {
appCount int32
dbmanager db.Manager
conf option.Config
startCh *channels.RingChannel
stopch chan struct{}
podUpdateListeners map[string]chan<- *corev1.Pod
podUpdateListenerLock sync.Mutex
@ -165,9 +153,7 @@ func NewStore(
clientset kubernetes.Interface,
rainbondClient rainbondversioned.Interface,
dbmanager db.Manager,
conf option.Config,
startCh *channels.RingChannel,
probeCh *channels.RingChannel) Storer {
conf option.Config) Storer {
ctx, cancel := context.WithCancel(context.Background())
store := &appRuntimeStore{
kubeconfig: kubeconfig,
@ -181,7 +167,6 @@ func NewStore(
conf: conf,
dbmanager: dbmanager,
crClients: make(map[string]interface{}),
startCh: startCh,
resourceCache: NewResourceCache(),
podUpdateListeners: make(map[string]chan<- *corev1.Pod, 1),
volumeTypeListeners: make(map[string]chan<- *model.TenantServiceVolumeType, 1),
@ -255,9 +240,6 @@ func NewStore(
store.informers.ComponentDefinition = rainbondInformer.Rainbond().V1alpha1().ComponentDefinitions().Informer()
store.informers.ComponentDefinition.AddEventHandlerWithResyncPeriod(componentdefinition.GetComponentDefinitionBuilder(), time.Second*300)
isThirdParty := func(ep *corev1.Endpoints) bool {
return ep.Labels["service-kind"] == model.ServiceKindThirdParty.String()
}
// Endpoint Event Handler
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -273,15 +255,6 @@ func NewStore(
}
if appservice != nil {
appservice.AddEndpoints(ep)
if isThirdParty(ep) && ep.Subsets != nil && len(ep.Subsets) > 0 {
logrus.Debugf("received add endpoints: %+v", ep)
probeInfos := listProbeInfos(ep, serviceID)
probeCh.In() <- Event{
Type: CreateEvent,
Obj: probeInfos,
}
}
return
}
}
},
@ -298,17 +271,6 @@ func NewStore(
logrus.Debugf("ServiceID: %s; Action: DeleteFunc;service is closed", serviceID)
store.DeleteAppService(appservice)
}
if isThirdParty(ep) {
logrus.Debugf("received delete endpoints: %+v", ep)
var uuids []string
for _, item := range ep.Subsets {
uuids = append(uuids, item.Ports[0].Name)
}
probeCh.In() <- Event{
Type: DeleteEvent,
Obj: uuids,
}
}
}
}
},
@ -325,13 +287,6 @@ func NewStore(
}
if appservice != nil {
appservice.AddEndpoints(cep)
if isThirdParty(cep) {
curInfos := listProbeInfos(cep, serviceID)
probeCh.In() <- Event{
Type: UpdateEvent,
Obj: curInfos,
}
}
}
}
},
@ -365,51 +320,6 @@ func (a *appRuntimeStore) Lister() *Lister {
return a.listers
}
func listProbeInfos(ep *corev1.Endpoints, sid string) []*ProbeInfo {
var probeInfos []*ProbeInfo
addProbe := func(pi *ProbeInfo) {
for _, c := range probeInfos {
if c.IP == pi.IP && c.Port == pi.Port {
return
}
}
probeInfos = append(probeInfos, pi)
}
for _, subset := range ep.Subsets {
for _, port := range subset.Ports {
if ep.Annotations != nil {
if domain, ok := ep.Annotations["domain"]; ok && domain != "" {
logrus.Debugf("thirdpart service[sid: %s] add domain endpoint[domain: %s] probe", sid, domain)
probeInfos = []*ProbeInfo{{
Sid: sid,
UUID: fmt.Sprintf("%s_%d", domain, port.Port),
IP: domain,
Port: port.Port,
}}
return probeInfos
}
}
for _, address := range subset.NotReadyAddresses {
addProbe(&ProbeInfo{
Sid: sid,
UUID: fmt.Sprintf("%s_%d", address.IP, port.Port),
IP: address.IP,
Port: port.Port,
})
}
for _, address := range subset.Addresses {
addProbe(&ProbeInfo{
Sid: sid,
UUID: fmt.Sprintf("%s_%d", address.IP, port.Port),
IP: address.IP,
Port: port.Port,
})
}
}
}
return probeInfos
}
func (a *appRuntimeStore) init() error {
//init leader namespace
leaderNamespace := a.conf.LeaderElectionNamespace
@ -442,64 +352,11 @@ func (a *appRuntimeStore) Start() error {
// init core componentdefinition
componentdefinition.GetComponentDefinitionBuilder().InitCoreComponentDefinition(a.rainbondClient)
go func() {
a.initThirdPartyService()
a.initCustomResourceInformer(stopch)
}()
return nil
}
func (a *appRuntimeStore) initThirdPartyService() error {
logrus.Debugf("begin initializing third-party services.")
// TODO: list third party services that have open ports directly.
svcs, err := a.dbmanager.TenantServiceDao().ListThirdPartyServices()
if err != nil {
logrus.Errorf("error listing third-party services: %v", err)
return err
}
for _, svc := range svcs {
disCfg, _ := a.dbmanager.ThirdPartySvcDiscoveryCfgDao().GetByServiceID(svc.ServiceID)
if disCfg != nil && disCfg.Type == "kubernetes" {
continue
}
if err = a.InitOneThirdPartService(svc); err != nil {
logrus.Errorf("init thridpart service error: %v", err)
return err
}
a.startCh.In() <- &v1.Event{
Type: v1.StartEvent, // TODO: no need to distinguish between event types.
Sid: svc.ServiceID,
}
}
logrus.Infof("initializing third-party services success")
return nil
}
// InitOneThirdPartService initializes one third-party service
func (a *appRuntimeStore) InitOneThirdPartService(service *model.TenantServices) error {
// ignore service without open port.
if !a.dbmanager.TenantServicesPortDao().HasOpenPort(service.ServiceID) {
return nil
}
appService, err := conversion.InitCacheAppService(a.dbmanager, service.ServiceID, "Rainbond")
if err != nil {
logrus.Errorf("error initializing cache app service: %v", err)
return err
}
if appService.IsCustomComponent() {
return nil
}
a.RegistAppService(appService)
err = f.ApplyOne(context.Background(), nil, a.clientset, appService)
if err != nil {
logrus.Errorf("error applying rule: %v", err)
return err
}
logrus.Infof("init third app %s kubernetes resource", appService.ServiceAlias)
return nil
}
// Ready returns true if all kube informers are synced and the store is ready
func (a *appRuntimeStore) Ready() bool {
return a.informers.Ready()

View File

@ -1,606 +0,0 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package thirdparty
import (
"context"
"fmt"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
validation "github.com/goodrain/rainbond/util/endpoint"
"github.com/goodrain/rainbond/worker/appm/f"
"github.com/goodrain/rainbond/worker/appm/prober"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// ThirdPartier is the interface that wraps the required methods to update status
// about upstream servers(Endpoints) associated with a third-party service.
type ThirdPartier interface {
Start()
}
// NewThirdPartier creates a new ThirdPartier.
func NewThirdPartier(clientset kubernetes.Interface,
store store.Storer,
startCh *channels.RingChannel,
updateCh *channels.RingChannel,
stopCh chan struct{},
prober prober.Prober) ThirdPartier {
t := &thirdparty{
clientset: clientset,
store: store,
svcStopCh: make(map[string]chan struct{}),
startCh: startCh,
updateCh: updateCh,
stopCh: stopCh,
prober: prober,
}
return t
}
type thirdparty struct {
clientset kubernetes.Interface
store store.Storer
prober prober.Prober
// a collection of stop channel for every service.
svcStopCh map[string]chan struct{}
startCh *channels.RingChannel
updateCh *channels.RingChannel
stopCh chan struct{}
}
// Start starts receiving events that update k8s Endpoints status from the start channel (startCh).
func (t *thirdparty) Start() {
go func() {
for {
select {
case event := <-t.updateCh.Out():
devent, ok := event.(discovery.Event)
if !ok {
logrus.Warningf("Unexpected event received %+v", event)
continue
}
t.runUpdate(devent)
case <-t.stopCh:
for _, stopCh := range t.svcStopCh {
close(stopCh)
}
return
}
}
}()
go func() {
for {
select {
case event := <-t.startCh.Out():
evt, ok := event.(*v1.Event)
if !ok {
logrus.Warningf("Unexpected event received %+v", event)
continue
}
logrus.Debugf("Received event: %+v", evt)
if evt.Type == v1.StartEvent { // no need to distinguish between event types
needWatch := false
stopCh := t.svcStopCh[evt.Sid]
if stopCh == nil {
logrus.Debugf("ServiceID: %s; already started.", evt.Sid)
needWatch = true
t.svcStopCh[evt.Sid] = make(chan struct{})
}
go t.runStart(evt.Sid, needWatch)
}
if evt.Type == v1.StopEvent {
stopCh := t.svcStopCh[evt.Sid]
if stopCh == nil {
logrus.Warningf("ServiceID: %s; The third-party service has not started yet, cant't be stoped", evt.Sid)
continue
}
t.runDelete(evt.Sid)
close(stopCh)
delete(t.svcStopCh, evt.Sid)
}
case <-t.stopCh:
for _, stopCh := range t.svcStopCh {
close(stopCh)
}
return
}
}
}()
}
func (t *thirdparty) runStart(sid string, needWatch bool) {
as := t.store.GetAppService(sid)
if as == nil {
logrus.Warnf("get app service from store failure, sid=%s", sid)
return
}
var err error
for i := 3; i > 0; i-- {
rbdeps, ir := t.ListRbdEndpoints(sid)
if rbdeps == nil || len(rbdeps) == 0 {
logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid)
continue
}
var eps []*corev1.Endpoints
eps, err = t.k8sEndpoints(as, rbdeps)
if err != nil {
logrus.Warningf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error())
continue
}
for _, ep := range eps {
if err := f.EnsureEndpoints(ep, t.clientset); err != nil {
logrus.Errorf("create or update endpoint %s failure %s", ep.Name, err.Error())
}
}
for _, service := range as.GetServices(true) {
if err := f.EnsureService(service, t.clientset); err != nil {
logrus.Errorf("create or update service %s failure %s", service.Name, err.Error())
}
}
if needWatch && ir != nil {
ir.Watch()
}
logrus.Infof("ServiceID: %s; successfully running start task", sid)
return
}
logrus.Errorf("ServiceID: %s; error running start task: %v", sid, err)
}
// ListRbdEndpoints lists all rbd endpoints, including static and dynamic ones.
func (t *thirdparty) ListRbdEndpoints(sid string) ([]*v1.RbdEndpoint, Interacter) {
var res []*v1.RbdEndpoint
// static
s := NewStaticInteracter(sid)
slist, err := s.List()
if err != nil {
logrus.Warningf("ServiceID: %s;error listing static rbd endpoints: %v", sid, err)
}
if slist != nil && len(slist) > 0 {
res = append(res, slist...)
}
d := NewDynamicInteracter(sid, t.updateCh, t.stopCh)
if d != nil {
dlist, err := d.List()
if err != nil {
logrus.Warningf("ServiceID: %s;error listing dynamic rbd endpoints: %v", sid, err)
}
if dlist != nil && len(dlist) > 0 {
res = append(res, dlist...)
}
}
return res, d
}
func deleteSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) {
eps := as.GetEndpoints(true)
for _, ep := range eps {
for idx, item := range ep.Subsets {
if item.Ports[0].Name == rbdep.UUID {
logrus.Debugf("UUID: %s; subset deleted", rbdep.UUID)
ep.Subsets[idx] = ep.Subsets[len(ep.Subsets)-1]
ep.Subsets = ep.Subsets[:len(ep.Subsets)-1]
}
isDomain := false
for _, addr := range item.Addresses {
if addr.IP == "1.1.1.1" {
isDomain = true
}
}
for _, addr := range item.NotReadyAddresses {
if addr.IP == "1.1.1.1" {
isDomain = true
}
}
if isDomain {
for _, service := range as.GetServices(true) {
if service.Annotations != nil {
if rbdep.IP == service.Annotations["domain"] {
delete(service.Annotations, "domain")
}
}
}
}
}
}
}
func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) {
ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {
return nil, err
}
// third-party service can only have one port
if len(ports) == 0 {
return nil, fmt.Errorf("port not found")
}
p := ports[0]
var res []*corev1.Endpoints
if *p.IsInnerService {
ep := &corev1.Endpoints{}
ep.Namespace = as.TenantID
// inner or outer
if *p.IsInnerService {
ep.Name = fmt.Sprintf("service-%d-%d", p.ID, p.ContainerPort)
if p.K8sServiceName != "" {
ep.Name = p.K8sServiceName
}
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "Service",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
res = append(res, ep)
}
if *p.IsOuterService {
ep := &corev1.Endpoints{}
ep.Namespace = as.TenantID
// inner or outer
if *p.IsOuterService {
ep.Name = fmt.Sprintf("service-%d-%dout", p.ID, p.ContainerPort)
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "ServiceOUT",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
res = append(res, ep)
}
var subsets []corev1.EndpointSubset
for _, epi := range epinfo {
logrus.Debugf("make endpoints[address: %s] subset", epi.IP)
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Name: epi.UUID,
Port: func(targetPort int, realPort int) int32 {
if realPort == 0 {
return int32(targetPort)
}
return int32(realPort)
}(p.ContainerPort, epi.Port),
Protocol: corev1.ProtocolTCP,
},
},
}
eaddressIP := epi.IP
address := validation.SplitEndpointAddress(epi.IP)
if validation.IsDomainNotIP(address) {
if len(as.GetServices(false)) > 0 {
annotations := as.GetServices(false)[0].Annotations
if annotations == nil {
annotations = make(map[string]string)
}
annotations["domain"] = epi.IP
as.GetServices(false)[0].Annotations = annotations
}
eaddressIP = "1.1.1.1"
}
eaddress := []corev1.EndpointAddress{
{
IP: eaddressIP,
},
}
useProbe := t.prober.IsUsedProbe(as.ServiceID)
if useProbe {
subset.NotReadyAddresses = eaddress
} else {
subset.Addresses = eaddress
}
subsets = append(subsets, subset)
}
// all endpoints for one third-party app share the same subsets
for _, item := range res {
item.Subsets = subsets
}
return res, nil
}
func (t *thirdparty) createSubsetForAllEndpoint(as *v1.AppService, rbdep *v1.RbdEndpoint) error {
port, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {
return err
}
// third-party service can only have one port
if port == nil || len(port) == 0 {
return fmt.Errorf("Port not found")
}
ipAddress := rbdep.IP
address := validation.SplitEndpointAddress(rbdep.IP)
if validation.IsDomainNotIP(address) {
// for a domain endpoint, set the ip to 1.1.1.1
ipAddress = "1.1.1.1"
if len(as.GetServices(false)) > 0 {
annotations := as.GetServices(false)[0].Annotations
if annotations == nil {
annotations = make(map[string]string)
}
annotations["domain"] = rbdep.IP
as.GetServices(false)[0].Annotations = annotations
}
}
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Name: rbdep.UUID,
Port: func() int32 {
// if the endpoint has a port, use it;
// otherwise use the service port
if rbdep.Port != 0 {
return int32(rbdep.Port)
}
return int32(port[0].ContainerPort)
}(),
Protocol: corev1.ProtocolTCP,
},
},
}
eaddress := []corev1.EndpointAddress{
{
IP: ipAddress,
},
}
useProbe := t.prober.IsUsedProbe(as.ServiceID)
if useProbe {
subset.NotReadyAddresses = eaddress
} else {
subset.Addresses = eaddress
}
for _, ep := range as.GetEndpoints(true) {
existPort := false
existAddress := false
for i, item := range ep.Subsets {
for _, port := range item.Ports {
if port.Port == int32(subset.Ports[0].Port) && len(item.Ports) < 2 {
for _, a := range item.Addresses {
if a.IP == ipAddress {
existAddress = true
break
}
}
for _, a := range item.NotReadyAddresses {
if a.IP == ipAddress {
existAddress = true
break
}
}
if !existAddress {
if useProbe {
ep.Subsets[i].NotReadyAddresses = append(ep.Subsets[i].NotReadyAddresses, subset.NotReadyAddresses...)
} else {
ep.Subsets[i].Addresses = append(ep.Subsets[i].Addresses, subset.Addresses...)
}
}
existPort = true
}
}
}
if !existPort {
ep.Subsets = append(ep.Subsets, subset)
}
if err := f.EnsureEndpoints(ep, t.clientset); err != nil {
logrus.Errorf("update endpoint %s failure %s", ep.Name, err.Error())
}
}
return nil
}
func (t *thirdparty) runUpdate(event discovery.Event) {
updateAddress := func(as *v1.AppService, rbdep *v1.RbdEndpoint, ready bool) {
ad := validation.SplitEndpointAddress(rbdep.IP)
for _, ep := range as.GetEndpoints(true) {
var needUpdate bool
for idx, subset := range ep.Subsets {
for _, port := range subset.Ports {
address := subset.Addresses
if ready {
address = subset.NotReadyAddresses
}
for i, addr := range address {
ipequal := fmt.Sprintf("%s_%d", addr.IP, port.Port) == fmt.Sprintf("%s_%d", rbdep.IP, rbdep.Port)
if (addr.IP == "1.1.1.1" && validation.IsDomainNotIP(ad)) || ipequal {
if validation.IsDomainNotIP(ad) {
rbdep.IP = "1.1.1.1"
}
ep.Subsets[idx] = updateSubsetAddress(ready, subset, address[i])
needUpdate = true
break
}
}
logrus.Debugf("not found need update address by %s", fmt.Sprintf("%s_%d", rbdep.IP, rbdep.Port))
}
}
if needUpdate {
if err := f.EnsureEndpoints(ep, t.clientset); err != nil {
logrus.Errorf("update endpoint %s failure %s", ep.Name, err.Error())
}
}
}
}
// assumes a subset does not have both multiple ports and multiple addresses
removeAddress := func(as *v1.AppService, rbdep *v1.RbdEndpoint) {
ad := validation.SplitEndpointAddress(rbdep.IP)
for _, ep := range as.GetEndpoints(true) {
var needUpdate bool
var newSubsets []corev1.EndpointSubset
for idx, subset := range ep.Subsets {
var handleSubset bool
for i, port := range subset.Ports {
address := append(subset.Addresses, subset.NotReadyAddresses...)
for j, addr := range address {
ipequal := fmt.Sprintf("%s_%d", addr.IP, port.Port) == fmt.Sprintf("%s_%d", rbdep.IP, rbdep.Port)
if (addr.IP == "1.1.1.1" && validation.IsDomainNotIP(ad)) || ipequal {
// with multiple ports, remove only this port; otherwise remove the address
if len(subset.Ports) > 1 {
subset.Ports = append(subset.Ports[:i], subset.Ports[i+1:]...)
newSubsets = append(newSubsets, subset)
} else {
if validation.IsDomainNotIP(ad) {
rbdep.IP = "1.1.1.1"
}
newsub := removeSubsetAddress(ep.Subsets[idx], address[j])
if len(newsub.Addresses) != 0 || len(newsub.NotReadyAddresses) != 0 {
newSubsets = append(newSubsets, newsub)
}
}
needUpdate = true
handleSubset = true
break
}
}
}
if !handleSubset {
newSubsets = append(newSubsets, subset)
}
}
ep.Subsets = newSubsets
if needUpdate {
if err := f.EnsureEndpoints(ep, t.clientset); err != nil {
logrus.Errorf("update endpoint %s failure %s", ep.Name, err.Error())
}
}
}
}
rbdep, ok := event.Obj.(*v1.RbdEndpoint)
if !ok || rbdep == nil {
logrus.Warning("failed to convert update event obj to *v1.RbdEndpoint")
return
}
as := t.store.GetAppService(rbdep.Sid)
if as == nil {
logrus.Warnf("get app service from store failure, sid=%s", rbdep.Sid)
return
}
// rbdep.IP may be set to "1.1.1.1" if it is a domain,
// so cache the domain address to show after handling completes
showEndpointIP := rbdep.IP
switch event.Type {
case discovery.UpdateEvent, discovery.CreateEvent:
err := t.createSubsetForAllEndpoint(as, rbdep)
if err != nil {
logrus.Warningf("ServiceID: %s; error adding subset: %s",
rbdep.Sid, err.Error())
return
}
for _, service := range as.GetServices(true) {
if err := f.EnsureService(service, t.clientset); err != nil {
logrus.Errorf("create or update service %s failure %s", service.Name, err.Error())
}
}
logrus.Debugf("upgrade endpoints and service for third app %s", as.ServiceAlias)
case discovery.DeleteEvent:
removeAddress(as, rbdep)
logrus.Debugf("third endpoint %s ip %s is deleted", rbdep.UUID, showEndpointIP)
case discovery.HealthEvent:
updateAddress(as, rbdep, true)
logrus.Debugf("third endpoint %s ip %s is onlined", rbdep.UUID, showEndpointIP)
case discovery.UnhealthyEvent:
logrus.Debugf("third endpoint %s ip %s is offlined", rbdep.UUID, showEndpointIP)
updateAddress(as, rbdep, false)
}
}
func (t *thirdparty) runDelete(sid string) {
as := t.store.GetAppService(sid) // TODO: need to delete?
if eps := as.GetEndpoints(true); eps != nil {
for _, ep := range eps {
logrus.Debugf("Endpoints delete: %+v", ep)
err := t.clientset.CoreV1().Endpoints(as.TenantID).Delete(context.Background(), ep.Name, metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting endpoint empty old app endpoints: %v", err)
}
}
}
}
func updateSubsetAddress(ready bool, subset corev1.EndpointSubset, address corev1.EndpointAddress) corev1.EndpointSubset {
if ready {
for i, a := range subset.NotReadyAddresses {
if a.IP == address.IP {
subset.NotReadyAddresses = append(subset.NotReadyAddresses[:i], subset.NotReadyAddresses[i+1:]...)
}
}
var exist bool
for _, a := range subset.Addresses {
if a.IP == address.IP {
exist = true
break
}
}
if !exist {
subset.Addresses = append(subset.Addresses, address)
}
} else {
for i, a := range subset.Addresses {
if a.IP == address.IP {
subset.Addresses = append(subset.Addresses[:i], subset.Addresses[i+1:]...)
}
}
var exist bool
for _, a := range subset.NotReadyAddresses {
if a.IP == address.IP {
exist = true
break
}
}
if !exist {
subset.NotReadyAddresses = append(subset.NotReadyAddresses, address)
}
}
return subset
}
func removeSubsetAddress(subset corev1.EndpointSubset, address corev1.EndpointAddress) corev1.EndpointSubset {
for i, a := range subset.Addresses {
if a.IP == address.IP {
subset.Addresses = append(subset.Addresses[:i], subset.Addresses[i+1:]...)
}
}
for i, a := range subset.NotReadyAddresses {
if a.IP == address.IP {
subset.NotReadyAddresses = append(subset.NotReadyAddresses[:i], subset.NotReadyAddresses[i+1:]...)
}
}
return subset
}
func isHealthy(subset corev1.EndpointSubset) bool {
return len(subset.Addresses) > 0
}
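For context on the ready/not-ready handling above: when a probe is configured for the service, third-party addresses are created under NotReadyAddresses and moved to Addresses once the prober reports them healthy (and back on failure). A small standalone illustration of that movement with corev1 types, using hypothetical values and loosely mirroring updateSubsetAddress rather than calling it:

	package main

	import (
		"fmt"

		corev1 "k8s.io/api/core/v1"
	)

	// moveToReady moves addr from NotReadyAddresses to Addresses.
	func moveToReady(subset corev1.EndpointSubset, addr corev1.EndpointAddress) corev1.EndpointSubset {
		for i, a := range subset.NotReadyAddresses {
			if a.IP == addr.IP {
				subset.NotReadyAddresses = append(subset.NotReadyAddresses[:i], subset.NotReadyAddresses[i+1:]...)
				break
			}
		}
		subset.Addresses = append(subset.Addresses, addr)
		return subset
	}

	func main() {
		subset := corev1.EndpointSubset{
			NotReadyAddresses: []corev1.EndpointAddress{{IP: "192.0.2.10"}}, // hypothetical endpoint
			Ports:             []corev1.EndpointPort{{Name: "uuid-1", Port: 3306, Protocol: corev1.ProtocolTCP}},
		}
		subset = moveToReady(subset, corev1.EndpointAddress{IP: "192.0.2.10"})
		fmt.Printf("ready: %d, not ready: %d\n", len(subset.Addresses), len(subset.NotReadyAddresses))
	}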

View File

@ -24,7 +24,6 @@ import (
"os"
"time"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/mq/api/grpc/pb"
"github.com/goodrain/rainbond/mq/client"
@ -59,11 +58,10 @@ type TaskManager struct {
func NewTaskManager(cfg option.Config,
store store.Storer,
controllermanager *controller.Manager,
garbageCollector *gc.GarbageCollector,
startCh *channels.RingChannel) *TaskManager {
garbageCollector *gc.GarbageCollector) *TaskManager {
ctx, cancel := context.WithCancel(context.Background())
handleManager := handle.NewManager(ctx, cfg, store, controllermanager, garbageCollector, startCh)
handleManager := handle.NewManager(ctx, cfg, store, controllermanager, garbageCollector)
healthStatus["status"] = "health"
healthStatus["info"] = "worker service health"
return &TaskManager{

View File

@ -25,11 +25,6 @@ import (
"strings"
"time"
"github.com/eapache/channels"
"github.com/sirupsen/logrus"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/db"
dbmodel "github.com/goodrain/rainbond/db/model"
@ -41,6 +36,9 @@ import (
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/goodrain/rainbond/worker/discover/model"
"github.com/goodrain/rainbond/worker/gc"
"github.com/sirupsen/logrus"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
//Manager manager
@ -51,8 +49,6 @@ type Manager struct {
dbmanager db.Manager
controllerManager *controller.Manager
garbageCollector *gc.GarbageCollector
startCh *channels.RingChannel
}
//NewManager now handle
@ -60,8 +56,7 @@ func NewManager(ctx context.Context,
config option.Config,
store store.Storer,
controllerManager *controller.Manager,
garbageCollector *gc.GarbageCollector,
startCh *channels.RingChannel) *Manager {
garbageCollector *gc.GarbageCollector) *Manager {
return &Manager{
ctx: ctx,
@ -70,7 +65,6 @@ func NewManager(ctx context.Context,
store: store,
controllerManager: controllerManager,
garbageCollector: garbageCollector,
startCh: startCh,
}
}
@ -426,32 +420,6 @@ func (m *Manager) applyRuleExec(task *model.Task) error {
return fmt.Errorf("component apply rule controller failure:%s", err.Error())
}
if svc.Kind == dbmodel.ServiceKindThirdParty.String() && strings.HasPrefix(body.Action, "port") {
if oldAppService == nil {
m.store.RegistAppService(newAppService)
}
if err = m.store.InitOneThirdPartService(svc); err != nil {
logrus.Errorf("application apply service resource failure: %s", err.Error())
return fmt.Errorf("application apply service resource failure: %s", err.Error())
}
if body.Action == "port-open" {
m.startCh.In() <- &v1.Event{
Type: v1.StartEvent,
Sid: body.ServiceID,
Port: body.Port,
IsInner: body.IsInner,
}
}
if body.Action == "port-close" {
if !db.GetManager().TenantServicesPortDao().HasOpenPort(body.ServiceID) {
m.startCh.In() <- &v1.Event{
Type: v1.StopEvent,
Sid: body.ServiceID,
}
}
}
}
return nil
}