dedup helm app informer

This commit is contained in:
GLYASAI 2021-06-23 17:23:46 +08:00
parent c32b28329a
commit f271583fbb
7 changed files with 61 additions and 85 deletions

View File

@ -131,7 +131,7 @@ func (a *ApplicationAction) createHelmApp(ctx context.Context, app *dbmodel.Appl
defer cancel()
_, err := a.kubeClient.CoreV1().Namespaces().Create(ctx1, &corev1.Namespace{
ObjectMeta: metav1.ObjectMeta{
Name: app.AppName,
Name: app.TenantID,
Labels: labels,
},
}, metav1.CreateOptions{})

View File

@ -59,21 +59,23 @@ func CreateGatewayManager(dbmanager db.Manager, mqclient client.MQClient, etcdCl
// AddHTTPRule adds http rule to db if it doesn't exists.
func (g *GatewayAction) AddHTTPRule(req *apimodel.AddHTTPRuleStruct) error {
if err := db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
return g.CreateHTTPRule(tx, req)
}); err != nil {
return err
}
return db.GetManager().DB().Transaction(func(tx *gorm.DB) error {
if err := g.CreateHTTPRule(tx, req); err != nil {
return err
}
// Effective immediately
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-http-rule",
"limit": map[string]string{"domain": req.Domain},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
// Effective immediately
err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-http-rule",
"limit": map[string]string{"domain": req.Domain},
})
if err != nil {
return fmt.Errorf("send http rule task: %v", err)
}
return nil
})
}
// CreateHTTPRule Create http rules through transactions
@ -123,19 +125,6 @@ func (g *GatewayAction) CreateHTTPRule(tx *gorm.DB, req *apimodel.AddHTTPRuleStr
}
}
// end transaction
if err := tx.Commit().Error; err != nil {
tx.Rollback()
return fmt.Errorf("commit transaction: %v", err)
}
// Effective immediately
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-http-rule",
"limit": map[string]string{"domain": req.Domain},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
@ -234,14 +223,6 @@ func (g *GatewayAction) UpdateHTTPRule(req *apimodel.UpdateHTTPRuleStruct) error
return nil
}
func (g *GatewayAction) isCertificateBeingUsed(certID string) (bool, error) {
rules, err := g.dbmanager.HTTPRuleDao().GetHTTPRulesByCertificateID(certID)
if err != nil {
return false, fmt.Errorf("list rules by certificate id: %v", err)
}
return len(rules) > 0, nil
}
// DeleteHTTPRule deletes http rule, including certificate and rule extensions
func (g *GatewayAction) DeleteHTTPRule(req *apimodel.DeleteHTTPRuleStruct) error {
// begin transaction
@ -328,7 +309,7 @@ func (g *GatewayAction) UpdateCertificate(req apimodel.AddHTTPRuleStruct, httpRu
return err
}
if cert == nil {
return fmt.Errorf("Certificate doesn't exist based on certificateID(%s)", req.CertificateID)
return fmt.Errorf("certificate doesn't exist based on certificateID(%s)", req.CertificateID)
}
cert.CertificateName = fmt.Sprintf("cert-%s", util.NewUUID()[0:8])
@ -339,19 +320,22 @@ func (g *GatewayAction) UpdateCertificate(req apimodel.AddHTTPRuleStruct, httpRu
// AddTCPRule adds tcp rule.
func (g *GatewayAction) AddTCPRule(req *apimodel.AddTCPRuleStruct) error {
if err := g.dbmanager.DB().Transaction(func(tx *gorm.DB) error {
return g.CreateTCPRule(tx, req)
}); err != nil {
return err
}
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", req.IP, req.Port)},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
return g.dbmanager.DB().Transaction(func(tx *gorm.DB) error {
if err := g.CreateTCPRule(tx, req); err != nil {
return err
}
err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": req.ServiceID,
"action": "add-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", req.IP, req.Port)},
})
if err != nil {
return fmt.Errorf("send tcp rule task: %v", err)
}
return nil
})
}
// CreateTCPRule Create tcp rules through transactions
@ -379,17 +363,6 @@ func (g *GatewayAction) CreateTCPRule(tx *gorm.DB, req *apimodel.AddTCPRuleStruc
}
}
// end transaction
if err := tx.Commit().Error; err != nil {
return err
}
if err := g.SendTaskDeprecated(map[string]interface{}{
"service_id": tcpRule.ServiceID,
"action": "add-tcp-rule",
"limit": map[string]string{"tcp-address": fmt.Sprintf("%s:%d", tcpRule.IP, tcpRule.Port)},
}); err != nil {
logrus.Errorf("send runtime message about gateway failure %s", err.Error())
}
return nil
}
@ -630,7 +603,7 @@ func (g *GatewayAction) SendTaskDeprecated(in map[string]interface{}) error {
sid := in["service_id"].(string)
service, err := db.GetManager().TenantServiceDao().GetServiceByID(sid)
if err != nil {
return fmt.Errorf("Unexpected error occurred while getting Service by ServiceID(%s): %v", sid, err)
return fmt.Errorf("unexpected error occurred while getting Service by ServiceID(%s): %v", sid, err)
}
body := make(map[string]interface{})
body["deploy_version"] = service.DeployVersion
@ -643,7 +616,7 @@ func (g *GatewayAction) SendTaskDeprecated(in map[string]interface{}) error {
TaskBody: body,
})
if err != nil {
return fmt.Errorf("Unexpected error occurred while sending task: %v", err)
return fmt.Errorf("unexpected error occurred while sending task: %v", err)
}
return nil
}

