2018-03-14 14:12:26 +08:00
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
2017-11-07 11:40:44 +08:00
// RAINBOND, Application Management Platform
2018-03-14 14:33:31 +08:00
2017-11-07 11:40:44 +08:00
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
2018-03-14 14:33:31 +08:00
2017-11-07 11:40:44 +08:00
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
2018-03-14 14:33:31 +08:00
2017-11-07 11:40:44 +08:00
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package exector
import (
2019-06-29 15:20:11 +08:00
"context"
2017-11-07 11:40:44 +08:00
"fmt"
2019-06-29 15:20:11 +08:00
"runtime"
2018-03-30 15:37:36 +08:00
"runtime/debug"
2019-06-29 15:20:11 +08:00
"sync"
2017-11-07 11:40:44 +08:00
"time"
2018-01-10 13:35:05 +08:00
"github.com/Sirupsen/logrus"
2018-01-23 12:13:22 +08:00
"github.com/coreos/etcd/clientv3"
2018-12-07 14:24:14 +08:00
"github.com/docker/docker/client"
2018-12-04 15:09:00 +08:00
"github.com/goodrain/rainbond/cmd/builder/option"
2018-04-24 16:44:59 +08:00
"github.com/goodrain/rainbond/db"
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/mq/api/grpc/pb"
2018-12-04 15:09:00 +08:00
mqclient "github.com/goodrain/rainbond/mq/client"
2018-05-25 18:24:17 +08:00
"github.com/goodrain/rainbond/util"
2019-03-08 19:22:22 +08:00
workermodel "github.com/goodrain/rainbond/worker/discover/model"
2017-11-07 11:40:44 +08:00
"github.com/tidwall/gjson"
)
2019-07-12 14:42:24 +08:00
//MetricTaskNum task number
var MetricTaskNum float64
2019-03-05 15:00:39 +08:00
2019-07-12 14:42:24 +08:00
//MetricErrorTaskNum error run task number
var MetricErrorTaskNum float64
//MetricBackTaskNum back task number
var MetricBackTaskNum float64
2018-07-11 15:44:29 +08:00
2017-11-07 11:40:44 +08:00
//Manager 任务执行管理器
type Manager interface {
2019-07-12 14:42:24 +08:00
GetMaxConcurrentTask ( ) float64
GetCurrentConcurrentTask ( ) float64
2017-11-07 11:40:44 +08:00
AddTask ( * pb . TaskMessage ) error
2019-06-29 18:45:09 +08:00
SetReturnTaskChan ( func ( * pb . TaskMessage ) )
2017-11-07 11:40:44 +08:00
Start ( ) error
Stop ( ) error
}
//NewManager new manager
2018-12-04 15:09:00 +08:00
func NewManager ( conf option . Config , mqc mqclient . MQClient ) ( Manager , error ) {
2017-11-07 11:40:44 +08:00
dockerClient , err := client . NewEnvClient ( )
if err != nil {
return nil , err
}
2018-01-23 12:13:22 +08:00
etcdCli , err := clientv3 . New ( clientv3 . Config {
Endpoints : conf . EtcdEndPoints ,
DialTimeout : 5 * time . Second ,
} )
2018-06-07 14:27:03 +08:00
if err != nil {
return nil , err
}
2019-06-29 15:20:11 +08:00
maxConcurrentTask := runtime . NumCPU ( ) * 2
ctx , cancel := context . WithCancel ( context . Background ( ) )
logrus . Infof ( "The maximum number of concurrent build tasks supported by the current node is %d" , maxConcurrentTask )
2017-11-07 11:40:44 +08:00
return & exectorManager {
2019-06-29 15:20:11 +08:00
DockerClient : dockerClient ,
EtcdCli : etcdCli ,
mqClient : mqc ,
tasks : make ( chan * pb . TaskMessage , maxConcurrentTask ) ,
maxConcurrentTask : maxConcurrentTask ,
ctx : ctx ,
cancel : cancel ,
2017-11-07 11:40:44 +08:00
} , nil
}
type exectorManager struct {
2019-06-29 15:20:11 +08:00
DockerClient * client . Client
EtcdCli * clientv3 . Client
tasks chan * pb . TaskMessage
2019-06-29 18:45:09 +08:00
callback func ( * pb . TaskMessage )
2019-06-29 15:20:11 +08:00
maxConcurrentTask int
mqClient mqclient . MQClient
ctx context . Context
cancel context . CancelFunc
runningTask sync . Map
2017-11-07 11:40:44 +08:00
}
2018-05-07 15:27:44 +08:00
//TaskWorker worker interface
type TaskWorker interface {
2018-05-07 15:10:12 +08:00
Run ( timeout time . Duration ) error
GetLogger ( ) event . Logger
Name ( ) string
Stop ( ) error
2018-05-23 11:08:44 +08:00
//ErrorCallBack if run error will callback
ErrorCallBack ( err error )
2018-05-07 15:10:12 +08:00
}
2018-05-28 11:14:56 +08:00
var workerCreaterList = make ( map [ string ] func ( [ ] byte , * exectorManager ) ( TaskWorker , error ) )
2018-05-07 15:10:12 +08:00
//RegisterWorker register worker creater
2018-05-28 11:14:56 +08:00
func RegisterWorker ( name string , fun func ( [ ] byte , * exectorManager ) ( TaskWorker , error ) ) {
2018-05-07 15:10:12 +08:00
workerCreaterList [ name ] = fun
}
2019-06-29 15:20:11 +08:00
//ErrCallback do not handle this task
var ErrCallback = fmt . Errorf ( "callback task to mq" )
2019-06-29 18:45:09 +08:00
func ( e * exectorManager ) SetReturnTaskChan ( re func ( * pb . TaskMessage ) ) {
2019-06-29 15:20:11 +08:00
e . callback = re
}
2017-11-07 11:40:44 +08:00
//TaskType:
2018-03-24 16:16:47 +08:00
//build_from_image build app from docker image
//build_from_source_code build app from source code
//build_from_market_slug build app from app market by download slug
//service_check check service source info
//plugin_image_build build plugin from image
//plugin_dockerfile_build build plugin from dockerfile
//share-slug share app with slug
//share-image share app with image
2017-11-07 11:40:44 +08:00
func ( e * exectorManager ) AddTask ( task * pb . TaskMessage ) error {
2019-06-29 15:20:11 +08:00
select {
case e . tasks <- task :
2019-07-12 14:42:24 +08:00
MetricTaskNum ++
2019-06-29 18:45:09 +08:00
e . RunTask ( task )
2019-06-29 15:20:11 +08:00
return nil
2017-11-07 11:40:44 +08:00
default :
2019-06-29 15:20:11 +08:00
logrus . Infof ( "The current number of parallel builds exceeds the maximum" )
if e . callback != nil {
2019-06-29 18:45:09 +08:00
e . callback ( task )
//Wait a while
//It's best to wait until the current controller can continue adding tasks
for len ( e . tasks ) >= e . maxConcurrentTask {
time . Sleep ( time . Second * 2 )
}
2019-07-12 14:42:24 +08:00
MetricBackTaskNum ++
2019-06-29 15:20:11 +08:00
return nil
}
return ErrCallback
}
}
2019-07-12 14:42:24 +08:00
func ( e * exectorManager ) runTask ( f func ( task * pb . TaskMessage ) , task * pb . TaskMessage , concurrencyControl bool ) {
2019-06-29 18:45:09 +08:00
logrus . Infof ( "Build task %s in progress" , task . TaskId )
2019-06-29 15:20:11 +08:00
e . runningTask . LoadOrStore ( task . TaskId , task )
2019-07-12 14:42:24 +08:00
if ! concurrencyControl {
<- e . tasks
} else {
defer func ( ) { <- e . tasks } ( )
}
2019-06-29 15:20:11 +08:00
f ( task )
e . runningTask . Delete ( task . TaskId )
2019-06-29 18:45:09 +08:00
logrus . Infof ( "Build task %s is completed" , task . TaskId )
2019-06-29 15:20:11 +08:00
}
2019-07-12 14:42:24 +08:00
func ( e * exectorManager ) runTaskWithErr ( f func ( task * pb . TaskMessage ) error , task * pb . TaskMessage , concurrencyControl bool ) {
2019-06-29 18:45:09 +08:00
logrus . Infof ( "Build task %s in progress" , task . TaskId )
2019-06-29 15:20:11 +08:00
e . runningTask . LoadOrStore ( task . TaskId , task )
2019-07-12 14:42:24 +08:00
//Remove a task that is being executed, not necessarily a task that is currently completed
if ! concurrencyControl {
<- e . tasks
} else {
defer func ( ) { <- e . tasks } ( )
}
2019-06-29 15:20:11 +08:00
if err := f ( task ) ; err != nil {
logrus . Errorf ( "run builder task failure %s" , err . Error ( ) )
}
e . runningTask . Delete ( task . TaskId )
2019-06-29 18:45:09 +08:00
logrus . Infof ( "Build task %s is completed" , task . TaskId )
2019-06-29 15:20:11 +08:00
}
2019-06-29 18:45:09 +08:00
func ( e * exectorManager ) RunTask ( task * pb . TaskMessage ) {
switch task . TaskType {
case "build_from_image" :
2019-07-12 14:42:24 +08:00
go e . runTask ( e . buildFromImage , task , false )
2019-06-29 18:45:09 +08:00
case "build_from_source_code" :
2019-07-12 14:42:24 +08:00
go e . runTask ( e . buildFromSourceCode , task , true )
2019-06-29 18:45:09 +08:00
case "build_from_market_slug" :
//deprecated
2019-07-12 14:58:42 +08:00
go e . runTask ( e . buildFromMarketSlug , task , false )
2019-06-29 18:45:09 +08:00
case "service_check" :
2019-07-12 14:42:24 +08:00
go e . runTask ( e . serviceCheck , task , true )
2019-06-29 18:45:09 +08:00
case "plugin_image_build" :
2019-07-12 14:42:24 +08:00
go e . runTask ( e . pluginImageBuild , task , false )
2019-06-29 18:45:09 +08:00
case "plugin_dockerfile_build" :
2019-07-12 14:42:24 +08:00
go e . runTask ( e . pluginDockerfileBuild , task , true )
2019-06-29 18:45:09 +08:00
case "share-slug" :
//deprecated
2019-07-12 14:58:42 +08:00
go e . runTask ( e . slugShare , task , false )
2019-06-29 18:45:09 +08:00
case "share-image" :
2019-07-12 14:42:24 +08:00
go e . runTask ( e . imageShare , task , false )
2019-06-29 18:45:09 +08:00
default :
2019-07-12 14:42:24 +08:00
go e . runTaskWithErr ( e . exec , task , false )
2018-05-07 15:10:12 +08:00
}
}
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) exec ( task * pb . TaskMessage ) error {
2019-03-26 11:07:06 +08:00
creator , ok := workerCreaterList [ task . TaskType ]
2018-05-07 15:10:12 +08:00
if ! ok {
2018-10-18 18:39:21 +08:00
return fmt . Errorf ( "`%s` tasktype can't support" , task . TaskType )
2017-11-07 11:40:44 +08:00
}
2019-03-26 11:07:06 +08:00
worker , err := creator ( task . TaskBody , e )
2018-05-23 11:08:44 +08:00
if err != nil {
logrus . Errorf ( "create worker for builder error.%s" , err )
return err
}
2019-06-29 15:20:11 +08:00
defer event . GetManager ( ) . ReleaseLogger ( worker . GetLogger ( ) )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
worker . GetLogger ( ) . Error ( util . Translation ( "Please try again or contact customer service" ) , map [ string ] string { "step" : "callback" , "status" : "failure" } )
worker . ErrorCallBack ( fmt . Errorf ( "%s" , r ) )
2018-05-23 11:08:44 +08:00
}
2018-05-07 15:10:12 +08:00
} ( )
2019-06-29 15:20:11 +08:00
if err := worker . Run ( time . Minute * 10 ) ; err != nil {
2019-07-12 14:42:24 +08:00
MetricErrorTaskNum ++
2019-06-29 15:20:11 +08:00
worker . ErrorCallBack ( err )
}
2017-11-07 11:40:44 +08:00
return nil
}
2018-04-11 21:38:48 +08:00
//buildFromImage build app from docker image
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) buildFromImage ( task * pb . TaskMessage ) {
i := NewImageBuildItem ( task . TaskBody )
2018-01-25 22:10:34 +08:00
i . DockerClient = e . DockerClient
2018-12-04 15:09:00 +08:00
i . Logger . Info ( "Start with the image build application task" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2019-06-29 15:20:11 +08:00
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
i . Logger . Error ( "Back end service drift. Please check the rbd-chaos log" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-01-25 22:10:34 +08:00
}
} ( )
2019-06-29 15:20:11 +08:00
start := time . Now ( )
defer func ( ) {
logrus . Debugf ( "complete build from source code, consuming time %s" , time . Now ( ) . Sub ( start ) . String ( ) )
} ( )
for n := 0 ; n < 2 ; n ++ {
2018-08-27 18:37:41 +08:00
err := i . Run ( time . Minute * 30 )
if err != nil {
2019-06-29 15:20:11 +08:00
logrus . Errorf ( "build from image error: %s" , err . Error ( ) )
if n < 1 {
i . Logger . Error ( "The application task to build from the mirror failed to execute, will try" , map [ string ] string { "step" : "build-exector" , "status" : "failure" } )
} else {
2019-07-12 14:42:24 +08:00
MetricErrorTaskNum ++
2019-08-29 13:29:05 +08:00
i . Logger . Error ( util . Translation ( "Check for log location imgae source errors" ) , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2019-06-29 15:20:11 +08:00
if err := i . UpdateVersionInfo ( "failure" ) ; err != nil {
logrus . Debugf ( "update version Info error: %s" , err . Error ( ) )
}
2018-05-23 16:43:16 +08:00
}
2018-12-04 15:09:00 +08:00
} else {
2019-03-08 19:22:22 +08:00
var configs = make ( map [ string ] string , len ( i . Configs ) )
for k , v := range i . Configs {
configs [ k ] = v . String ( )
}
err = e . sendAction ( i . TenantID , i . ServiceID , i . EventID , i . DeployVersion , i . Action , configs , i . Logger )
2018-12-04 15:09:00 +08:00
if err != nil {
i . Logger . Error ( "Send upgrade action failed" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
2019-06-29 15:20:11 +08:00
break
}
}
}
//buildFromSourceCode build app from source code
//support git repository
func ( e * exectorManager ) buildFromSourceCode ( task * pb . TaskMessage ) {
i := NewSouceCodeBuildItem ( task . TaskBody )
i . DockerClient = e . DockerClient
i . Logger . Info ( "Build app version from source code start" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
start := time . Now ( )
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
i . Logger . Error ( "Back end service drift. Please check the rbd-chaos log" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-03-15 18:30:02 +08:00
}
2018-01-30 15:10:12 +08:00
} ( )
2019-06-29 15:20:11 +08:00
defer func ( ) {
logrus . Debugf ( "Complete build from source code, consuming time %s" , time . Now ( ) . Sub ( start ) . String ( ) )
} ( )
err := i . Run ( time . Minute * 30 )
if err != nil {
logrus . Errorf ( "build from source code error: %s" , err . Error ( ) )
2019-08-29 13:29:05 +08:00
i . Logger . Error ( util . Translation ( "Check for log location code errors" ) , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2019-06-29 15:20:11 +08:00
vi := & dbmodel . VersionInfo {
FinalStatus : "failure" ,
EventID : i . EventID ,
2019-08-27 11:29:23 +08:00
CodeBranch : i . CodeSouceInfo . Branch ,
2019-06-29 15:20:11 +08:00
CodeVersion : i . commit . Hash ,
CommitMsg : i . commit . Message ,
Author : i . commit . Author ,
2019-08-16 06:53:16 +08:00
FinishTime : time . Now ( ) ,
2019-06-29 15:20:11 +08:00
}
if err := i . UpdateVersionInfo ( vi ) ; err != nil {
2019-08-16 06:53:16 +08:00
logrus . Errorf ( "update version Info error: %s" , err . Error ( ) )
2019-08-27 11:29:23 +08:00
i . Logger . Error ( fmt . Sprintf ( "error updating version info: %v" , err ) , event . GetCallbackLoggerOption ( ) )
2019-06-29 15:20:11 +08:00
}
} else {
var configs = make ( map [ string ] string , len ( i . Configs ) )
for k , v := range i . Configs {
configs [ k ] = v . String ( )
}
err = e . sendAction ( i . TenantID , i . ServiceID , i . EventID , i . DeployVersion , i . Action , configs , i . Logger )
if err != nil {
i . Logger . Error ( "Send upgrade action failed" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
}
2018-01-25 22:10:34 +08:00
}
2018-04-11 21:38:48 +08:00
//buildFromMarketSlug build app from market slug
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) buildFromMarketSlug ( task * pb . TaskMessage ) {
eventID := gjson . GetBytes ( task . TaskBody , "event_id" ) . String ( )
2018-03-05 17:40:56 +08:00
logger := event . GetManager ( ) . GetLogger ( eventID )
2018-12-04 15:09:00 +08:00
logger . Info ( "Build app version from market slug start" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2018-10-18 18:39:21 +08:00
i , err := NewMarketSlugItem ( task . TaskBody )
2018-03-05 17:40:56 +08:00
if err != nil {
logrus . Error ( "create build from market slug task error." , err . Error ( ) )
return
}
go func ( ) {
2018-08-24 10:02:34 +08:00
start := time . Now ( )
2018-03-05 17:40:56 +08:00
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
2018-03-30 15:37:36 +08:00
defer func ( ) {
if r := recover ( ) ; r != nil {
2018-04-12 18:09:13 +08:00
fmt . Println ( r )
2018-03-30 15:37:36 +08:00
debug . PrintStack ( )
2018-08-27 17:32:56 +08:00
i . Logger . Error ( "Back end service drift. Please check the rbd-chaos log" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-03-30 15:37:36 +08:00
}
} ( )
2018-08-24 10:02:34 +08:00
defer func ( ) {
logrus . Debugf ( "complete build from market slug consuming time %s" , time . Now ( ) . Sub ( start ) . String ( ) )
} ( )
2018-03-05 17:40:56 +08:00
for n := 0 ; n < 2 ; n ++ {
err := i . Run ( )
if err != nil {
logrus . Errorf ( "image share error: %s" , err . Error ( ) )
2018-03-15 15:59:18 +08:00
if n < 1 {
2018-12-04 15:09:00 +08:00
i . Logger . Error ( "Build app version from market slug failure, will try" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-05 17:40:56 +08:00
} else {
2019-07-12 14:42:24 +08:00
MetricErrorTaskNum ++
2018-12-04 15:09:00 +08:00
i . Logger . Error ( "Build app version from market slug failure" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
2018-03-05 17:40:56 +08:00
}
} else {
2019-03-08 19:22:22 +08:00
err = e . sendAction ( i . TenantID , i . ServiceID , i . EventID , i . DeployVersion , i . Action , i . Configs , i . Logger )
2018-12-04 15:09:00 +08:00
if err != nil {
i . Logger . Error ( "Send upgrade action failed" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
2018-03-05 17:40:56 +08:00
break
}
}
} ( )
2018-03-04 22:48:50 +08:00
}
2018-12-04 15:09:00 +08:00
//rollingUpgradeTaskBody upgrade message body type
type rollingUpgradeTaskBody struct {
TenantID string ` json:"tenant_id" `
ServiceID string ` json:"service_id" `
EventID string ` json:"event_id" `
Strategy [ ] string ` json:"strategy" `
}
2019-03-08 19:22:22 +08:00
func ( e * exectorManager ) sendAction ( tenantID , serviceID , eventID , newVersion , actionType string , configs map [ string ] string , logger event . Logger ) error {
2019-08-23 11:41:47 +08:00
// update build event complete status
2019-08-24 19:02:24 +08:00
logger . Info ( "Build success" , map [ string ] string { "step" : "last" , "status" : "success" } )
2018-12-04 15:09:00 +08:00
switch actionType {
case "upgrade" :
2019-08-23 11:41:47 +08:00
//add upgrade event
event := & dbmodel . ServiceEvent {
EventID : util . NewUUID ( ) ,
TenantID : tenantID ,
ServiceID : serviceID ,
StartTime : time . Now ( ) . Format ( time . RFC3339 ) ,
OptType : "upgrade" ,
2019-08-24 15:30:59 +08:00
Target : "service" ,
TargetID : serviceID ,
2019-08-29 19:45:56 +08:00
UserName : "" ,
2019-08-24 19:02:24 +08:00
SynType : dbmodel . ASYNEVENTTYPE ,
2019-08-23 11:41:47 +08:00
}
if err := db . GetManager ( ) . ServiceEventDao ( ) . AddModel ( event ) ; err != nil {
logrus . Errorf ( "create upgrade event failure %s, service %s do not auto upgrade" , err . Error ( ) , serviceID )
return nil
}
2018-12-04 15:09:00 +08:00
if err := db . GetManager ( ) . TenantServiceDao ( ) . UpdateDeployVersion ( serviceID , newVersion ) ; err != nil {
2019-08-23 11:41:47 +08:00
logrus . Errorf ( "Update app service deploy version failure %s, service %s do not auto upgrade" , err . Error ( ) , serviceID )
return nil
2018-12-04 15:09:00 +08:00
}
2019-03-08 19:22:22 +08:00
body := workermodel . RollingUpgradeTaskBody {
TenantID : tenantID ,
ServiceID : serviceID ,
NewDeployVersion : newVersion ,
2019-08-23 11:41:47 +08:00
EventID : event . EventID ,
2019-03-08 19:22:22 +08:00
Configs : configs ,
2018-12-04 15:09:00 +08:00
}
if err := e . mqClient . SendBuilderTopic ( mqclient . TaskStruct {
2018-12-04 18:08:51 +08:00
Topic : mqclient . WorkerTopic ,
2019-08-16 06:53:16 +08:00
TaskType : "rolling_upgrade" , // TODO(huangrh 20190816): Separate from build
2018-12-04 15:09:00 +08:00
TaskBody : body ,
} ) ; err != nil {
return err
}
2018-12-04 15:21:33 +08:00
return nil
2018-12-04 15:09:00 +08:00
default :
}
return nil
}
2018-04-11 21:38:48 +08:00
//slugShare share app of slug
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) slugShare ( task * pb . TaskMessage ) {
i , err := NewSlugShareItem ( task . TaskBody , e . EtcdCli )
2018-03-04 22:48:50 +08:00
if err != nil {
logrus . Error ( "create share image task error." , err . Error ( ) )
return
}
i . Logger . Info ( "开始分享应用" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2018-02-07 16:10:26 +08:00
status := "success"
2018-03-01 11:30:54 +08:00
go func ( ) {
2018-02-06 17:41:47 +08:00
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
2018-04-12 18:09:13 +08:00
defer func ( ) {
if r := recover ( ) ; r != nil {
fmt . Println ( r )
debug . PrintStack ( )
i . Logger . Error ( "后端服务开小差,请重试或联系客服" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
} ( )
2018-03-15 15:59:18 +08:00
for n := 0 ; n < 2 ; n ++ {
2018-03-04 22:48:50 +08:00
err := i . ShareService ( )
2018-02-06 17:41:47 +08:00
if err != nil {
2018-03-04 22:48:50 +08:00
logrus . Errorf ( "image share error: %s" , err . Error ( ) )
2018-03-15 15:59:18 +08:00
if n < 1 {
2018-03-04 22:48:50 +08:00
i . Logger . Error ( "应用分享失败,开始重试" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-01 11:30:54 +08:00
} else {
2019-07-12 14:42:24 +08:00
MetricErrorTaskNum ++
2018-03-04 22:48:50 +08:00
i . Logger . Error ( "分享应用任务执行失败" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-01 11:30:54 +08:00
status = "failure"
2018-02-06 17:41:47 +08:00
}
2018-03-01 11:30:54 +08:00
} else {
2018-03-04 22:48:50 +08:00
status = "success"
2018-02-06 17:41:47 +08:00
break
}
}
2018-03-04 22:48:50 +08:00
if err := i . UpdateShareStatus ( status ) ; err != nil {
logrus . Debugf ( "Add image share result error: %s" , err . Error ( ) )
}
2018-02-06 17:41:47 +08:00
} ( )
2018-02-05 19:23:59 +08:00
}
2018-04-11 21:38:48 +08:00
//imageShare share app of docker image
2018-10-18 18:39:21 +08:00
func ( e * exectorManager ) imageShare ( task * pb . TaskMessage ) {
i , err := NewImageShareItem ( task . TaskBody , e . DockerClient , e . EtcdCli )
2018-03-04 22:48:50 +08:00
if err != nil {
logrus . Error ( "create share image task error." , err . Error ( ) )
2019-10-13 15:33:31 +08:00
i . Logger . Error ( util . Translation ( "create share image task error" ) , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-04 22:48:50 +08:00
return
}
i . Logger . Info ( "开始分享应用" , map [ string ] string { "step" : "builder-exector" , "status" : "starting" } )
2018-02-07 16:10:26 +08:00
status := "success"
2019-06-29 15:20:11 +08:00
defer event . GetManager ( ) . ReleaseLogger ( i . Logger )
defer func ( ) {
if r := recover ( ) ; r != nil {
debug . PrintStack ( )
i . Logger . Error ( "后端服务开小差,请重试或联系客服" , map [ string ] string { "step" : "callback" , "status" : "failure" } )
}
} ( )
for n := 0 ; n < 2 ; n ++ {
err := i . ShareService ( )
if err != nil {
logrus . Errorf ( "image share error: %s" , err . Error ( ) )
if n < 1 {
i . Logger . Error ( "应用分享失败,开始重试" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
2018-03-01 11:30:54 +08:00
} else {
2019-07-12 14:42:24 +08:00
MetricErrorTaskNum ++
2019-06-29 15:20:11 +08:00
i . Logger . Error ( "分享应用任务执行失败" , map [ string ] string { "step" : "builder-exector" , "status" : "failure" } )
status = "failure"
2018-02-07 11:11:35 +08:00
}
2019-06-29 15:20:11 +08:00
} else {
status = "success"
break
2018-02-07 11:11:35 +08:00
}
2019-06-29 15:20:11 +08:00
}
if err := i . UpdateShareStatus ( status ) ; err != nil {
logrus . Debugf ( "Add image share result error: %s" , err . Error ( ) )
}
2018-02-05 19:23:59 +08:00
}
2017-11-07 11:40:44 +08:00
func ( e * exectorManager ) Start ( ) error {
return nil
}
func ( e * exectorManager ) Stop ( ) error {
2019-06-29 15:20:11 +08:00
e . cancel ( )
2018-07-06 16:55:10 +08:00
logrus . Info ( "Waiting for all threads to exit." )
2019-06-29 18:45:09 +08:00
//Recycle all ongoing tasks
e . runningTask . Range ( func ( k , v interface { } ) bool {
task := v . ( * pb . TaskMessage )
e . callback ( task )
return true
} )
2018-07-06 16:55:10 +08:00
logrus . Info ( "All threads is exited." )
2017-11-07 11:40:44 +08:00
return nil
}
2019-07-12 14:42:24 +08:00
func ( e * exectorManager ) GetMaxConcurrentTask ( ) float64 {
return float64 ( e . maxConcurrentTask )
}
func ( e * exectorManager ) GetCurrentConcurrentTask ( ) float64 {
return float64 ( len ( e . tasks ) )
}