// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.

// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package exector
import (
2019-06-29 15:20:11 +08:00
"context"
2017-11-07 11:40:44 +08:00
"fmt"
2019-06-29 15:20:11 +08:00
"runtime"
2018-03-30 15:37:36 +08:00
"runtime/debug"
2019-06-29 15:20:11 +08:00
"sync"
2017-11-07 11:40:44 +08:00
"time"
2018-01-10 13:35:05 +08:00
"github.com/Sirupsen/logrus"
2018-07-23 17:39:01 +08:00
2018-01-23 12:13:22 +08:00
"github.com/coreos/etcd/clientv3"
2018-12-07 14:24:14 +08:00
"github.com/docker/docker/client"
2018-12-04 15:09:00 +08:00
"github.com/goodrain/rainbond/cmd/builder/option"
2018-04-24 16:44:59 +08:00
"github.com/goodrain/rainbond/db"
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/mq/api/grpc/pb"
2018-12-04 15:09:00 +08:00
mqclient "github.com/goodrain/rainbond/mq/client"
2018-05-25 18:24:17 +08:00
"github.com/goodrain/rainbond/util"
2019-03-08 19:22:22 +08:00
workermodel "github.com/goodrain/rainbond/worker/discover/model"
2017-11-07 11:40:44 +08:00
"github.com/tidwall/gjson"
)
2019-03-05 15:00:39 +08:00
// Package-level task counters.
//
// NOTE(review): both counters are incremented with a plain ++ from several
// goroutines in this file (AddTask and the task handlers) without any
// synchronization.
var (
	// TaskNum is the number of tasks accepted into the queue by AddTask.
	TaskNum float64
	// ErrorNum is the number of tasks whose execution ended in failure.
	ErrorNum float64
)
2018-07-11 15:44:29 +08:00
2017-11-07 11:40:44 +08:00
//Manager 任务执行管理器
type Manager interface {
AddTask ( * pb . TaskMessage ) error
2019-06-29 15:20:11 +08:00
SetReturnTaskChan ( chan * pb . TaskMessage )
2017-11-07 11:40:44 +08:00
Start ( ) error
Stop ( ) error
}
//NewManager new manager
2018-12-04 15:09:00 +08:00
func NewManager ( conf option . Config , mqc mqclient . MQClient ) ( Manager , error ) {
2017-11-07 11:40:44 +08:00
dockerClient , err := client . NewEnvClient ( )
if err != nil {
return nil , err
}
2018-01-23 12:13:22 +08:00
etcdCli , err := clientv3 . New ( clientv3 . Config {
Endpoints : conf . EtcdEndPoints ,
DialTimeout : 5 * time . Second ,
} )
2018-06-07 14:27:03 +08:00
if err != nil {
return nil , err
}
2019-06-29 15:20:11 +08:00
maxConcurrentTask := runtime . NumCPU ( ) * 2
ctx , cancel := context . WithCancel ( context . Background ( ) )
logrus . Infof ( "The maximum number of concurrent build tasks supported by the current node is %d" , maxConcurrentTask )
2017-11-07 11:40:44 +08:00
return & exectorManager {
2019-06-29 15:20:11 +08:00
DockerClient : dockerClient ,
EtcdCli : etcdCli ,
mqClient : mqc ,
tasks : make ( chan * pb . TaskMessage , maxConcurrentTask ) ,
maxConcurrentTask : maxConcurrentTask ,
ctx : ctx ,
cancel : cancel ,
2017-11-07 11:40:44 +08:00
} , nil
}
type exectorManager struct {
2019-06-29 15:20:11 +08:00
DockerClient * client . Client
EtcdCli * clientv3 . Client
tasks chan * pb . TaskMessage
callback chan * pb . TaskMessage
maxConcurrentTask int
mqClient mqclient . MQClient
ctx context . Context
cancel context . CancelFunc
runningTask sync . Map
2017-11-07 11:40:44 +08:00
}
2018-05-07 15:27:44 +08:00
//TaskWorker worker interface
type TaskWorker interface {
2018-05-07 15:10:12 +08:00
Run ( timeout time . Duration ) error
GetLogger ( ) event . Logger
Name ( ) string
Stop ( ) error
2018-05-23 11:08:44 +08:00
//ErrorCallBack if run error will callback
ErrorCallBack ( err error )
2018-05-07 15:10:12 +08:00
}
2018-05-28 11:14:56 +08:00
// workerCreaterList maps a task type name to the constructor that builds the
// TaskWorker for it from the raw task body and the manager.
var workerCreaterList = make ( map [ string ] func ( [ ] byte , * exectorManager ) ( TaskWorker , error ) )
2018-05-07 15:10:12 +08:00
//RegisterWorker register worker creater
2018-05-28 11:14:56 +08:00
func RegisterWorker ( name string , fun func ( [ ] byte , * exectorManager ) ( TaskWorker , error ) ) {
2018-05-07 15:10:12 +08:00
workerCreaterList [ name ] = fun
}
2019-06-29 15:20:11 +08:00
//ErrCallback is returned by AddTask when the task queue is full and no return
//channel has been set, meaning this node does not handle the task.
var ErrCallback = fmt . Errorf ( "callback task to mq" )
// SetReturnTaskChan sets the channel on which tasks that cannot be queued
// (AddTask) or are still queued at shutdown (Stop) are handed back.
func (e *exectorManager) SetReturnTaskChan(re chan *pb.TaskMessage) {
	e.callback = re
}
2017-11-07 11:40:44 +08:00
//TaskType:
2018-03-24 16:16:47 +08:00
//build_from_image build app from docker image
//build_from_source_code build app from source code
//build_from_market_slug build app from app market by download slug
//service_check check service source info
//plugin_image_build build plugin from image
//plugin_dockerfile_build build plugin from dockerfile
//share-slug share app with slug
//share-image share app with image
2017-11-07 11:40:44 +08:00
func ( e * exectorManager ) AddTask ( task * pb . TaskMessage ) error {
2019-06-29 15:20:11 +08:00
select {
case e . tasks <- task :
TaskNum ++
return nil
2017-11-07 11:40:44 +08:00
default :
2019-06-29 15:20:11 +08:00
logrus . Infof ( "The current number of parallel builds exceeds the maximum" )
if e . callback != nil {
e . callback <- task
return nil
}
return ErrCallback
}
}
// runTask records task in runningTask under its task ID, invokes f with it
// and removes the record once f has returned.
func (e *exectorManager) runTask(f func(task *pb.TaskMessage), task *pb.TaskMessage) {
	e.runningTask.LoadOrStore(task.TaskId, task)
	defer func() {
		e.runningTask.Delete(task.TaskId)
	}()
	f(task)
}
// runTaskWithErr behaves like runTask for a handler that returns an error;
// a non-nil error is logged and otherwise ignored.
func (e *exectorManager) runTaskWithErr(f func(task *pb.TaskMessage) error, task *pb.TaskMessage) {
	logged := func(t *pb.TaskMessage) {
		if err := f(t); err != nil {
			logrus.Errorf("run builder task failure %s", err.Error())
		}
	}
	e.runTask(logged, task)
}
func ( e * exectorManager ) RunTask ( ) {
for {
select {
case <- e . ctx . Done ( ) :
return
case task := <- e . tasks :
switch task . TaskType {
case "build_from_image" :
go e . runTask ( e . buildFromImage , task )
case "build_from_source_code" :
go e . runTask ( e . buildFromSourceCode , task )
case "build_from_market_slug" :
//deprecated
e . buildFromMarketSlug ( task )
case "service_check" :
go e . runTask ( e . serviceCheck , task )
case "plugin_image_build" :
go e . runTask ( e . pluginImageBuild , task )
case "plugin_dockerfile_build" :
go e . runTask ( e . pluginDockerfileBuild , task )
case "share-slug" :
//deprecated
e . slugShare ( task )
case "share-image" :
go e . runTask ( e . imageShare , task )
default :
go e . runTaskWithErr ( e . exec , task )
}
}
2018-05-07 15:10:12 +08:00
}
}
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) exec ( task * pb . TaskMessage ) error {
2019-03-26 11:07:06 +08:00
creator , ok := workerCreaterList [ task . TaskType ]
2018-05-07 15:10:12 +08:00
if ! ok {
2018-10-18 18:39:21 +08:00
return fmt . Errorf ( "`%s` tasktype can't support" , task . TaskType )
2017-11-07 11:40:44 +08:00
}
2019-03-26 11:07:06 +08:00
worker , err := creator ( task . TaskBody , e )
2018-05-23 11:08:44 +08:00
if err != nil {
logrus . Errorf ( "create worker for builder error.%s" , err )
return err
}
2019-06-29 15:20:11 +08:00
defer event . GetManager ( ) . ReleaseLogger ( worker . GetLogger ( ) )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
worker . GetLogger ( ) . Error ( util . Translation ( "Please try again or contact customer service" ) , map [ string ] string { "step" : "callback" , "status" : "failure" } )
worker . ErrorCallBack ( fmt . Errorf ( "%s" , r ) )
2018-05-23 11:08:44 +08:00
}
2018-05-07 15:10:12 +08:00
} ( )
2019-06-29 15:20:11 +08:00
if err := worker . Run ( time . Minute * 10 ) ; err != nil {
ErrorNum ++
worker . ErrorCallBack ( err )
}
2017-11-07 11:40:44 +08:00
return nil
}
2018-04-11 21:38:48 +08:00
//buildFromImage build app from docker image
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) buildFromImage ( task * pb . TaskMessage ) {
i := NewImageBuildItem ( task . TaskBody )
2018-01-25 22:10:34 +08:00
i . DockerClient = e . DockerClient
2018-12-04 15:09:00 +08:00
i . Logger . Info ( "Start with the image build application task" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2019-06-29 15:20:11 +08:00
logrus . Debugf ( "start build from image worker" )
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
i . Logger . Error ( "Back end service drift. Please check the rbd-chaos log" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-01-25 22:10:34 +08:00
}
} ( )
2019-06-29 15:20:11 +08:00
start := time . Now ( )
defer func ( ) {
logrus . Debugf ( "complete build from source code, consuming time %s" , time . Now ( ) . Sub ( start ) . String ( ) )
} ( )
for n := 0 ; n < 2 ; n ++ {
2018-08-27 18:37:41 +08:00
err := i . Run ( time . Minute * 30 )
if err != nil {
2019-06-29 15:20:11 +08:00
logrus . Errorf ( "build from image error: %s" , err . Error ( ) )
if n < 1 {
i . Logger . Error ( "The application task to build from the mirror failed to execute, will try" , map [ string ] string { "step" : "build-exector" , "status" : "failure" } )
} else {
ErrorNum ++
i . Logger . Error ( "The application task to build from the image failed to execute" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
if err := i . UpdateVersionInfo ( "failure" ) ; err != nil {
logrus . Debugf ( "update version Info error: %s" , err . Error ( ) )
}
2018-05-23 16:43:16 +08:00
}
2018-12-04 15:09:00 +08:00
} else {
2019-03-08 19:22:22 +08:00
var configs = make ( map [ string ] string , len ( i . Configs ) )
for k , v := range i . Configs {
configs [ k ] = v . String ( )
}
err = e . sendAction ( i . TenantID , i . ServiceID , i . EventID , i . DeployVersion , i . Action , configs , i . Logger )
2018-12-04 15:09:00 +08:00
if err != nil {
i . Logger . Error ( "Send upgrade action failed" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
2019-06-29 15:20:11 +08:00
break
}
}
}
//buildFromSourceCode build app from source code
//support git repository
func ( e * exectorManager ) buildFromSourceCode ( task * pb . TaskMessage ) {
i := NewSouceCodeBuildItem ( task . TaskBody )
i . DockerClient = e . DockerClient
i . Logger . Info ( "Build app version from source code start" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
start := time . Now ( )
logrus . Debugf ( "start build from source code" )
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
i . Logger . Error ( "Back end service drift. Please check the rbd-chaos log" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-03-15 18:30:02 +08:00
}
2018-01-30 15:10:12 +08:00
} ( )
2019-06-29 15:20:11 +08:00
defer func ( ) {
logrus . Debugf ( "Complete build from source code, consuming time %s" , time . Now ( ) . Sub ( start ) . String ( ) )
} ( )
err := i . Run ( time . Minute * 30 )
if err != nil {
logrus . Errorf ( "build from source code error: %s" , err . Error ( ) )
i . Logger . Error ( "Build app version from source code failure" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
vi := & dbmodel . VersionInfo {
FinalStatus : "failure" ,
EventID : i . EventID ,
CodeVersion : i . commit . Hash ,
CommitMsg : i . commit . Message ,
Author : i . commit . Author ,
}
if err := i . UpdateVersionInfo ( vi ) ; err != nil {
logrus . Debugf ( "update version Info error: %s" , err . Error ( ) )
}
} else {
var configs = make ( map [ string ] string , len ( i . Configs ) )
for k , v := range i . Configs {
configs [ k ] = v . String ( )
}
err = e . sendAction ( i . TenantID , i . ServiceID , i . EventID , i . DeployVersion , i . Action , configs , i . Logger )
if err != nil {
i . Logger . Error ( "Send upgrade action failed" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
}
2018-01-25 22:10:34 +08:00
}
2018-04-11 21:38:48 +08:00
//buildFromMarketSlug build app from market slug
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) buildFromMarketSlug ( task * pb . TaskMessage ) {
eventID := gjson . GetBytes ( task . TaskBody , "event_id" ) . String ( )
2018-03-05 17:40:56 +08:00
logger := event . GetManager ( ) . GetLogger ( eventID )
2018-12-04 15:09:00 +08:00
logger . Info ( "Build app version from market slug start" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2018-10-18 18:39:21 +08:00
i , err := NewMarketSlugItem ( task . TaskBody )
2018-03-05 17:40:56 +08:00
if err != nil {
logrus . Error ( "create build from market slug task error." , err . Error ( ) )
return
}
go func ( ) {
2018-08-24 10:02:34 +08:00
start := time . Now ( )
2018-03-05 17:40:56 +08:00
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
2018-03-30 15:37:36 +08:00
defer func ( ) {
if r := recover ( ) ; r != nil {
2018-04-12 18:09:13 +08:00
fmt . Println ( r )
2018-03-30 15:37:36 +08:00
debug . PrintStack ( )
2018-08-27 17:32:56 +08:00
i . Logger . Error ( "Back end service drift. Please check the rbd-chaos log" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-03-30 15:37:36 +08:00
}
} ( )
2018-08-24 10:02:34 +08:00
defer func ( ) {
logrus . Debugf ( "complete build from market slug consuming time %s" , time . Now ( ) . Sub ( start ) . String ( ) )
} ( )
2018-03-05 17:40:56 +08:00
for n := 0 ; n < 2 ; n ++ {
err := i . Run ( )
if err != nil {
logrus . Errorf ( "image share error: %s" , err . Error ( ) )
2018-03-15 15:59:18 +08:00
if n < 1 {
2018-12-04 15:09:00 +08:00
i . Logger . Error ( "Build app version from market slug failure, will try" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-05 17:40:56 +08:00
} else {
2018-07-27 14:55:34 +08:00
ErrorNum ++
2018-12-04 15:09:00 +08:00
i . Logger . Error ( "Build app version from market slug failure" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-03-05 17:40:56 +08:00
}
} else {
2019-03-08 19:22:22 +08:00
err = e . sendAction ( i . TenantID , i . ServiceID , i . EventID , i . DeployVersion , i . Action , i . Configs , i . Logger )
2018-12-04 15:09:00 +08:00
if err != nil {
i . Logger . Error ( "Send upgrade action failed" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
2018-03-05 17:40:56 +08:00
break
}
}
} ( )
2018-03-04 22:48:50 +08:00
}
2018-12-04 15:09:00 +08:00
//rollingUpgradeTaskBody is the message body of an upgrade task: the tenant,
//service and event it belongs to, plus a list of strategy names.
//NOTE(review): sendAction in this file builds workermodel.RollingUpgradeTaskBody
//instead; this type looks unused here - confirm before relying on it.
type rollingUpgradeTaskBody struct {
TenantID string ` json:"tenant_id" `
ServiceID string ` json:"service_id" `
EventID string ` json:"event_id" `
Strategy [ ] string ` json:"strategy" `
}
2019-03-08 19:22:22 +08:00
func ( e * exectorManager ) sendAction ( tenantID , serviceID , eventID , newVersion , actionType string , configs map [ string ] string , logger event . Logger ) error {
2018-12-04 15:09:00 +08:00
switch actionType {
case "upgrade" :
if err := db . GetManager ( ) . TenantServiceDao ( ) . UpdateDeployVersion ( serviceID , newVersion ) ; err != nil {
return fmt . Errorf ( "Update app service deploy version failure.Please try the upgrade again" )
}
2019-03-08 19:22:22 +08:00
body := workermodel . RollingUpgradeTaskBody {
TenantID : tenantID ,
ServiceID : serviceID ,
NewDeployVersion : newVersion ,
EventID : eventID ,
Configs : configs ,
2018-12-04 15:09:00 +08:00
}
if err := e . mqClient . SendBuilderTopic ( mqclient . TaskStruct {
2018-12-04 18:08:51 +08:00
Topic : mqclient . WorkerTopic ,
2018-12-04 15:09:00 +08:00
TaskType : "rolling_upgrade" ,
TaskBody : body ,
} ) ; err != nil {
return err
}
logger . Info ( "Build success,start upgrade app service" , map [ string ] string { "step" : "builder" , "status" : "running" } )
2018-12-04 15:21:33 +08:00
return nil
2018-12-04 15:09:00 +08:00
default :
logger . Info ( "Build success,do not other action" , map [ string ] string { "step" : "last" , "status" : "success" } )
}
return nil
}
2018-04-11 21:38:48 +08:00
//slugShare share app of slug
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) slugShare ( task * pb . TaskMessage ) {
i , err := NewSlugShareItem ( task . TaskBody , e . EtcdCli )
2018-03-04 22:48:50 +08:00
if err != nil {
logrus . Error ( "create share image task error." , err . Error ( ) )
return
}
i . Logger . Info ( "开始分享应用" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2018-02-07 16:10:26 +08:00
status := "success"
2018-03-01 11:30:54 +08:00
go func ( ) {
2018-02-06 17:41:47 +08:00
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
2018-04-12 18:09:13 +08:00
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
i . Logger . Error ( "后端服务开小差,请重试或联系客服" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
} ( )
2018-03-15 15:59:18 +08:00
for n := 0 ; n < 2 ; n ++ {
2018-03-04 22:48:50 +08:00
err := i . ShareService ( )
2018-02-06 17:41:47 +08:00
if err != nil {
2018-03-04 22:48:50 +08:00
logrus . Errorf ( "image share error: %s" , err . Error ( ) )
2018-03-15 15:59:18 +08:00
if n < 1 {
2018-03-04 22:48:50 +08:00
i . Logger . Error ( "应用分享失败,开始重试" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-01 11:30:54 +08:00
} else {
2018-07-27 14:55:34 +08:00
ErrorNum ++
2018-03-04 22:48:50 +08:00
i . Logger . Error ( "分享应用任务执行失败" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-01 11:30:54 +08:00
status = "failure"
2018-02-06 17:41:47 +08:00
}
2018-03-01 11:30:54 +08:00
} else {
2018-03-04 22:48:50 +08:00
status = "success"
2018-02-06 17:41:47 +08:00
break
}
}
2018-03-04 22:48:50 +08:00
if err := i . UpdateShareStatus ( status ) ; err != nil {
logrus . Debugf ( "Add image share result error: %s" , err . Error ( ) )
}
2018-02-06 17:41:47 +08:00
} ( )
2018-02-05 19:23:59 +08:00
}
2018-04-11 21:38:48 +08:00
//imageShare share app of docker image
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) imageShare ( task * pb . TaskMessage ) {
i , err := NewImageShareItem ( task . TaskBody , e . DockerClient , e . EtcdCli )
2018-03-04 22:48:50 +08:00
if err != nil {
logrus . Error ( "create share image task error." , err . Error ( ) )
return
}
i . Logger . Info ( "开始分享应用" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2018-02-07 16:10:26 +08:00
status := "success"
2019-06-29 15:20:11 +08:00
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
i . Logger . Error ( "后端服务开小差,请重试或联系客服" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
} ( )
for n := 0 ; n < 2 ; n ++ {
err := i . ShareService ( )
if err != nil {
logrus . Errorf ( "image share error: %s" , err . Error ( ) )
if n < 1 {
i . Logger . Error ( "应用分享失败,开始重试" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-01 11:30:54 +08:00
} else {
2019-06-29 15:20:11 +08:00
ErrorNum ++
i . Logger . Error ( "分享应用任务执行失败" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
status = "failure"
2018-02-07 11:11:35 +08:00
}
2019-06-29 15:20:11 +08:00
} else {
status = "success"
break
2018-02-07 11:11:35 +08:00
}
2019-06-29 15:20:11 +08:00
}
if err := i . UpdateShareStatus ( status ) ; err != nil {
logrus . Debugf ( "Add image share result error: %s" , err . Error ( ) )
}
2018-02-05 19:23:59 +08:00
}
2017-11-07 11:40:44 +08:00
func ( e * exectorManager ) Start ( ) error {
2019-06-29 15:20:11 +08:00
go e . RunTask ( )
2017-11-07 11:40:44 +08:00
return nil
}
func ( e * exectorManager ) Stop ( ) error {
2019-06-29 15:20:11 +08:00
e . cancel ( )
for task := range e . tasks {
if e . callback != nil {
e . callback <- task
}
}
2018-07-06 16:55:10 +08:00
logrus . Info ( "Waiting for all threads to exit." )
2018-10-18 18:39:21 +08:00
i := 0
timer := time . NewTimer ( time . Second * 2 )
defer timer . Stop ( )
for {
if i >= 15 {
logrus . Errorf ( "There are %d tasks not completed" , len ( e . tasks ) )
return fmt . Errorf ( "There are %d tasks not completed " , len ( e . tasks ) )
}
2019-06-29 15:20:11 +08:00
runningTaskSum := 0
e . runningTask . Range ( func ( k , v interface { } ) bool {
runningTaskSum ++
return true
} )
if runningTaskSum == 0 {
2018-10-18 18:39:21 +08:00
break
}
select {
case <- timer . C :
2018-12-04 13:43:15 +08:00
i ++
2018-10-18 22:29:25 +08:00
timer . Reset ( time . Second * 2 )
2018-10-18 18:39:21 +08:00
}
}
2018-07-06 16:55:10 +08:00
logrus . Info ( "All threads is exited." )
2017-11-07 11:40:44 +08:00
return nil
}