Rainbond/builder/job/job.go
2020-04-14 16:54:24 +08:00

198 lines
5.9 KiB
Go

// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package job
import (
	"bufio"
	"context"
	"errors"
	"io"
	"sync"
	"time"

	"github.com/Sirupsen/logrus"
	"github.com/eapache/channels"
	corev1 "k8s.io/api/core/v1"
	k8sErrors "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	v1 "k8s.io/client-go/informers/core/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)
// Controller is the build job controller. A build job is a pod; the
// descriptions below reflect the implementation provided in this file.
type Controller interface {
	// ExecJob creates the job pod unless a pod with the same name is already
	// known, starts streaming the pod log into logger, and registers result
	// as the channel that receives status strings for this job.
	ExecJob(job *corev1.Pod, logger io.Writer, result *channels.RingChannel) error
	// GetJob returns the job pod with the given name.
	GetJob(name string) (*corev1.Pod, error)
	// GetServiceJobs returns the job pods carrying the label service=<serviceID>.
	GetServiceJobs(serviceID string) ([]*corev1.Pod, error)
	// DeleteJob deletes the job pod with the given name and drops the result
	// channel registered for it.
	DeleteJob(job string)
}
// controller is the Controller implementation handed out by GetJobController.
type controller struct {
// KubeClient is the Kubernetes client used to create and delete job pods
// and to stream their logs.
KubeClient kubernetes.Interface
// ctx is neither assigned nor read in the visible part of this file.
ctx context.Context
// jobInformer is the pod informer whose lister backs GetJob and
// GetServiceJobs; it is created in InitJobController.
jobInformer v1.PodInformer
// namespace is the namespace all job pods live in; InitJobController sets
// it to "rbd-system".
namespace string
// subJobStatus maps a job pod name to the *channels.RingChannel registered
// by ExecJob; the informer event handlers send status strings to it and
// DeleteJob removes the entry.
subJobStatus sync.Map
}
// jobController is the package-level controller instance. It is assigned by
// InitJobController and returned by GetJobController; it is nil until
// InitJobController has been called.
var jobController *controller
// InitJobController creates the package-level job controller, registers the
// pod event handlers and starts the informer.
//
// The informer only watches pods labelled job=codebuild in the "rbd-system"
// namespace. The handlers translate pod events into status strings that are
// sent to the result channel registered for the pod name by ExecJob:
//
//   - pod deleted                                   -> "cancel"
//   - first container terminated with exit code 0   -> "complete"
//   - first container terminated with exit code > 0 -> "failed"
//
// stop is handed to the informer; the returned error is whatever Start
// returns.
func InitJobController(stop chan struct{}, kubeClient kubernetes.Interface) error {
	jobController = &controller{
		KubeClient: kubeClient,
		namespace:  "rbd-system",
	}

	// resultChannel returns the channel registered for the job, or nil when
	// nobody is waiting for this job.
	resultChannel := func(jobName string) *channels.RingChannel {
		val, exist := jobController.subJobStatus.Load(jobName)
		if !exist {
			return nil
		}
		ch, _ := val.(*channels.RingChannel)
		return ch
	}

	eventHandler := cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			job, ok := obj.(*corev1.Pod)
			if !ok {
				return
			}
			logrus.Infof("[Watch] Build job pod %s created", job.Name)
		},
		DeleteFunc: func(obj interface{}) {
			job, ok := obj.(*corev1.Pod)
			if !ok {
				// When the watch missed the delete event the informer hands
				// over a tombstone wrapping the last known object instead of
				// the pod itself.
				tombstone, isTombstone := obj.(cache.DeletedFinalStateUnknown)
				if !isTombstone {
					return
				}
				if job, ok = tombstone.Obj.(*corev1.Pod); !ok {
					return
				}
			}
			if ch := resultChannel(job.Name); ch != nil {
				ch.In() <- "cancel"
			}
			logrus.Infof("[Watch] Build job pod %s deleted", job.Name)
		},
		UpdateFunc: func(old, cur interface{}) {
			oldJob, ok := old.(*corev1.Pod)
			if !ok {
				return
			}
			job, ok := cur.(*corev1.Pod)
			if !ok {
				return
			}
			// ignore job if the phase is not changed.
			if oldJob.Status.Phase == job.Status.Phase {
				return
			}
			if len(job.Status.ContainerStatuses) == 0 {
				return
			}
			// Only the first container status is inspected.
			buildContainer := job.Status.ContainerStatuses[0]
			if terminated := buildContainer.State.Terminated; terminated != nil {
				switch {
				case terminated.ExitCode == 0:
					if ch := resultChannel(job.Name); ch != nil {
						logrus.Infof("job %s container exit 0 and complete", job.Name)
						ch.In() <- "complete"
					}
				case terminated.ExitCode > 0:
					if ch := resultChannel(job.Name); ch != nil {
						ch.In() <- "failed"
					}
				}
			}
			logrus.Infof("job %s container %s state %+v", job.Name, buildContainer.Name, buildContainer.State)
		},
	}

	infFactory := informers.NewFilteredSharedInformerFactory(kubeClient, time.Second*3, jobController.namespace,
		func(options *metav1.ListOptions) {
			options.LabelSelector = "job=codebuild"
		})
	jobController.jobInformer = infFactory.Core().V1().Pods()
	jobController.jobInformer.Informer().AddEventHandlerWithResyncPeriod(eventHandler, time.Second*10)
	return jobController.Start(stop)
}
// GetJobController returns the package-level job controller, or nil when
// InitJobController has not been called yet.
func GetJobController() Controller {
	// Returning the nil *controller directly would wrap it in a non-nil
	// interface value, so a caller's `== nil` check would never fire.
	if jobController == nil {
		return nil
	}
	return jobController
}
// GetJob looks up the job pod called name in the controller namespace using
// the informer's lister.
func (c *controller) GetJob(name string) (*corev1.Pod, error) {
	podLister := c.jobInformer.Lister().Pods(c.namespace)
	return podLister.Get(name)
}
// GetServiceJobs lists the job pods in the controller namespace that carry
// the label service=<serviceID>. It returns an error when the selector cannot
// be parsed or the lister fails.
func (c *controller) GetServiceJobs(serviceID string) ([]*corev1.Pod, error) {
	selector, parseErr := labels.Parse("service=" + serviceID)
	if parseErr != nil {
		return nil, parseErr
	}
	podLister := c.jobInformer.Lister().Pods(c.namespace)
	pods, listErr := podLister.List(selector)
	if listErr != nil {
		return nil, listErr
	}
	return pods, nil
}
// ExecJob runs the build job described by job.
//
// The pod is created only when GetJob does not already return a pod with the
// same name; a creation error is returned as is. In both cases a goroutine is
// started that streams the pod log into logger, and result is registered
// under the pod name so the informer event handlers can report status to it.
func (c *controller) ExecJob(job *corev1.Pod, logger io.Writer, result *channels.RingChannel) error {
	// The lookup error is deliberately dropped: a failed lookup leaves
	// existing nil and falls through to creating the pod.
	existing, _ := c.GetJob(job.Name)
	if existing == nil {
		if _, err := c.KubeClient.CoreV1().Pods(c.namespace).Create(job); err != nil {
			return err
		}
	}
	go c.getLogger(job.Name, logger, result)
	c.subJobStatus.Store(job.Name, result)
	return nil
}
// Start runs the job pod informer in the background and blocks until its
// cache has synced.
//
// It returns an error when stop is closed before the first sync completes,
// instead of polling forever on an informer that will never sync.
func (c *controller) Start(stop chan struct{}) error {
	informer := c.jobInformer.Informer()
	go informer.Run(stop)
	if !cache.WaitForCacheSync(stop, informer.HasSynced) {
		return errors.New("build job informer stopped before its cache synced")
	}
	return nil
}
// getLogger follows the log of the job pod and copies it into writer. When it
// returns, for whatever reason, it sends "logcomplete" to result.
//
// Opening the log stream is retried every three seconds while it fails. The
// retry stops when the API server reports the pod as not found, because the
// log of a pod that no longer exists can never become available.
func (c *controller) getLogger(job string, writer io.Writer, result *channels.RingChannel) {
	defer func() {
		result.In() <- "logcomplete"
	}()
	for {
		podLogRequest := c.KubeClient.CoreV1().Pods(c.namespace).GetLogs(job, &corev1.PodLogOptions{Follow: true})
		reader, err := podLogRequest.Stream()
		if err != nil {
			if k8sErrors.IsNotFound(err) {
				logrus.Warnf("build job pod %s not found, stop reading its log", job)
				return
			}
			logrus.Warnf("get build job pod log data error: %s, retry net loop", err.Error())
			time.Sleep(time.Second * 3)
			continue
		}
		copyJobLog(reader, writer)
		return
	}
}

