diff --git a/builder/job/job.go b/builder/job/job.go index 87eba533b..62e3e9edf 100644 --- a/builder/job/job.go +++ b/builder/job/job.go @@ -48,7 +48,7 @@ type controller struct { ctx context.Context jobInformer v1.PodInformer namespace string - subJobStatus map[string]chan string + subJobStatus sync.Map lock sync.Mutex } @@ -57,9 +57,8 @@ var jobController *controller //InitJobController init job controller func InitJobController(stop chan struct{}, kubeClient kubernetes.Interface) error { jobController = &controller{ - KubeClient: kubeClient, - namespace: "rbd-system", - subJobStatus: make(map[string]chan string), + KubeClient: kubeClient, + namespace: "rbd-system", } eventHandler := cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -68,28 +67,34 @@ func InitJobController(stop chan struct{}, kubeClient kubernetes.Interface) erro }, DeleteFunc: func(obj interface{}) { job, _ := obj.(*corev1.Pod) - jobController.lock.Lock() - defer jobController.lock.Unlock() - if ch, exist := jobController.subJobStatus[job.Name]; exist { + if val, exist := jobController.subJobStatus.Load(job.Name); exist { + ch := val.(chan string) ch <- "cancel" } logrus.Infof("[Watch] Build job pod %s deleted", job.Name) }, UpdateFunc: func(old, cur interface{}) { - jobController.lock.Lock() - defer jobController.lock.Unlock() + oldJob := old.(*corev1.Pod) job, _ := cur.(*corev1.Pod) + + // ignore job if the phase is not changed. + if oldJob.Status.Phase == job.Status.Phase { + return + } + if len(job.Status.ContainerStatuses) > 0 { buildContainer := job.Status.ContainerStatuses[0] terminated := buildContainer.State.Terminated if terminated != nil && terminated.ExitCode == 0 { - if ch, exist := jobController.subJobStatus[job.Name]; exist { + if val, exist := jobController.subJobStatus.Load(job.Name); exist { logrus.Infof("job %s container exit 0 and complete", job.Name) + ch := val.(chan string) ch <- "complete" } } if terminated != nil && terminated.ExitCode > 0 { - if ch, exist := jobController.subJobStatus[job.Name]; exist { + if val, exist := jobController.subJobStatus.Load(job.Name); exist { + ch := val.(chan string) ch <- "failed" } } @@ -128,11 +133,9 @@ func (c *controller) GetServiceJobs(serviceID string) ([]*corev1.Pod, error) { } func (c *controller) ExecJob(job *corev1.Pod, logger io.Writer, result chan string) error { - c.lock.Lock() - defer c.lock.Unlock() if j, _ := c.GetJob(job.Name); j != nil { go c.getLogger(job.Name, logger, result) - c.subJobStatus[job.Name] = result + c.subJobStatus.Store(job.Name, result) return nil } _, err := c.KubeClient.CoreV1().Pods(c.namespace).Create(job) @@ -140,7 +143,7 @@ func (c *controller) ExecJob(job *corev1.Pod, logger io.Writer, result chan stri return err } go c.getLogger(job.Name, logger, result) - c.subJobStatus[job.Name] = result + c.subJobStatus.Store(job.Name, result) return nil } @@ -191,6 +194,6 @@ func (c *controller) DeleteJob(job string) { } c.lock.Lock() defer c.lock.Unlock() - delete(c.subJobStatus, job) + c.subJobStatus.Delete(job) logrus.Infof("delete job %s finish", job) }