use informer instead of watch

This commit is contained in:
GLYASAI 2020-03-11 16:56:04 +08:00
parent a61c46bc08
commit d03d29e024

View File

@ -2,15 +2,16 @@ package discover
import ( import (
"context" "context"
"errors"
"sync" "sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"github.com/goodrain/rainbond/cmd/node/option" "github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/discover/config" "github.com/goodrain/rainbond/discover/config"
@ -66,48 +67,53 @@ func (k *k8sDiscover) AddUpdateProject(name string, callback CallbackUpdate) {
} }
func (k *k8sDiscover) discover(name string, callback CallbackUpdate) { func (k *k8sDiscover) discover(name string, callback CallbackUpdate) {
ctx, cancel := context.WithCancel(k.ctx)
defer cancel()
endpoints := k.list(name) endpoints := k.list(name)
if len(endpoints) > 0 { if len(endpoints) > 0 {
callback.UpdateEndpoints(config.SYNC, endpoints...) callback.UpdateEndpoints(config.SYNC, endpoints...)
} }
var timeout int64 = 60 * 60 * 24 // a day sharedInformer := informers.NewSharedInformerFactoryWithOptions(
k.clientset,
10*time.Second,
informers.WithNamespace(k.cfg.RbdNamespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = "name=" + name
}),
)
REWATCH: eventHandler := cache.ResourceEventHandlerFuncs{
w, err := k.clientset.CoreV1().Pods(k.cfg.RbdNamespace).Watch(metav1.ListOptions{ AddFunc: func(obj interface{}) {
LabelSelector: "name=" + name, pod := obj.(*corev1.Pod)
TimeoutSeconds: &timeout,
})
if err != nil {
k.rewatchWithErr(name, callback, err)
return
}
for {
select {
case <-ctx.Done():
return
case event, ok := <-w.ResultChan():
if !ok {
// rewatch when watch timeout
goto REWATCH
}
pod := event.Object.(*corev1.Pod)
ep := endpointForPod(pod) ep := endpointForPod(pod)
switch event.Type { callback.UpdateEndpoints(config.SYNC, ep)
case watch.Deleted: },
callback.UpdateEndpoints(config.DELETE, ep) DeleteFunc: func(obj interface{}) {
case watch.Added, watch.Modified: pod := obj.(*corev1.Pod)
if !isPodReady(pod) { ep := endpointForPod(pod)
continue callback.UpdateEndpoints(config.DELETE, ep)
} },
callback.UpdateEndpoints(config.SYNC, ep) UpdateFunc: func(old, cur interface{}) {
case watch.Error: oldPod := old.(*corev1.Pod)
k.rewatchWithErr(name, callback, err) curPod := cur.(*corev1.Pod)
if oldPod.Status.Phase == curPod.Status.Phase {
return
} }
} if !isPodReady(curPod) {
return
}
ep := endpointForPod(curPod)
callback.UpdateEndpoints(config.SYNC, ep)
},
}
infomer := sharedInformer.Core().V1().Pods().Informer()
infomer.AddEventHandler(eventHandler)
// start
go infomer.Run(k.ctx.Done())
if !cache.WaitForCacheSync(k.ctx.Done(), infomer.HasSynced) {
k.rewatchWithErr(name, callback, errors.New("timeout wait for cache sync"))
} }
} }