use sync.map

This commit is contained in:
GLYASAI 2020-04-06 20:55:42 +08:00
parent f326a3e4be
commit 66f97bff59

View File

@ -48,7 +48,7 @@ type controller struct {
ctx context.Context ctx context.Context
jobInformer v1.PodInformer jobInformer v1.PodInformer
namespace string namespace string
subJobStatus map[string]chan string subJobStatus sync.Map
lock sync.Mutex lock sync.Mutex
} }
@ -59,7 +59,6 @@ func InitJobController(stop chan struct{}, kubeClient kubernetes.Interface) erro
jobController = &controller{ jobController = &controller{
KubeClient: kubeClient, KubeClient: kubeClient,
namespace: "rbd-system", namespace: "rbd-system",
subJobStatus: make(map[string]chan string),
} }
eventHandler := cache.ResourceEventHandlerFuncs{ eventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { AddFunc: func(obj interface{}) {
@ -68,28 +67,34 @@ func InitJobController(stop chan struct{}, kubeClient kubernetes.Interface) erro
}, },
DeleteFunc: func(obj interface{}) { DeleteFunc: func(obj interface{}) {
job, _ := obj.(*corev1.Pod) job, _ := obj.(*corev1.Pod)
jobController.lock.Lock() if val, exist := jobController.subJobStatus.Load(job.Name); exist {
defer jobController.lock.Unlock() ch := val.(chan string)
if ch, exist := jobController.subJobStatus[job.Name]; exist {
ch <- "cancel" ch <- "cancel"
} }
logrus.Infof("[Watch] Build job pod %s deleted", job.Name) logrus.Infof("[Watch] Build job pod %s deleted", job.Name)
}, },
UpdateFunc: func(old, cur interface{}) { UpdateFunc: func(old, cur interface{}) {
jobController.lock.Lock() oldJob := old.(*corev1.Pod)
defer jobController.lock.Unlock()
job, _ := cur.(*corev1.Pod) job, _ := cur.(*corev1.Pod)
// ignore job if the phase is not changed.
if oldJob.Status.Phase == job.Status.Phase {
return
}
if len(job.Status.ContainerStatuses) > 0 { if len(job.Status.ContainerStatuses) > 0 {
buildContainer := job.Status.ContainerStatuses[0] buildContainer := job.Status.ContainerStatuses[0]
terminated := buildContainer.State.Terminated terminated := buildContainer.State.Terminated
if terminated != nil && terminated.ExitCode == 0 { if terminated != nil && terminated.ExitCode == 0 {
if ch, exist := jobController.subJobStatus[job.Name]; exist { if val, exist := jobController.subJobStatus.Load(job.Name); exist {
logrus.Infof("job %s container exit 0 and complete", job.Name) logrus.Infof("job %s container exit 0 and complete", job.Name)
ch := val.(chan string)
ch <- "complete" ch <- "complete"
} }
} }
if terminated != nil && terminated.ExitCode > 0 { if terminated != nil && terminated.ExitCode > 0 {
if ch, exist := jobController.subJobStatus[job.Name]; exist { if val, exist := jobController.subJobStatus.Load(job.Name); exist {
ch := val.(chan string)
ch <- "failed" ch <- "failed"
} }
} }
@ -128,11 +133,9 @@ func (c *controller) GetServiceJobs(serviceID string) ([]*corev1.Pod, error) {
} }
func (c *controller) ExecJob(job *corev1.Pod, logger io.Writer, result chan string) error { func (c *controller) ExecJob(job *corev1.Pod, logger io.Writer, result chan string) error {
c.lock.Lock()
defer c.lock.Unlock()
if j, _ := c.GetJob(job.Name); j != nil { if j, _ := c.GetJob(job.Name); j != nil {
go c.getLogger(job.Name, logger, result) go c.getLogger(job.Name, logger, result)
c.subJobStatus[job.Name] = result c.subJobStatus.Store(job.Name, result)
return nil return nil
} }
_, err := c.KubeClient.CoreV1().Pods(c.namespace).Create(job) _, err := c.KubeClient.CoreV1().Pods(c.namespace).Create(job)
@ -140,7 +143,7 @@ func (c *controller) ExecJob(job *corev1.Pod, logger io.Writer, result chan stri
return err return err
} }
go c.getLogger(job.Name, logger, result) go c.getLogger(job.Name, logger, result)
c.subJobStatus[job.Name] = result c.subJobStatus.Store(job.Name, result)
return nil return nil
} }
@ -191,6 +194,6 @@ func (c *controller) DeleteJob(job string) {
} }
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
delete(c.subJobStatus, job) c.subJobStatus.Delete(job)
logrus.Infof("delete job %s finish", job) logrus.Infof("delete job %s finish", job)
} }