View File

@ -100,6 +100,9 @@ type Storer interface {
ListPods(namespace string, selector labels.Selector) ([]*corev1.Pod, error)
ListReplicaSets(namespace string, selector labels.Selector) ([]*appsv1.ReplicaSet, error)
ListServices(namespace string, selector labels.Selector) ([]*corev1.Service, error)
Informer() *Informer
Lister() *Lister
}
// EventType type of event associated with an informer
@ -342,6 +345,14 @@ func NewStore(
return store
}
func (a *appRuntimeStore) Informer() *Informer {
return a.informers
}
func (a *appRuntimeStore) Lister() *Lister {
return a.listers
}
func listProbeInfos(ep *corev1.Endpoints, sid string) []*ProbeInfo {
var probeInfos []*ProbeInfo
addProbe := func(pi *ProbeInfo) {

View File

@ -104,10 +104,7 @@ func (t *TaskManager) Do() {
case <-t.ctx.Done():
return
default:
ctx, cancel := context.WithCancel(t.ctx)
defer cancel()
data, err := t.client.Dequeue(ctx, &pb.DequeueRequest{Topic: client.WorkerTopic, ClientHost: hostname + "-worker"})
data, err := t.client.Dequeue(t.ctx, &pb.DequeueRequest{Topic: client.WorkerTopic, ClientHost: hostname + "-worker"})
if err != nil {
if grpc1.ErrorDesc(err) == context.DeadlineExceeded.Error() {
continue

View File

@ -20,11 +20,11 @@ package helmapp
import (
"context"
"time"
"github.com/goodrain/rainbond/pkg/generated/clientset/versioned"
"github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1"
"github.com/sirupsen/logrus"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
@ -37,11 +37,16 @@ type Controller struct {
}
// NewController creates a new helm app controller.
func NewController(ctx context.Context, stopCh chan struct{}, kubeClient clientset.Interface, clientset versioned.Interface, resyncPeriod time.Duration,
func NewController(ctx context.Context,
stopCh chan struct{},
kubeClient clientset.Interface,
clientset versioned.Interface,
informer cache.SharedIndexInformer,
lister v1alpha1.HelmAppLister,
repoFile, repoCache, chartCache string) *Controller {
workQueue := workqueue.New()
finalizerQueue := workqueue.New()
storer := NewStorer(clientset, resyncPeriod, workQueue, finalizerQueue)
storer := NewStorer(informer, lister, workQueue, finalizerQueue)
controlLoop := NewControlLoop(ctx, kubeClient, clientset, storer, workQueue, repoFile, repoCache, chartCache)
finalizer := NewFinalizer(ctx, kubeClient, clientset, finalizerQueue, repoFile, repoCache, chartCache)

View File

@ -23,11 +23,8 @@ import (
"time"
rainbondv1alpha1 "github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1"
"github.com/goodrain/rainbond/pkg/generated/clientset/versioned"
"github.com/goodrain/rainbond/pkg/generated/informers/externalversions"
"github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1"
k8sutil "github.com/goodrain/rainbond/util/k8s"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
@ -45,17 +42,10 @@ type store struct {
}
// NewStorer creates a new storer.
func NewStorer(clientset versioned.Interface,
resyncPeriod time.Duration,
func NewStorer(informer cache.SharedIndexInformer,
lister v1alpha1.HelmAppLister,
workqueue workqueue.Interface,
finalizerQueue workqueue.Interface) Storer {
// create informers factory, enable and assign required informers
sharedInformer := externalversions.NewSharedInformerFactoryWithOptions(clientset, resyncPeriod,
externalversions.WithNamespace(corev1.NamespaceAll))
lister := sharedInformer.Rainbond().V1alpha1().HelmApps().Lister()
informer := sharedInformer.Rainbond().V1alpha1().HelmApps().Informer()
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
helmApp := obj.(*rainbondv1alpha1.HelmApp)
@ -72,7 +62,6 @@ func NewStorer(clientset versioned.Interface,
finalizerQueue.Add(obj)
},
})
return &store{
informer: informer,
lister: lister,

View File

@ -99,7 +99,8 @@ func NewMasterController(conf option.Config, store store.Storer, kubeClient kube
}, serverVersion.GitVersion)
stopCh := make(chan struct{})
helmAppController := helmapp.NewController(ctx, stopCh, kubeClient, rainbondClient, 5*time.Second, conf.Helm.RepoFile, conf.Helm.RepoCache, conf.Helm.RepoCache)
helmAppController := helmapp.NewController(ctx, stopCh, kubeClient, rainbondClient,
store.Informer().HelmApp, store.Lister().HelmApp, conf.Helm.RepoFile, conf.Helm.RepoCache, conf.Helm.RepoCache)
return &Controller{
conf: conf,