// copyJobLog copies reader into writer line by line until the stream ends or
// a read fails, then closes reader.
func copyJobLog(reader io.ReadCloser, writer io.Writer) {
	defer reader.Close()
	bufReader := bufio.NewReader(reader)
	for {
		line, err := bufReader.ReadBytes('\n')
		// ReadBytes returns the bytes read so far together with the error,
		// so a final line without a trailing newline must be written before
		// the error is inspected.
		if len(line) > 0 {
			if _, werr := writer.Write(line); werr != nil {
				// Best effort: keep streaming even if one write fails.
				logrus.Debugf("write job log error: %s", werr.Error())
			}
		}
		if err == io.EOF {
			return
		}
		if err != nil {
			logrus.Warningf("get job log error: %s", err.Error())
			return
		}
	}
}
// DeleteJob deletes the job pod and removes the result channel registered
// for it. A failed delete is only logged, and a "not found" answer is not
// treated as a failure; the channel registration is removed either way.
func (c *controller) DeleteJob(job string) {
	logrus.Debugf("start delete job: %s", job)
	err := c.KubeClient.CoreV1().Pods(c.namespace).Delete(job, &metav1.DeleteOptions{})
	if err != nil && !k8sErrors.IsNotFound(err) {
		logrus.Errorf("delete job failed: %s", err.Error())
	}
	c.subJobStatus.Delete(job)
	logrus.Infof("delete job %s finish", job)
}