fix channel block

This commit is contained in:
fanyangyang 2020-05-07 18:43:01 +08:00
parent 3ee13b0abf
commit 8631c15465

View File

@ -96,34 +96,23 @@ func InitJobController(stop chan struct{}, kubeClient kubernetes.Interface) erro
}
if terminated != nil && terminated.ExitCode > 0 {
if val, exist := jobController.subJobStatus.Load(job.Name); exist {
logrus.Infof("job[%s] container exit %d and failed", job.Name, terminated.ExitCode)
ch := val.(*channels.RingChannel)
ch.In() <- "failed"
}
}
if terminated != nil {
logrus.Debugf("job[%s] container is ready", job.Name)
if val, exist := jobController.jobContainerStatus.Load(job.Name); exist {
jobContainerCh := val.(*channels.RingChannel)
jobContainerCh.In() <- struct{}{}
}
}
if buildContainer.State.Waiting != nil && buildContainer.State.Waiting.Reason == "CrashLoopBackOff" {
logrus.Infof("job %s container status is waiting and reason is CrashLoopBackOff", job.Name)
if val, exist := jobController.jobContainerStatus.Load(job.Name); exist {
jobContainerCh := val.(*channels.RingChannel)
jobContainerCh.In() <- struct{}{}
}
if val, exist := jobController.subJobStatus.Load(job.Name); exist {
ch := val.(*channels.RingChannel)
ch.In() <- "failed"
}
}
if buildContainer.State.Running != nil {
if buildContainer.State.Running != nil || terminated != nil {
// job container is ready
logrus.Debugf("job[%s] container is ready", job.Name)
if val, exist := jobController.jobContainerStatus.Load(job.Name); exist {
jobContainerCh := val.(*channels.RingChannel)
jobContainerCh.In() <- struct{}{}
jobContainerCh := val.(chan struct{})
// no block channel write
select {
case jobContainerCh <- struct{}{}:
logrus.Debugf("job[%s] push log message successfully", job.Name)
default:
// if channel is block, ignore it
}
logrus.Infof("job[%s] container running, read log", job.Name)
}
}
}
@ -131,7 +120,7 @@ func InitJobController(stop chan struct{}, kubeClient kubernetes.Interface) erro
}
infFactory := informers.NewSharedInformerFactoryWithOptions(
kubeClient,
time.Second*3,
time.Second*10,
informers.WithNamespace(jobController.namespace),
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
options.LabelSelector = "job=codebuild"
@ -164,7 +153,7 @@ func (c *controller) GetServiceJobs(serviceID string) ([]*corev1.Pod, error) {
func (c *controller) ExecJob(ctx context.Context, job *corev1.Pod, logger io.Writer, result *channels.RingChannel) error {
// one job, one job container channel
jobContainerCh := channels.NewRingChannel(1)
jobContainerCh := make(chan struct{}, 1)
c.jobContainerStatus.Store(job.Name, jobContainerCh)
if j, _ := c.GetJob(job.Name); j != nil {
go c.getLogger(ctx, job.Name, logger, result, jobContainerCh)
@ -188,45 +177,40 @@ func (c *controller) Start(stop chan struct{}) error {
return nil
}
func (c *controller) getLogger(ctx context.Context, job string, writer io.Writer, result, jobContainerCh *channels.RingChannel) {
func (c *controller) getLogger(ctx context.Context, job string, writer io.Writer, result *channels.RingChannel, jobContainerCh chan struct{}) {
defer func() {
logrus.Infof("job[%s] get log complete", job)
result.In() <- "logcomplete"
}()
once := sync.Once{}
for {
select {
case <-ctx.Done():
logrus.Debugf("job[%s] task is done, exit get log func", job)
return
case <-jobContainerCh.Out():
// get log only do once
logrus.Debugf("job[%s] get container ready message", job)
once.Do(func() {
logrus.Debugf("job[%s] container is ready, start get log stream", job)
podLogRequest := c.KubeClient.CoreV1().Pods(c.namespace).GetLogs(job, &corev1.PodLogOptions{Follow: true})
reader, err := podLogRequest.Stream()
if err != nil {
logrus.Warnf("get build job pod log data error: %s", err.Error())
case <-jobContainerCh:
// reader log just only do once, if complete, exit this func
logrus.Debugf("job[%s] container is ready, start get log stream", job)
podLogRequest := c.KubeClient.CoreV1().Pods(c.namespace).GetLogs(job, &corev1.PodLogOptions{Follow: true})
reader, err := podLogRequest.Stream()
if err != nil {
logrus.Warnf("get build job pod log data error: %s", err.Error())
return
}
logrus.Debugf("get job[%s] log stream successfully, ready for reading log", job)
defer reader.Close()
bufReader := bufio.NewReader(reader)
for {
line, err := bufReader.ReadBytes('\n')
if err == io.EOF {
logrus.Debugf("job[%s] get log eof", job)
return
}
logrus.Debugf("get job[%s] log stream successfully, ready for reading log", job)
defer reader.Close()
bufReader := bufio.NewReader(reader)
for {
line, err := bufReader.ReadBytes('\n')
if err == io.EOF {
logrus.Debugf("job[%s] get log eof", job)
return
}
if err != nil {
logrus.Warningf("get job log error: %s", err.Error())
return
}
writer.Write(line)
if err != nil {
logrus.Warningf("get job log error: %s", err.Error())
return
}
})
return
writer.Write(line)
}
}
}
}