Merge pull request #574 from GLYASAI/gc

manage node disk cleanup
This commit is contained in:
barnettZQG 2019-12-03 23:23:13 -06:00 committed by GitHub
commit 2cfea932b1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
38 changed files with 1219 additions and 83 deletions

View File

@ -973,8 +973,9 @@ func (t *TenantStruct) GetSingleServiceInfo(w http.ResponseWriter, r *http.Reque
// description: 统一返回格式 // description: 统一返回格式
func (t *TenantStruct) DeleteSingleServiceInfo(w http.ResponseWriter, r *http.Request) { func (t *TenantStruct) DeleteSingleServiceInfo(w http.ResponseWriter, r *http.Request) {
serviceID := r.Context().Value(middleware.ContextKey("service_id")).(string) serviceID := r.Context().Value(middleware.ContextKey("service_id")).(string)
if err := handler.GetServiceManager().TransServieToDelete(serviceID); err != nil { tenantID := r.Context().Value(middleware.ContextKey("tenant_id")).(string)
if err == fmt.Errorf("unclosed") { if err := handler.GetServiceManager().TransServieToDelete(tenantID, serviceID); err != nil {
if err == handler.ErrServiceNotClosed {
httputil.ReturnError(r, w, 400, fmt.Sprintf("Service must be closed")) httputil.ReturnError(r, w, 400, fmt.Sprintf("Service must be closed"))
return return
} }

View File

@ -20,6 +20,7 @@ package handler
import ( import (
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"net/http" "net/http"
"os" "os"
@ -35,10 +36,11 @@ import (
"github.com/goodrain/rainbond/builder/parser" "github.com/goodrain/rainbond/builder/parser"
"github.com/goodrain/rainbond/cmd/api/option" "github.com/goodrain/rainbond/cmd/api/option"
"github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/errors" dberrors "github.com/goodrain/rainbond/db/errors"
core_model "github.com/goodrain/rainbond/db/model" core_model "github.com/goodrain/rainbond/db/model"
dbmodel "github.com/goodrain/rainbond/db/model" dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event" "github.com/goodrain/rainbond/event"
eventutil "github.com/goodrain/rainbond/eventlog/util"
gclient "github.com/goodrain/rainbond/mq/client" gclient "github.com/goodrain/rainbond/mq/client"
core_util "github.com/goodrain/rainbond/util" core_util "github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/worker/client" "github.com/goodrain/rainbond/worker/client"
@ -50,6 +52,9 @@ import (
"github.com/twinj/uuid" "github.com/twinj/uuid"
) )
// ErrServiceNotClosed -
var ErrServiceNotClosed = errors.New("Service has not been closed")
//ServiceAction service act //ServiceAction service act
type ServiceAction struct { type ServiceAction struct {
MQClient gclient.MQClient MQClient gclient.MQClient
@ -516,7 +521,7 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error {
env.TenantID = ts.TenantID env.TenantID = ts.TenantID
if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).AddModel(&env); err != nil { if err := db.GetManager().TenantServiceEnvVarDaoTransactions(tx).AddModel(&env); err != nil {
logrus.Errorf("add env[name=%s] error, %v", env.AttrName, err) logrus.Errorf("add env[name=%s] error, %v", env.AttrName, err)
if err != errors.ErrRecordAlreadyExist { if err != dberrors.ErrRecordAlreadyExist {
tx.Rollback() tx.Rollback()
return err return err
} }
@ -1389,11 +1394,32 @@ func (s *ServiceAction) VolumnVar(tsv *dbmodel.TenantServiceVolume, tenantID, fi
} }
}() }()
if tsv.VolumeName != "" { if tsv.VolumeName != "" {
err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteModel(tsv.ServiceID, tsv.VolumeName) volume, err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).GetVolumeByServiceIDAndName(tsv.ServiceID, tsv.VolumeName)
if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { if err != nil {
tx.Rollback()
return util.CreateAPIHandleErrorFromDBError("find volume", err)
}
if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteModel(tsv.ServiceID, tsv.VolumeName); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() {
tx.Rollback() tx.Rollback()
return util.CreateAPIHandleErrorFromDBError("delete volume", err) return util.CreateAPIHandleErrorFromDBError("delete volume", err)
} }
err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{
Topic: gclient.WorkerTopic,
TaskType: "volume_gc",
TaskBody: map[string]interface{}{
"tenant_id": tenantID,
"service_id": volume.ServiceID,
"volume_id": volume.ID,
"volume_path": volume.VolumePath,
},
})
if err != nil {
logrus.Errorf("send 'volume_gc' task: %v", err)
tx.Rollback()
return util.CreateAPIHandleErrorFromDBError("send 'volume_gc' task", err)
}
} else { } else {
if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteByServiceIDAndVolumePath(tsv.ServiceID, tsv.VolumePath); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteByServiceIDAndVolumePath(tsv.ServiceID, tsv.VolumePath); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() {
tx.Rollback() tx.Rollback()
@ -1781,7 +1807,36 @@ func (s *ServiceAction) GetPodContainerMemory(podNames []string) (map[string]map
} }
//TransServieToDelete trans service info to delete table //TransServieToDelete trans service info to delete table
func (s *ServiceAction) TransServieToDelete(serviceID string) error { func (s *ServiceAction) TransServieToDelete(tenantID, serviceID string) error {
if err := s.isServiceClosed(serviceID); err != nil {
return err
}
body, err := s.gcTaskBody(tenantID, serviceID)
if err != nil {
return fmt.Errorf("GC task body: %v", err)
}
if err := s.delServiceMetadata(serviceID); err != nil {
return fmt.Errorf("delete service-related metadata: %v", err)
}
// let rbd-chaos remove related persistent data
logrus.Info("let rbd-chaos remove related persistent data")
topic := gclient.WorkerTopic
if err := s.MQClient.SendBuilderTopic(gclient.TaskStruct{
Topic: topic,
TaskType: "service_gc",
TaskBody: body,
}); err != nil {
logrus.Warningf("send gc task: %v", err)
}
return nil
}
// isServiceClosed checks if the service has been closed according to the serviceID.
func (s *ServiceAction) isServiceClosed(serviceID string) error {
service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID) service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
if err != nil { if err != nil {
return err return err
@ -1789,9 +1844,18 @@ func (s *ServiceAction) TransServieToDelete(serviceID string) error {
status := s.statusCli.GetStatus(serviceID) status := s.statusCli.GetStatus(serviceID)
if service.Kind != dbmodel.ServiceKindThirdParty.String() { if service.Kind != dbmodel.ServiceKindThirdParty.String() {
if !s.statusCli.IsClosedStatus(status) { if !s.statusCli.IsClosedStatus(status) {
return fmt.Errorf("unclosed") return ErrServiceNotClosed
} }
} }
return nil
}
// delServiceMetadata deletes service-related metadata in the database.
func (s *ServiceAction) delServiceMetadata(serviceID string) error {
service, err := db.GetManager().TenantServiceDao().GetServiceByID(serviceID)
if err != nil {
return err
}
tx := db.GetManager().Begin() tx := db.GetManager().Begin()
defer func() { defer func() {
if r := recover(); r != nil { if r := recover(); r != nil {
@ -1806,24 +1870,24 @@ func (s *ServiceAction) TransServieToDelete(serviceID string) error {
return err return err
} }
var deleteServicePropertyFunc = []func(serviceID string) error{ var deleteServicePropertyFunc = []func(serviceID string) error{
db.GetManager().TenantServiceDaoTransactions(tx).DeleteServiceByServiceID, db.GetManager().CodeCheckResultDaoTransactions(tx).DeleteByServiceID,
db.GetManager().TenantServiceMountRelationDaoTransactions(tx).DELTenantServiceMountRelationByServiceID,
db.GetManager().TenantServiceEnvVarDaoTransactions(tx).DELServiceEnvsByServiceID, db.GetManager().TenantServiceEnvVarDaoTransactions(tx).DELServiceEnvsByServiceID,
db.GetManager().TenantServicesPortDaoTransactions(tx).DELPortsByServiceID, db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).DeletePluginConfigByServiceID,
db.GetManager().TenantServiceRelationDaoTransactions(tx).DELRelationsByServiceID,
db.GetManager().TenantServiceLBMappingPortDaoTransactions(tx).DELServiceLBMappingPortByServiceID,
db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteTenantServiceVolumesByServiceID,
db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DelByServiceID,
db.GetManager().ServiceProbeDaoTransactions(tx).DELServiceProbesByServiceID,
db.GetManager().TenantServicePluginRelationDaoTransactions(tx).DeleteALLRelationByServiceID, db.GetManager().TenantServicePluginRelationDaoTransactions(tx).DeleteALLRelationByServiceID,
db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeleteAllPluginMappingPortByServiceID, db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeleteAllPluginMappingPortByServiceID,
db.GetManager().TenantPluginVersionENVDaoTransactions(tx).DeleteEnvByServiceID, db.GetManager().TenantServiceDaoTransactions(tx).DeleteServiceByServiceID,
db.GetManager().TenantPluginVersionConfigDaoTransactions(tx).DeletePluginConfigByServiceID, db.GetManager().TenantServicesPortDaoTransactions(tx).DELPortsByServiceID,
db.GetManager().TenantServiceLabelDaoTransactions(tx).DeleteLabelByServiceID, db.GetManager().TenantServiceRelationDaoTransactions(tx).DELRelationsByServiceID,
db.GetManager().HTTPRuleDaoTransactions(tx).DeleteHTTPRuleByServiceID, db.GetManager().TenantServiceMountRelationDaoTransactions(tx).DELTenantServiceMountRelationByServiceID,
db.GetManager().TCPRuleDaoTransactions(tx).DeleteTCPRuleByServiceID, db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteTenantServiceVolumesByServiceID,
db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).DeleteByServiceID, db.GetManager().TenantServiceConfigFileDaoTransactions(tx).DelByServiceID,
db.GetManager().EndpointsDaoTransactions(tx).DeleteByServiceID, db.GetManager().EndpointsDaoTransactions(tx).DeleteByServiceID,
db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx).DeleteByServiceID,
db.GetManager().TenantServiceLabelDaoTransactions(tx).DeleteLabelByServiceID,
db.GetManager().VersionInfoDaoTransactions(tx).DeleteVersionByServiceID,
db.GetManager().TenantPluginVersionENVDaoTransactions(tx).DeleteEnvByServiceID,
db.GetManager().ServiceProbeDaoTransactions(tx).DELServiceProbesByServiceID,
db.GetManager().ServiceEventDaoTransactions(tx).DelEventByServiceID,
} }
if err := GetGatewayHandler().DeleteTCPRuleByServiceIDWithTransaction(serviceID, tx); err != nil { if err := GetGatewayHandler().DeleteTCPRuleByServiceIDWithTransaction(serviceID, tx); err != nil {
tx.Rollback() tx.Rollback()
@ -1835,7 +1899,7 @@ func (s *ServiceAction) TransServieToDelete(serviceID string) error {
} }
for _, del := range deleteServicePropertyFunc { for _, del := range deleteServicePropertyFunc {
if err := del(serviceID); err != nil { if err := del(serviceID); err != nil {
if err.Error() != gorm.ErrRecordNotFound.Error() { if err != gorm.ErrRecordNotFound {
tx.Rollback() tx.Rollback()
return err return err
} }
@ -1848,6 +1912,40 @@ func (s *ServiceAction) TransServieToDelete(serviceID string) error {
return nil return nil
} }
// delLogFile deletes persistent data related to the service based on serviceID.
func (s *ServiceAction) delLogFile(serviceID string, eventIDs []string) {
// log generated during service running
dockerLogPath := eventutil.DockerLogFilePath(s.conf.LogPath, serviceID)
if err := os.RemoveAll(dockerLogPath); err != nil {
logrus.Warningf("remove docker log files: %v", err)
}
// log generated by the service event
eventLogPath := eventutil.EventLogFilePath(s.conf.LogPath)
for _, eventID := range eventIDs {
eventLogFileName := eventutil.EventLogFileName(eventLogPath, eventID)
if err := os.RemoveAll(eventLogFileName); err != nil {
logrus.Warningf("file: %s; remove event log file: %v", eventLogFileName, err)
}
}
}
func (s *ServiceAction) gcTaskBody(tenantID, serviceID string) (map[string]interface{}, error) {
events, err := db.GetManager().ServiceEventDao().ListByTargetID(serviceID)
if err != nil {
logrus.Errorf("list events based on serviceID: %v", err)
}
var eventIDs []string
for _, event := range events {
eventIDs = append(eventIDs, event.EventID)
}
return map[string]interface{}{
"tenant_id": tenantID,
"service_id": serviceID,
"event_ids": eventIDs,
}, nil
}
//GetServiceDeployInfo get service deploy info //GetServiceDeployInfo get service deploy info
func (s *ServiceAction) GetServiceDeployInfo(tenantID, serviceID string) (*pb.DeployInfo, *util.APIHandleError) { func (s *ServiceAction) GetServiceDeployInfo(tenantID, serviceID string) (*pb.DeployInfo, *util.APIHandleError) {
info, err := s.statusCli.GetServiceDeployInfo(serviceID) info, err := s.statusCli.GetServiceDeployInfo(serviceID)

View File

@ -62,7 +62,7 @@ type ServiceHandler interface {
CreateTenant(*dbmodel.Tenants) error CreateTenant(*dbmodel.Tenants) error
CreateTenandIDAndName(eid string) (string, string, error) CreateTenandIDAndName(eid string) (string, string, error)
GetPods(serviceID string) (*K8sPodInfos, error) GetPods(serviceID string) (*K8sPodInfos, error)
TransServieToDelete(serviceID string) error TransServieToDelete(tenantID, serviceID string) error
TenantServiceDeletePluginRelation(tenantID, serviceID, pluginID string) *util.APIHandleError TenantServiceDeletePluginRelation(tenantID, serviceID, pluginID string) *util.APIHandleError
GetTenantServicePluginRelation(serviceID string) ([]*dbmodel.TenantServicePluginRelation, *util.APIHandleError) GetTenantServicePluginRelation(serviceID string) ([]*dbmodel.TenantServicePluginRelation, *util.APIHandleError)
SetTenantServicePluginRelation(tenantID, serviceID string, pss *api_model.PluginSetStruct) (*dbmodel.TenantServicePluginRelation, *util.APIHandleError) SetTenantServicePluginRelation(tenantID, serviceID string, pss *api_model.PluginSetStruct) (*dbmodel.TenantServicePluginRelation, *util.APIHandleError)

View File

@ -95,22 +95,6 @@ func TestABCService(t *testing.T) {
fmt.Printf("json is \n %v", s) fmt.Printf("json is \n %v", s)
} }
/*
func TestPortOuter(t *testing.T) {
if err := db.CreateManager(dbconfig.Config{
MysqlConnectionInfo: "root:admin@tcp(127.0.0.1:3306)/region",
DBType: "mysql",
}); err != nil {
t.Fatal(err)
}
sa := &ServiceAction{}
port, sche, err := sa.PortOuter("123", "257389e878258717abc6fa7c98660709", "close", 8080)
t.Log(port)
t.Log(sche)
t.Log(err)
}
*/
func TestUUID(t *testing.T) { func TestUUID(t *testing.T) {
id := fmt.Sprintf("%s", uuid.NewV4()) id := fmt.Sprintf("%s", uuid.NewV4())
uid := strings.Replace(id, "-", "", -1) uid := strings.Replace(id, "-", "", -1)

View File

@ -63,6 +63,12 @@ func (s *slugBuild) Build(re *Request) (*Response, error) {
logrus.Error("build slug in container error,", err.Error()) logrus.Error("build slug in container error,", err.Error())
return nil, err return nil, err
} }
defer func() {
if err := os.Remove(packageName); err != nil {
logrus.Warningf("pkg name: %s; remove slug pkg: %v", packageName, err)
}
}()
fileInfo, err := os.Stat(packageName) fileInfo, err := os.Stat(packageName)
if err != nil { if err != nil {
re.Logger.Error(util.Translation("Check that the build result failure"), map[string]string{"step": "build-code", "status": "failure"}) re.Logger.Error(util.Translation("Check that the build result failure"), map[string]string{"step": "build-code", "status": "failure"})
@ -106,6 +112,12 @@ func (s *slugBuild) buildRunnerImage(slugPackage string) (string, error) {
if err := util.CheckAndCreateDir(cacheDir); err != nil { if err := util.CheckAndCreateDir(cacheDir); err != nil {
return "", fmt.Errorf("create cache package dir failure %s", err.Error()) return "", fmt.Errorf("create cache package dir failure %s", err.Error())
} }
defer func() {
if err := os.RemoveAll(cacheDir); err != nil {
logrus.Errorf("remove cache dir %s failure %s", cacheDir, err.Error())
}
}()
packageName := path.Base(slugPackage) packageName := path.Base(slugPackage)
if err := util.Rename(slugPackage, path.Join(cacheDir, packageName)); err != nil { if err := util.Rename(slugPackage, path.Join(cacheDir, packageName)); err != nil {
return "", fmt.Errorf("move code package failure %s", err.Error()) return "", fmt.Errorf("move code package failure %s", err.Error())
@ -148,9 +160,6 @@ func (s *slugBuild) buildRunnerImage(slugPackage string) (string, error) {
if err := sources.ImageRemove(s.re.DockerClient, imageName); err != nil { if err := sources.ImageRemove(s.re.DockerClient, imageName); err != nil {
logrus.Errorf("remove image %s failure %s", imageName, err.Error()) logrus.Errorf("remove image %s failure %s", imageName, err.Error())
} }
if err := os.RemoveAll(cacheDir); err != nil {
logrus.Errorf("remove cache dir %s failure %s", cacheDir, err.Error())
}
return imageName, nil return imageName, nil
} }

View File

@ -173,6 +173,12 @@ func (i *SourceCodeBuildItem) Run(timeout time.Duration) error {
Message: commit.Message, Message: commit.Message,
} }
} }
defer func() {
if err := os.RemoveAll(rbi.GetCodeHome()); err != nil {
logrus.Warningf("remove source code: %v", err)
}
}()
hash := i.commit.Hash hash := i.commit.Hash
if len(hash) >= 8 { if len(hash) >= 8 {
hash = i.commit.Hash[0:7] hash = i.commit.Hash[0:7]

View File

@ -88,6 +88,7 @@ func NewManager(conf option.Config, mqc mqclient.MQClient) (Manager, error) {
maxConcurrentTask: maxConcurrentTask, maxConcurrentTask: maxConcurrentTask,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
cfg: conf,
}, nil }, nil
} }
@ -101,6 +102,7 @@ type exectorManager struct {
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
runningTask sync.Map runningTask sync.Map
cfg option.Config
} }
//TaskWorker worker interface //TaskWorker worker interface
@ -204,6 +206,8 @@ func (e *exectorManager) RunTask(task *pb.TaskMessage) {
go e.runTask(e.slugShare, task, false) go e.runTask(e.slugShare, task, false)
case "share-image": case "share-image":
go e.runTask(e.imageShare, task, false) go e.runTask(e.imageShare, task, false)
case "garbage-collection":
go e.runTask(e.garbageCollection, task, false)
default: default:
go e.runTaskWithErr(e.exec, task, false) go e.runTaskWithErr(e.exec, task, false)
} }
@ -502,6 +506,20 @@ func (e *exectorManager) imageShare(task *pb.TaskMessage) {
} }
} }
func (e *exectorManager) garbageCollection(task *pb.TaskMessage) {
gci, err := NewGarbageCollectionItem(e.cfg, task.TaskBody)
if err != nil {
logrus.Warningf("create a new GarbageCollectionItem: %v", err)
}
go func() {
// delete docker log file and event log file
gci.delLogFile()
// volume data
gci.delVolumeData()
}()
}
func (e *exectorManager) Start() error { func (e *exectorManager) Start() error {
return nil return nil
} }

View File

@ -0,0 +1,77 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package exector
import (
"fmt"
"os"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/cmd/builder/option"
eventutil "github.com/goodrain/rainbond/eventlog/util"
"github.com/pquerna/ffjson/ffjson"
)
// GarbageCollectionItem -
type GarbageCollectionItem struct {
TenantID string `json:"tenant_id"`
ServiceID string `json:"service_id"`
EventIDs []string `json:"event_ids"`
Cfg option.Config `json:"-"`
}
// NewGarbageCollectionItem creates a new GarbageCollectionItem
func NewGarbageCollectionItem(cfg option.Config, in []byte) (*GarbageCollectionItem, error) {
logrus.Debugf("garbage collection; request body: %v", string(in))
var gci GarbageCollectionItem
if err := ffjson.Unmarshal(in, &gci); err != nil {
return nil, err
}
gci.Cfg = cfg
// validate
return &gci, nil
}
// delLogFile deletes persistent data related to the service based on serviceID.
func (g *GarbageCollectionItem) delLogFile() {
logrus.Infof("service id: %s;delete log file.", g.ServiceID)
// log generated during service running
dockerLogPath := eventutil.DockerLogFilePath(g.Cfg.LogPath, g.ServiceID)
if err := os.RemoveAll(dockerLogPath); err != nil {
logrus.Warningf("remove docker log files: %v", err)
}
// log generated by the service event
eventLogPath := eventutil.EventLogFilePath(g.Cfg.LogPath)
for _, eventID := range g.EventIDs {
eventLogFileName := eventutil.EventLogFileName(eventLogPath, eventID)
logrus.Debugf("remove event log file: %s", eventLogFileName)
if err := os.RemoveAll(eventLogFileName); err != nil {
logrus.Warningf("file: %s; remove event log file: %v", eventLogFileName, err)
}
}
}
func (g *GarbageCollectionItem) delVolumeData() {
logrus.Infof("service id: %s; delete volume data.", g.ServiceID)
dir := fmt.Sprintf("/gradata/tenant/%s/service/%s", g.TenantID, g.ServiceID)
if err := os.RemoveAll(dir); err != nil {
logrus.Warningf("dir: %s; remove volume data: %v", dir, err)
}
}

View File

@ -210,6 +210,14 @@ func (d *SourceCodeParse) Parse() ParseErrorList {
return err return err
} }
} }
// The source code is useless after the test is completed, and needs to be deleted.
defer func() {
if sources.CheckFileExist(buildInfo.GetCodeHome()) {
if err := sources.RemoveDir(buildInfo.GetCodeHome()); err != nil {
logrus.Warningf("remove source code: %v", err)
}
}
}()
//read rainbondfile //read rainbondfile
rbdfileConfig, err := code.ReadRainbondFile(buildInfo.GetCodeBuildAbsPath()) rbdfileConfig, err := code.ReadRainbondFile(buildInfo.GetCodeBuildAbsPath())

View File

@ -53,6 +53,7 @@ type Config struct {
MinExtPort int // minimum external port MinExtPort int // minimum external port
LicensePath string LicensePath string
LicSoPath string LicSoPath string
LogPath string
} }
//APIServer apiserver server //APIServer apiserver server
@ -97,6 +98,7 @@ func (a *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.StringArrayVar(&a.EnableFeature, "enable-feature", []string{}, "List of special features supported, such as `windows`") fs.StringArrayVar(&a.EnableFeature, "enable-feature", []string{}, "List of special features supported, such as `windows`")
fs.StringVar(&a.LicensePath, "license-path", "/opt/rainbond/etc/license/license.yb", "the license path of the enterprise version.") fs.StringVar(&a.LicensePath, "license-path", "/opt/rainbond/etc/license/license.yb", "the license path of the enterprise version.")
fs.StringVar(&a.LicSoPath, "license-so-path", "/opt/rainbond/etc/license/license.so", "Dynamic library file path for parsing the license.") fs.StringVar(&a.LicSoPath, "license-so-path", "/opt/rainbond/etc/license/license.so", "Dynamic library file path for parsing the license.")
fs.StringVar(&a.LogPath, "log-path", "/grdata/logs", "Where Docker log files and event log files are stored.")
} }
//SetLog 设置log //SetLog 设置log

View File

@ -44,6 +44,7 @@ type Config struct {
HostIP string HostIP string
CleanUp bool CleanUp bool
Topic string Topic string
LogPath string
} }
//Builder builder server //Builder builder server
@ -58,9 +59,6 @@ func NewBuilder() *Builder {
return &Builder{} return &Builder{}
} }
//
type NodeOSType string
//AddFlags config //AddFlags config
func (a *Builder) AddFlags(fs *pflag.FlagSet) { func (a *Builder) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.LogLevel, "log-level", "info", "the builder log level") fs.StringVar(&a.LogLevel, "log-level", "info", "the builder log level")
@ -80,6 +78,7 @@ func (a *Builder) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.HostIP, "hostIP", "", "Current node Intranet IP") fs.StringVar(&a.HostIP, "hostIP", "", "Current node Intranet IP")
fs.BoolVar(&a.CleanUp, "clean-up", true, "Turn on build version cleanup") fs.BoolVar(&a.CleanUp, "clean-up", true, "Turn on build version cleanup")
fs.StringVar(&a.Topic, "topic", "builder", "Topic in mq,you coule choose `builder` or `windows_builder`") fs.StringVar(&a.Topic, "topic", "builder", "Topic in mq,you coule choose `builder` or `windows_builder`")
fs.StringVar(&a.LogPath, "log-path", "/grdata/logs", "Where Docker log files and event log files are stored.")
} }
//SetLog 设置log //SetLog 设置log

View File

@ -99,6 +99,22 @@ type Conf struct {
LicPath string LicPath string
LicSoPath string LicSoPath string
// EnableImageGC is the trigger of image garbage collection.
EnableImageGC bool
// imageMinimumGCAge is the minimum age for an unused image before it is
// garbage collected.
ImageMinimumGCAge time.Duration
// imageGCHighThresholdPercent is the percent of disk usage after which
// image garbage collection is always run. The percent is calculated as
// this field value out of 100.
ImageGCHighThresholdPercent int32
// imageGCLowThresholdPercent is the percent of disk usage before which
// image garbage collection is never run. Lowest disk usage to garbage
// collect to. The percent is calculated as this field value out of 100.
ImageGCLowThresholdPercent int32
// ImageGCPeriod is the period for performing image garbage collection.
ImageGCPeriod time.Duration
} }
//StatsdConfig StatsdConfig //StatsdConfig StatsdConfig
@ -150,6 +166,11 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&a.AutoUnschedulerUnHealthDuration, "autounscheduler-unhealthy-dura", 5*time.Minute, "Node unhealthy duration, after the automatic offline,if set 0,disable auto handle unscheduler.default is 5 Minute") fs.DurationVar(&a.AutoUnschedulerUnHealthDuration, "autounscheduler-unhealthy-dura", 5*time.Minute, "Node unhealthy duration, after the automatic offline,if set 0,disable auto handle unscheduler.default is 5 Minute")
fs.StringVar(&a.LicPath, "lic-path", "/opt/rainbond/etc/license/license.yb", "the license path of the enterprise version.") fs.StringVar(&a.LicPath, "lic-path", "/opt/rainbond/etc/license/license.yb", "the license path of the enterprise version.")
fs.StringVar(&a.LicSoPath, "lic-so-path", "/opt/rainbond/etc/license/license.so", "Dynamic library file path for parsing the license.") fs.StringVar(&a.LicSoPath, "lic-so-path", "/opt/rainbond/etc/license/license.so", "Dynamic library file path for parsing the license.")
fs.BoolVar(&a.EnableImageGC, "enable-image-gc", true, "The trigger of image garbage collection.")
fs.DurationVar(&a.ImageMinimumGCAge, "minimum-image-ttl-duration", 2*time.Hour, "Minimum age for an unused image before it is garbage collected. Examples: '300ms', '10s' or '2h45m'.")
fs.DurationVar(&a.ImageGCPeriod, "image-gc-period", 5*time.Minute, "ImageGCPeriod is the period for performing image garbage collection. Examples: '10s', '5m' or '2h45m'.")
fs.Int32Var(&a.ImageGCHighThresholdPercent, "image-gc-high-threshold", 90, "The percent of disk usage after which image garbage collection is always run. Values must be within the range [0, 100], To disable image garbage collection, set to 100. ")
fs.Int32Var(&a.ImageGCLowThresholdPercent, "image-gc-low-threshold", 75, "The percent of disk usage before which image garbage collection is never run. Lowest disk usage to garbage collect to. Values must be within the range [0, 100] and should not be larger than that of --image-gc-high-threshold.")
} }
//SetLog 设置log //SetLog 设置log

View File

@ -48,6 +48,10 @@ func Run(c *option.Conf) error {
return nil return nil
} }
startfunc := func() error { startfunc := func() error {
if err := c.ParseClient(); err != nil {
return fmt.Errorf("config parse error:%s", err.Error())
}
nodemanager, err := nodem.NewNodeManager(c) nodemanager, err := nodem.NewNodeManager(c)
if err != nil { if err != nil {
return fmt.Errorf("create node manager failed: %s", err) return fmt.Errorf("create node manager failed: %s", err)
@ -55,9 +59,6 @@ func Run(c *option.Conf) error {
if err := nodemanager.InitStart(); err != nil { if err := nodemanager.InitStart(); err != nil {
return err return err
} }
if err := c.ParseClient(); err != nil {
return fmt.Errorf("config parse error:%s", err.Error())
}
errChan := make(chan error, 3) errChan := make(chan error, 3)
err = eventLog.NewManager(eventLog.EventConfig{ err = eventLog.NewManager(eventLog.EventConfig{
EventLogServers: c.EventLogServer, EventLogServers: c.EventLogServer,

View File

@ -37,6 +37,7 @@ import (
"github.com/goodrain/rainbond/worker/appm/controller" "github.com/goodrain/rainbond/worker/appm/controller"
"github.com/goodrain/rainbond/worker/appm/store" "github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/discover" "github.com/goodrain/rainbond/worker/discover"
"github.com/goodrain/rainbond/worker/gc"
"github.com/goodrain/rainbond/worker/master" "github.com/goodrain/rainbond/worker/master"
"github.com/goodrain/rainbond/worker/monitor" "github.com/goodrain/rainbond/worker/monitor"
"github.com/goodrain/rainbond/worker/server" "github.com/goodrain/rainbond/worker/server"
@ -112,8 +113,10 @@ func Run(s *option.Worker) error {
return err return err
} }
defer masterCon.Stop() defer masterCon.Stop()
//step 6 : create discover module //step 6 : create discover module
taskManager := discover.NewTaskManager(s.Config, cachestore, controllerManager, startCh) garbageCollector := gc.NewGarbageCollector(clientset)
taskManager := discover.NewTaskManager(s.Config, cachestore, controllerManager, garbageCollector, startCh)
if err := taskManager.Start(); err != nil { if err := taskManager.Start(); err != nil {
return err return err
} }

View File

@ -308,6 +308,7 @@ type ServiceProbeDao interface {
type CodeCheckResultDao interface { type CodeCheckResultDao interface {
Dao Dao
GetCodeCheckResult(serviceID string) (*model.CodeCheckResult, error) GetCodeCheckResult(serviceID string) (*model.CodeCheckResult, error)
DeleteByServiceID(serviceID string) error
} }
//EventDao EventDao //EventDao EventDao
@ -317,6 +318,7 @@ type EventDao interface {
GetEventByEventIDs(eventIDs []string) ([]*model.ServiceEvent, error) GetEventByEventIDs(eventIDs []string) ([]*model.ServiceEvent, error)
GetEventByServiceID(serviceID string) ([]*model.ServiceEvent, error) GetEventByServiceID(serviceID string) ([]*model.ServiceEvent, error)
DelEventByServiceID(serviceID string) error DelEventByServiceID(serviceID string) error
ListByTargetID(targetID string) ([]*model.ServiceEvent, error)
GetEventsByTarget(target, targetID string, offset, liimt int) ([]*model.ServiceEvent, int, error) GetEventsByTarget(target, targetID string, offset, liimt int) ([]*model.ServiceEvent, int, error)
GetEventsByTenantID(tenantID string, offset, limit int) ([]*model.ServiceEvent, int, error) GetEventsByTenantID(tenantID string, offset, limit int) ([]*model.ServiceEvent, int, error)
GetLastASyncEvent(target, targetID string) (*model.ServiceEvent, error) GetLastASyncEvent(target, targetID string) (*model.ServiceEvent, error)
@ -327,6 +329,7 @@ type EventDao interface {
//VersionInfoDao VersionInfoDao //VersionInfoDao VersionInfoDao
type VersionInfoDao interface { type VersionInfoDao interface {
Dao Dao
ListSuccessfulOnes() ([]*model.VersionInfo, error)
GetVersionByEventID(eventID string) (*model.VersionInfo, error) GetVersionByEventID(eventID string) (*model.VersionInfo, error)
GetVersionByDeployVersion(version, serviceID string) (*model.VersionInfo, error) GetVersionByDeployVersion(version, serviceID string) (*model.VersionInfo, error)
GetVersionByServiceID(serviceID string) ([]*model.VersionInfo, error) GetVersionByServiceID(serviceID string) ([]*model.VersionInfo, error)

View File

@ -3309,6 +3309,20 @@ func (mr *MockCodeCheckResultDaoMockRecorder) GetCodeCheckResult(serviceID inter
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCodeCheckResult", reflect.TypeOf((*MockCodeCheckResultDao)(nil).GetCodeCheckResult), serviceID) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetCodeCheckResult", reflect.TypeOf((*MockCodeCheckResultDao)(nil).GetCodeCheckResult), serviceID)
} }
// DeleteByServiceID mocks base method
func (m *MockCodeCheckResultDao) DeleteByServiceID(serviceID string) error {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "DeleteByServiceID", serviceID)
ret0, _ := ret[0].(error)
return ret0
}
// DeleteByServiceID indicates an expected call of DeleteByServiceID
func (mr *MockCodeCheckResultDaoMockRecorder) DeleteByServiceID(serviceID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteByServiceID", reflect.TypeOf((*MockCodeCheckResultDao)(nil).DeleteByServiceID), serviceID)
}
// MockEventDao is a mock of EventDao interface // MockEventDao is a mock of EventDao interface
type MockEventDao struct { type MockEventDao struct {
ctrl *gomock.Controller ctrl *gomock.Controller
@ -3419,6 +3433,21 @@ func (mr *MockEventDaoMockRecorder) DelEventByServiceID(serviceID interface{}) *
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DelEventByServiceID", reflect.TypeOf((*MockEventDao)(nil).DelEventByServiceID), serviceID) return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DelEventByServiceID", reflect.TypeOf((*MockEventDao)(nil).DelEventByServiceID), serviceID)
} }
// ListByTargetID mocks base method
func (m *MockEventDao) ListByTargetID(targetID string) ([]*model.ServiceEvent, error) {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "ListByTargetID", targetID)
ret0, _ := ret[0].([]*model.ServiceEvent)
ret1, _ := ret[1].(error)
return ret0, ret1
}
// ListByTargetID indicates an expected call of ListByTargetID
func (mr *MockEventDaoMockRecorder) ListByTargetID(targetID interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListByTargetID", reflect.TypeOf((*MockEventDao)(nil).ListByTargetID), targetID)
}
// GetEventsByTarget mocks base method // GetEventsByTarget mocks base method
func (m *MockEventDao) GetEventsByTarget(target, targetID string, offset, liimt int) ([]*model.ServiceEvent, int, error) { func (m *MockEventDao) GetEventsByTarget(target, targetID string, offset, liimt int) ([]*model.ServiceEvent, int, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()

View File

@ -1 +0,0 @@
mockgen -source=dao.go -destination dao_mock.go -package dao

View File

@ -99,3 +99,8 @@ func (c *CodeCheckResultDaoImpl) GetCodeCheckResult(serviceID string) (*model.Co
} }
return &result, nil return &result, nil
} }
// DeleteByServiceID deletes a CodeCheckResult base on serviceID.
func (c *CodeCheckResultDaoImpl) DeleteByServiceID(serviceID string) error {
return c.DB.Where("service_id=?", serviceID).Delete(&model.CodeCheckResult{}).Error
}

View File

@ -106,6 +106,15 @@ func (c *EventDaoImpl) DelEventByServiceID(serviceID string) error {
return nil return nil
} }
// ListByTargetID -
func (c *EventDaoImpl) ListByTargetID(targetID string) ([]*model.ServiceEvent, error) {
var events []*model.ServiceEvent
if err := c.DB.Where("target_id=?", targetID).Find(&events).Error; err != nil {
return nil, err
}
return events, nil
}
// GetEventsByTarget get event by target with page // GetEventsByTarget get event by target with page
func (c *EventDaoImpl) GetEventsByTarget(target, targetID string, offset, limit int) ([]*model.ServiceEvent, int, error) { func (c *EventDaoImpl) GetEventsByTarget(target, targetID string, offset, limit int) ([]*model.ServiceEvent, int, error) {
var result []*model.ServiceEvent var result []*model.ServiceEvent

View File

@ -74,6 +74,16 @@ type VersionInfoDaoImpl struct {
DB *gorm.DB DB *gorm.DB
} }
// ListSuccessfulOnes r-
func (c *VersionInfoDaoImpl) ListSuccessfulOnes() ([]*model.VersionInfo, error) {
// TODO: group by service id and limit each group
var versoins []*model.VersionInfo
if err := c.DB.Where("final_status=?", "success").Find(&versoins).Error; err != nil {
return nil, err
}
return versoins, nil
}
//GetVersionByEventID get version by event id //GetVersionByEventID get version by event id
func (c *VersionInfoDaoImpl) GetVersionByEventID(eventID string) (*model.VersionInfo, error) { func (c *VersionInfoDaoImpl) GetVersionByEventID(eventID string) (*model.VersionInfo, error) {
var result model.VersionInfo var result model.VersionInfo

View File

@ -36,11 +36,17 @@ func TestGetSuitableInstance(t *testing.T) {
cancel: cancel, cancel: cancel,
context: ctx, context: ctx,
discover: dis, discover: dis,
monitorDatas: make(map[string]db.MonitorData),
updateTime: make(map[string]time.Time), updateTime: make(map[string]time.Time),
abnormalNode: make(map[string]int), abnormalNode: make(map[string]int),
log: logrus.WithField("Module", "Test"), log: logrus.WithField("Module", "Test"),
} }
d.monitorDatas = map[string]db.MonitorData{"a": db.MonitorData{InstanceID: "a", LogSizePeerM: 200}, "b": db.MonitorData{InstanceID: "b", LogSizePeerM: 150}} d.monitorDatas = map[string]*db.MonitorData{
d.GetSuitableInstance() "a": &db.MonitorData{
InstanceID: "a", LogSizePeerM: 200,
},
"b": &db.MonitorData{
InstanceID: "b", LogSizePeerM: 150,
},
}
d.GetSuitableInstance("todo service id")
} }

View File

@ -30,6 +30,7 @@ import (
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
eventutil "github.com/goodrain/rainbond/eventlog/util"
"github.com/goodrain/rainbond/util" "github.com/goodrain/rainbond/util"
) )
@ -43,13 +44,12 @@ func (m *EventFilePlugin) SaveMessage(events []*EventLogMessage) error {
if len(events) == 0 { if len(events) == 0 {
return nil return nil
} }
key := events[0].EventID filePath := eventutil.EventLogFilePath(m.HomePath)
dirpath := path.Join(m.HomePath, "eventlog") if err := util.CheckAndCreateDir(filePath); err != nil {
if err := util.CheckAndCreateDir(dirpath); err != nil {
return err return err
} }
apath := path.Join(m.HomePath, "eventlog", key+".log") filename := eventutil.EventLogFileName(filePath, events[0].EventID)
writeFile, err := os.OpenFile(apath, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0755) writeFile, err := os.OpenFile(filename, os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0755)
if err != nil { if err != nil {
return err return err
} }

View File

@ -191,4 +191,4 @@ func MvLogFile(newName string, filePath string) error {
} }
defer new.Close() defer new.Close()
return nil return nil
} }

View File

@ -20,10 +20,6 @@ package db
import "testing" import "testing"
func TestGetServiceAliasID(t *testing.T) {
t.Log(GetServiceAliasID(""))
}
func TestFileSaveMessage(t *testing.T) { func TestFileSaveMessage(t *testing.T) {
f := filePlugin{ f := filePlugin{

42
eventlog/util/filepath.go Normal file
View File

@ -0,0 +1,42 @@
package util
import (
"crypto/sha256"
"fmt"
"path"
"strconv"
)
// DockerLogFilePath returns the directory to save Docker log files
func DockerLogFilePath(homepath, key string) string {
return path.Join(homepath, getServiceAliasID(key))
}
// DockerLogFileName returns the file name of Docker log file.
func DockerLogFileName(filePath string) string {
return path.Join(filePath, "stdout.log")
}
//python:
//new_word = str(ord(string[10])) + string + str(ord(string[3])) + 'log' + str(ord(string[2]) / 7)
//new_id = hashlib.sha224(new_word).hexdigest()[0:16]
//
func getServiceAliasID(ServiceID string) string {
if len(ServiceID) > 11 {
newWord := strconv.Itoa(int(ServiceID[10])) + ServiceID + strconv.Itoa(int(ServiceID[3])) + "log" + strconv.Itoa(int(ServiceID[2])/7)
ha := sha256.New224()
ha.Write([]byte(newWord))
return fmt.Sprintf("%x", ha.Sum(nil))[0:16]
}
return ServiceID
}
// EventLogFilePath returns the directory to save event log files
func EventLogFilePath(homePath string) string {
return path.Join(homePath, "eventlog")
}
// EventLogFileName returns the file name of event log file.
func EventLogFileName(filePath, key string) string {
return path.Join(filePath, key+".log")
}

View File

@ -65,3 +65,20 @@ func CreateLocalVolume(w http.ResponseWriter, r *http.Request) {
} }
httputil.ReturnSuccess(r, w, map[string]string{"path": volumeHostPath}) httputil.ReturnSuccess(r, w, map[string]string{"path": volumeHostPath})
} }
// DeleteLocalVolume delete local volume dir
func DeleteLocalVolume(w http.ResponseWriter, r *http.Request) {
var requestopt = make(map[string]string)
if err := json.NewDecoder(r.Body).Decode(&requestopt); err != nil {
w.WriteHeader(400)
return
}
path := requestopt["path"]
if err := os.RemoveAll(path); err != nil {
logrus.Errorf("path: %s; remove pv path: %v", path, err)
w.WriteHeader(500)
}
httputil.ReturnSuccess(r, w, nil)
}

View File

@ -54,6 +54,7 @@ func Routers(mode string) *chi.Mux {
}) })
r.Route("/localvolumes", func(r chi.Router) { r.Route("/localvolumes", func(r chi.Router) {
r.Post("/create", controller.CreateLocalVolume) r.Post("/create", controller.CreateLocalVolume)
r.Delete("/", controller.DeleteLocalVolume)
}) })
//以下只有管理节点具有的API //以下只有管理节点具有的API
if mode == "master" { if mode == "master" {

View File

@ -36,4 +36,5 @@ type Manager interface {
StopService(serviceName string) error StopService(serviceName string) error
SetAPIRoute(apim *api.Manager) error SetAPIRoute(apim *api.Manager) error
GetService(serviceName string) *service.Service GetService(serviceName string) *service.Service
ListServiceImages() []string
} }

View File

@ -28,7 +28,9 @@ import (
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3" "github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/builder/parser"
"github.com/goodrain/rainbond/cmd/node/option" "github.com/goodrain/rainbond/cmd/node/option"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/node/nodem/client" "github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/nodem/healthy" "github.com/goodrain/rainbond/node/nodem/healthy"
"github.com/goodrain/rainbond/node/nodem/service" "github.com/goodrain/rainbond/node/nodem/service"
@ -499,6 +501,26 @@ func (m *ManagerService) InjectConfig(content string) string {
return content return content
} }
// ListServiceImages -
func (m *ManagerService) ListServiceImages() []string {
var images []string
for _, svc := range m.services {
if svc.Start == "" || svc.OnlyHealthCheck {
continue
}
par := parser.CreateDockerRunOrImageParse("", "", svc.Start, nil, event.GetTestLogger())
par.ParseDockerun(strings.Split(svc.Start, " "))
logrus.Debugf("detect image: %s", par.GetImage().String())
if par.GetImage().String() == "" {
continue
}
images = append(images, par.GetImage().String())
}
return images
}
//NewManagerService new controller manager //NewManagerService new controller manager
func NewManagerService(conf *option.Conf, healthyManager healthy.Manager, cluster client.ClusterClient) *ManagerService { func NewManagerService(conf *option.Conf, healthyManager healthy.Manager, cluster client.ClusterClient) *ManagerService {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())

View File

@ -0,0 +1,435 @@
/*
Copyright 2015 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gc
import (
"context"
goerrors "errors"
"fmt"
"sort"
"strings"
"sync"
"syscall"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/api/types"
"github.com/docker/docker/client"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
)
var (
// ErrImageNotFound -
ErrImageNotFound = goerrors.New("image not found")
)
// FsStats -
type FsStats struct {
CapacityBytes uint64 `json:"capacityBytes,omitempty"`
AvailableBytes uint64 `json:"availableBytes,omitempty"`
}
// GetFsStats -
func GetFsStats(path string) (*FsStats, error) {
var fs syscall.Statfs_t
err := syscall.Statfs(path, &fs)
if err != nil {
return nil, err
}
fsStats := &FsStats{
CapacityBytes: fs.Blocks * uint64(fs.Bsize),
AvailableBytes: fs.Bfree * uint64(fs.Bsize),
}
return fsStats, nil
}
// ImageGCManager is an interface for managing lifecycle of all images.
// Implementation is thread-safe.
type ImageGCManager interface {
// Start async garbage collection of images.
Start()
SetServiceImages(seviceImages []string)
}
// ImageGCPolicy is a policy for garbage collecting images. Policy defines an allowed band in
// which garbage collection will be run.
type ImageGCPolicy struct {
// Any usage above this threshold will always trigger garbage collection.
// This is the highest usage we will allow.
HighThresholdPercent int
// Any usage below this threshold will never trigger garbage collection.
// This is the lowest threshold we will try to garbage collect to.
LowThresholdPercent int
// Minimum age at which an image can be garbage collected.
MinAge time.Duration
// ImageGCPeriod is the period for performing image garbage collection.
ImageGCPeriod time.Duration
}
type realImageGCManager struct {
dockerClient *client.Client
// Records of images and their use.
imageRecords map[string]*imageRecord
imageRecordsLock sync.Mutex
// The image garbage collection policy in use.
policy ImageGCPolicy
// Track initialization
initialized bool
// sandbox image exempted from GC
sandboxImage string
serviceImages []string
}
// Information about the images we track.
type imageRecord struct {
// Time when this image was first detected.
firstDetected time.Time
// Time when we last saw this image being used.
lastUsed time.Time
// Size of the image in bytes.
size int64
}
// NewImageGCManager instantiates a new ImageGCManager object.
func NewImageGCManager(dockerClient *client.Client, policy ImageGCPolicy, sandboxImage string) (ImageGCManager, error) {
// Validate policy.
if policy.HighThresholdPercent < 0 || policy.HighThresholdPercent > 100 {
return nil, fmt.Errorf("invalid HighThresholdPercent %d, must be in range [0-100]", policy.HighThresholdPercent)
}
if policy.LowThresholdPercent < 0 || policy.LowThresholdPercent > 100 {
return nil, fmt.Errorf("invalid LowThresholdPercent %d, must be in range [0-100]", policy.LowThresholdPercent)
}
if policy.LowThresholdPercent > policy.HighThresholdPercent {
return nil, fmt.Errorf("LowThresholdPercent %d can not be higher than HighThresholdPercent %d", policy.LowThresholdPercent, policy.HighThresholdPercent)
}
im := &realImageGCManager{
dockerClient: dockerClient,
policy: policy,
imageRecords: make(map[string]*imageRecord),
initialized: false,
sandboxImage: sandboxImage,
}
return im, nil
}
func (im *realImageGCManager) Start() {
logrus.Infof("start image gc manager; image gc period: %f", im.policy.ImageGCPeriod.Seconds())
go wait.Until(func() {
// Initial detection make detected time "unknown" in the past.
var ts time.Time
if im.initialized {
ts = time.Now()
}
_, err := im.detectImages(ts)
if err != nil {
logrus.Warningf("[imageGCManager] Failed to monitor images: %v", err)
} else {
im.initialized = true
}
}, im.policy.ImageGCPeriod, wait.NeverStop)
prevImageGCFailed := false
go wait.Until(func() {
if err := im.GarbageCollect(); err != nil {
if prevImageGCFailed {
logrus.Errorf("Image garbage collection failed multiple times in a row: %v", err)
} else {
logrus.Errorf("Image garbage collection failed once. Stats initialization may not have completed yet: %v", err)
}
prevImageGCFailed = true
} else {
logrus.Debug("Image garbage collection succeeded")
}
}, im.policy.ImageGCPeriod, wait.NeverStop)
}
func (im *realImageGCManager) SetServiceImages(serviceImages []string) {
logrus.Infof("set service images: %s", strings.Join(serviceImages, ","))
im.serviceImages = serviceImages
}
func (im *realImageGCManager) detectImages(detectTime time.Time) (sets.String, error) {
imagesInUse := sets.NewString()
// copy service images
serviceImages := make([]string, len(im.serviceImages))
copy(serviceImages, im.serviceImages)
// Always consider the container runtime pod sandbox image in use
serviceImages = append(serviceImages, im.sandboxImage)
for _, image := range serviceImages {
imageRef, err := im.getImageRef(image)
if err == nil && imageRef != "" {
imagesInUse.Insert(imageRef)
}
}
images, err := im.listImages()
if err != nil {
return imagesInUse, err
}
// Add new images and record those being used.
now := time.Now()
currentImages := sets.NewString()
im.imageRecordsLock.Lock()
defer im.imageRecordsLock.Unlock()
for _, image := range images {
logrus.Debugf("Adding image ID %s to currentImages", image.ID)
currentImages.Insert(image.ID)
// New image, set it as detected now.
if _, ok := im.imageRecords[image.ID]; !ok {
logrus.Debugf("Image ID %s is new", image.ID)
im.imageRecords[image.ID] = &imageRecord{
firstDetected: detectTime,
}
}
// Set last used time to now if the image is being used.
if isImageUsed(image.ID, imagesInUse) {
logrus.Debugf("Setting Image ID %s lastUsed to %v", image.ID, now)
im.imageRecords[image.ID].lastUsed = now
}
logrus.Debugf("Image ID %s has size %d", image.ID, image.Size)
im.imageRecords[image.ID].size = image.Size
}
// Remove old images from our records.
for image := range im.imageRecords {
if !currentImages.Has(image) {
logrus.Debugf("Image ID %s is no longer present; removing from imageRecords", image)
delete(im.imageRecords, image)
}
}
return imagesInUse, nil
}
func (im *realImageGCManager) getImageRef(imageID string) (string, error) {
ctx, cancel := getContextWithTimeout(3 * time.Second)
defer cancel()
inspect, _, err := im.dockerClient.ImageInspectWithRaw(ctx, imageID)
if err != nil {
if strings.Contains(err.Error(), "No such image") {
return "", ErrImageNotFound
}
return "", err
}
return inspect.ID, nil
}
func (im *realImageGCManager) listImages() ([]types.ImageSummary, error) {
ctx, cancel := getContextWithTimeout(3 * time.Second)
defer cancel()
return im.dockerClient.ImageList(ctx, types.ImageListOptions{})
}
func (im *realImageGCManager) removeImage(imageID string) error {
ctx, cancel := getContextWithTimeout(3 * time.Second)
defer cancel()
opts := types.ImageRemoveOptions{
Force: true,
}
items, err := im.dockerClient.ImageRemove(ctx, imageID, opts)
if err != nil {
return err
}
for _, item := range items {
if item.Deleted != "" {
logrus.Debugf("image deleted: %s", item.Deleted)
}
if item.Untagged != "" {
logrus.Debugf("image untagged: %s", item.Untagged)
}
}
return nil
}
func (im *realImageGCManager) dockerRootDir() (string, error) {
ctx, cancel := getContextWithTimeout(3 * time.Second)
defer cancel()
dockerInfo, err := im.dockerClient.Info(ctx)
if err != nil {
return "", fmt.Errorf("docker info: %v", err)
}
return dockerInfo.DockerRootDir, nil
}
// getContextWithTimeout returns a context with timeout.
func getContextWithTimeout(timeout time.Duration) (context.Context, context.CancelFunc) {
return context.WithTimeout(context.Background(), timeout)
}
func (im *realImageGCManager) GarbageCollect() error {
dockerRootDir, err := im.dockerRootDir()
if err != nil {
logrus.Errorf("failed to get docker root dir: %v; use '/'", err)
dockerRootDir = "/"
}
logrus.Infof("docker root dir: %s", dockerRootDir)
fsStats, err := GetFsStats(dockerRootDir)
if err != nil {
return err
}
available := fsStats.AvailableBytes
capacity := fsStats.CapacityBytes
if available > capacity {
logrus.Warningf("available %d is larger than capacity %d", available, capacity)
available = capacity
}
// Check valid capacity.
if capacity == 0 {
err := goerrors.New("invalid capacity 0 on image filesystem")
return err
}
// If over the max threshold, free enough to place us at the lower threshold.
usagePercent := 100 - int(available*100/capacity)
if usagePercent >= im.policy.HighThresholdPercent {
amountToFree := int64(capacity)*int64(100-im.policy.LowThresholdPercent)/100 - int64(available)
logrus.Infof("[imageGCManager]: Disk usage on image filesystem is at %d%% which is over the high threshold (%d%%). Trying to free %d bytes down to the low threshold (%d%%).", usagePercent, im.policy.HighThresholdPercent, amountToFree, im.policy.LowThresholdPercent)
freed, err := im.freeSpace(amountToFree, time.Now())
if err != nil {
return err
}
if freed < amountToFree {
return fmt.Errorf("failed to garbage collect required amount of images. Wanted to free %d bytes, but freed %d bytes", amountToFree, freed)
}
}
return nil
}
// Tries to free bytesToFree worth of images on the disk.
//
// Returns the number of bytes free and an error if any occurred. The number of
// bytes freed is always returned.
// Note that error may be nil and the number of bytes free may be less
// than bytesToFree.
func (im *realImageGCManager) freeSpace(bytesToFree int64, freeTime time.Time) (int64, error) {
imagesInUse, err := im.detectImages(freeTime)
if err != nil {
return 0, err
}
im.imageRecordsLock.Lock()
defer im.imageRecordsLock.Unlock()
// Get all images in eviction order.
images := make([]evictionInfo, 0, len(im.imageRecords))
for image, record := range im.imageRecords {
if isImageUsed(image, imagesInUse) {
logrus.Debugf("Image ID %s is being used", image)
continue
}
images = append(images, evictionInfo{
id: image,
imageRecord: *record,
})
}
sort.Sort(byLastUsedAndDetected(images))
// Delete unused images until we've freed up enough space.
var deletionErrors []error
spaceFreed := int64(0)
for _, image := range images {
logrus.Debugf("Evaluating image ID %s for possible garbage collection", image.id)
// Images that are currently in used were given a newer lastUsed.
if image.lastUsed.Equal(freeTime) || image.lastUsed.After(freeTime) {
logrus.Debugf("Image ID %s has lastUsed=%v which is >= freeTime=%v, not eligible for garbage collection", image.id, image.lastUsed, freeTime)
continue
}
// Avoid garbage collect the image if the image is not old enough.
// In such a case, the image may have just been pulled down, and will be used by a container right away.
if freeTime.Sub(image.firstDetected) < im.policy.MinAge {
logrus.Debugf("Image ID %s has age %v which is less than the policy's minAge of %v, not eligible for garbage collection", image.id, freeTime.Sub(image.firstDetected), im.policy.MinAge)
continue
}
// Remove image. Continue despite errors.
logrus.Debugf("[imageGCManager]: Removing image %q to free %d bytes", image.id, image.size)
err := im.removeImage(image.id)
if err != nil {
continue
}
delete(im.imageRecords, image.id)
spaceFreed += image.size
if spaceFreed >= bytesToFree {
break
}
}
if len(deletionErrors) > 0 {
return spaceFreed, fmt.Errorf("wanted to free %d bytes, but freed %d bytes space with errors in image deletion: %v", bytesToFree, spaceFreed, errors.NewAggregate(deletionErrors))
}
return spaceFreed, nil
}
type evictionInfo struct {
id string
imageRecord
}
type byLastUsedAndDetected []evictionInfo
func (ev byLastUsedAndDetected) Len() int { return len(ev) }
func (ev byLastUsedAndDetected) Swap(i, j int) { ev[i], ev[j] = ev[j], ev[i] }
func (ev byLastUsedAndDetected) Less(i, j int) bool {
// Sort by last used, break ties by detected.
if ev[i].lastUsed.Equal(ev[j].lastUsed) {
return ev[i].firstDetected.Before(ev[j].firstDetected)
}
return ev[i].lastUsed.Before(ev[j].lastUsed)
}
func isImageUsed(imageID string, imagesInUse sets.String) bool {
// Check the image ID.
if _, ok := imagesInUse[imageID]; ok {
return true
}
return false
}

View File

@ -0,0 +1,89 @@
package gc
import (
"context"
"testing"
"time"
"github.com/docker/docker/client"
)
var dockerTimeout = 10 * time.Second
func defaultContext() context.Context {
ctx, _ := context.WithTimeout(context.Background(), dockerTimeout)
return ctx
}
func TestGetFsStats(t *testing.T) {
fs, err := GetFsStats("/")
if err != nil {
t.Fatal(err)
}
t.Logf("capacity: %v", fs.CapacityBytes)
t.Logf("available: %v", fs.AvailableBytes)
}
func TestGetImageRef(t *testing.T) {
dockerCli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
im := realImageGCManager{
dockerClient: dockerCli,
}
if _, err := im.getImageRef("nginx"); err != nil {
t.Error(err)
}
}
func TestListImages(t *testing.T) {
dockerCli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
im := realImageGCManager{
dockerClient: dockerCli,
}
images, err := im.listImages()
if err != nil {
t.Fatal(err)
}
for _, image := range images {
t.Logf("%s\n", image.ID)
}
}
func TestRemoveImage(t *testing.T) {
dockerCli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
im := realImageGCManager{
dockerClient: dockerCli,
}
if err := im.removeImage("sha256:568c4670fa800978e08e4a51132b995a54f8d5ae83ca133ef5546d092b864acf"); err != nil {
t.Fatalf("remove image: %v", err)
}
}
func TestDockerRootDir(t *testing.T) {
dockerCli, err := client.NewEnvClient()
if err != nil {
t.Fatal(err)
}
dockerInfo, err := dockerCli.Info(defaultContext())
if err != nil {
t.Errorf("docker info: %v", err)
}
t.Logf("docker root dir: %s", dockerInfo.DockerRootDir)
}

View File

@ -33,6 +33,7 @@ import (
"github.com/goodrain/rainbond/node/api" "github.com/goodrain/rainbond/node/api"
"github.com/goodrain/rainbond/node/nodem/client" "github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/node/nodem/controller" "github.com/goodrain/rainbond/node/nodem/controller"
"github.com/goodrain/rainbond/node/nodem/gc"
"github.com/goodrain/rainbond/node/nodem/healthy" "github.com/goodrain/rainbond/node/nodem/healthy"
"github.com/goodrain/rainbond/node/nodem/info" "github.com/goodrain/rainbond/node/nodem/info"
"github.com/goodrain/rainbond/node/nodem/monitor" "github.com/goodrain/rainbond/node/nodem/monitor"
@ -40,6 +41,8 @@ import (
"github.com/goodrain/rainbond/util" "github.com/goodrain/rainbond/util"
) )
var sandboxImage = "k8s.gcr.io/pause-amd64:latest"
//NodeManager node manager //NodeManager node manager
type NodeManager struct { type NodeManager struct {
currentNode *client.HostNode currentNode *client.HostNode
@ -52,6 +55,8 @@ type NodeManager struct {
cfg *option.Conf cfg *option.Conf
apim *api.Manager apim *api.Manager
clm *logger.ContainerLogManage clm *logger.ContainerLogManage
imageGCManager gc.ImageGCManager
} }
//NewNodeManager new a node manager //NewNodeManager new a node manager
@ -68,17 +73,30 @@ func NewNodeManager(conf *option.Conf) (*NodeManager, error) {
if err != nil { if err != nil {
return nil, fmt.Errorf("Get host id error:%s", err.Error()) return nil, fmt.Errorf("Get host id error:%s", err.Error())
} }
imageGCPolicy := gc.ImageGCPolicy{
MinAge: conf.ImageMinimumGCAge,
ImageGCPeriod: conf.ImageGCPeriod,
HighThresholdPercent: int(conf.ImageGCHighThresholdPercent),
LowThresholdPercent: int(conf.ImageGCLowThresholdPercent),
}
imageGCManager, err := gc.NewImageGCManager(conf.DockerCli, imageGCPolicy, sandboxImage)
if err != nil {
return nil, fmt.Errorf("create new imageGCManager: %v", err)
}
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
nodem := &NodeManager{ nodem := &NodeManager{
cfg: conf, cfg: conf,
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
cluster: cluster, cluster: cluster,
monitor: monitor, monitor: monitor,
healthy: healthyManager, healthy: healthyManager,
controller: controller, controller: controller,
clm: clm, clm: clm,
currentNode: &client.HostNode{ID: uid}, currentNode: &client.HostNode{ID: uid},
imageGCManager: imageGCManager,
} }
return nodem, nil return nodem, nil
} }
@ -128,6 +146,14 @@ func (n *NodeManager) Start(errchan chan error) error {
} else { } else {
logrus.Infof("this node(%s) is not compute node or disable collect container log ,do not start container log manage", n.currentNode.Role) logrus.Infof("this node(%s) is not compute node or disable collect container log ,do not start container log manage", n.currentNode.Role)
} }
if n.cfg.EnableImageGC {
if n.currentNode.Role.HasRule(client.ManageNode) && !n.currentNode.Role.HasRule(client.ComputeNode) {
n.imageGCManager.SetServiceImages(n.controller.ListServiceImages())
go n.imageGCManager.Start()
}
}
go n.monitor.Start(errchan) go n.monitor.Start(errchan)
go n.heartbeat() go n.heartbeat()
return nil return nil

View File

@ -23,11 +23,9 @@ import (
"fmt" "fmt"
"sync" "sync"
"github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/worker/appm/store" "github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/types/v1" "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/goodrain/rainbond/util"
"k8s.io/client-go/kubernetes" "k8s.io/client-go/kubernetes"
) )

View File

@ -32,6 +32,7 @@ import (
"github.com/goodrain/rainbond/worker/appm/controller" "github.com/goodrain/rainbond/worker/appm/controller"
"github.com/goodrain/rainbond/worker/appm/store" "github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/discover/model" "github.com/goodrain/rainbond/worker/discover/model"
"github.com/goodrain/rainbond/worker/gc"
"github.com/goodrain/rainbond/worker/handle" "github.com/goodrain/rainbond/worker/handle"
grpc1 "google.golang.org/grpc" grpc1 "google.golang.org/grpc"
) )
@ -57,9 +58,11 @@ type TaskManager struct {
func NewTaskManager(cfg option.Config, func NewTaskManager(cfg option.Config,
store store.Storer, store store.Storer,
controllermanager *controller.Manager, controllermanager *controller.Manager,
garbageCollector *gc.GarbageCollector,
startCh *channels.RingChannel) *TaskManager { startCh *channels.RingChannel) *TaskManager {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
handleManager := handle.NewManager(ctx, cfg, store, controllermanager, startCh) handleManager := handle.NewManager(ctx, cfg, store, controllermanager, garbageCollector, startCh)
healthStatus["status"] = "health" healthStatus["status"] = "health"
healthStatus["info"] = "worker service health" healthStatus["info"] = "worker service health"
return &TaskManager{ return &TaskManager{
@ -122,7 +125,7 @@ func (t *TaskManager) Do() {
} }
rc := t.handleManager.AnalystToExec(transData) rc := t.handleManager.AnalystToExec(transData)
if rc != nil && rc != handle.ErrCallback { if rc != nil && rc != handle.ErrCallback {
logrus.Errorf("analyst to exet: %v", rc) logrus.Warningf("execute task: %v", rc)
TaskError++ TaskError++
} else if rc != nil && rc == handle.ErrCallback { } else if rc != nil && rc == handle.ErrCallback {
logrus.Errorf("err callback; analyst to exet: %v", rc) logrus.Errorf("err callback; analyst to exet: %v", rc)

View File

@ -144,6 +144,13 @@ func NewTaskBody(taskType string, body []byte) TaskBody {
return nil return nil
} }
return b return b
case "service_gc":
b := ServiceGCTaskBody{}
err := ffjson.Unmarshal(body, &b)
if err != nil {
return nil
}
return b
case "delete_tenant": case "delete_tenant":
b := &DeleteTenantTaskBody{} b := &DeleteTenantTaskBody{}
err := ffjson.Unmarshal(body, &b) err := ffjson.Unmarshal(body, &b)
@ -329,6 +336,13 @@ type GroupStopTaskBody struct {
Strategy []string `json:"strategy"` Strategy []string `json:"strategy"`
} }
// ServiceGCTaskBody holds the request body to execute service gc task.
type ServiceGCTaskBody struct {
TenantID string `json:"tenant_id"`
ServiceID string `json:"service_id"`
EventIDs []string `json:"event_ids"`
}
// DeleteTenantTaskBody - // DeleteTenantTaskBody -
type DeleteTenantTaskBody struct { type DeleteTenantTaskBody struct {
TenantID string `json:"tenant_id"` TenantID string `json:"tenant_id"`

92
worker/gc/gc.go Normal file
View File

@ -0,0 +1,92 @@
// Copyright (C) 2nilfmt.Errorf("a")4-2nilfmt.Errorf("a")8 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package gc
import (
"fmt"
"os"
"path"
"github.com/Sirupsen/logrus"
eventutil "github.com/goodrain/rainbond/eventlog/util"
"github.com/goodrain/rainbond/worker/discover/model"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
// GarbageCollector -
type GarbageCollector struct {
clientset kubernetes.Interface
}
// NewGarbageCollector -
func NewGarbageCollector(clientset kubernetes.Interface) *GarbageCollector {
gcr := &GarbageCollector{
clientset: clientset,
}
return gcr
}
// DelLogFile deletes persistent data related to the service based on serviceID.
func (g *GarbageCollector) DelLogFile(serviceGCReq model.ServiceGCTaskBody) {
logrus.Infof("service id: %s; delete log file.", serviceGCReq.ServiceID)
// log generated during service running
logPath := "/grdata/logs"
dockerLogPath := eventutil.DockerLogFilePath(logPath, serviceGCReq.ServiceID)
if err := os.RemoveAll(dockerLogPath); err != nil {
logrus.Warningf("remove docker log files: %v", err)
}
// log generated by the service event
eventLogPath := eventutil.EventLogFilePath(logPath)
for _, eventID := range serviceGCReq.EventIDs {
eventLogFileName := eventutil.EventLogFileName(eventLogPath, eventID)
logrus.Debugf("remove event log file: %s", eventLogFileName)
if err := os.RemoveAll(eventLogFileName); err != nil {
logrus.Warningf("file: %s; remove event log file: %v", eventLogFileName, err)
}
}
}
// DelVolumeData -
func (g *GarbageCollector) DelVolumeData(serviceGCReq model.ServiceGCTaskBody) {
f := func(prefix string) {
dir := path.Join(prefix, fmt.Sprintf("tenant/%s/service/%s", serviceGCReq.TenantID, serviceGCReq.ServiceID))
logrus.Infof("volume data. delete %s", dir)
if err := os.RemoveAll(dir); err != nil {
logrus.Warningf("dir: %s; remove volume data: %v", dir, err)
}
}
f("/grdata")
}
// DelPvPvcByServiceID -
func (g *GarbageCollector) DelPvPvcByServiceID(serviceGCReq model.ServiceGCTaskBody) {
logrus.Infof("service_id: %s", serviceGCReq.ServiceID)
deleteOpts := &metav1.DeleteOptions{}
listOpts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("service_id=%s", serviceGCReq.ServiceID),
}
if err := g.clientset.CoreV1().PersistentVolumes().DeleteCollection(deleteOpts, listOpts); err != nil {
logrus.Warningf("service id: %s; delete a collection fo PV: %v", serviceGCReq.ServiceID, err)
}
if err := g.clientset.CoreV1().PersistentVolumeClaims(serviceGCReq.TenantID).DeleteCollection(deleteOpts, listOpts); err != nil {
logrus.Warningf("service id: %s; delete a collection fo PVC: %v", serviceGCReq.ServiceID, err)
}
}

View File

@ -40,6 +40,7 @@ import (
"github.com/goodrain/rainbond/worker/appm/store" "github.com/goodrain/rainbond/worker/appm/store"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1" v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/goodrain/rainbond/worker/discover/model" "github.com/goodrain/rainbond/worker/discover/model"
"github.com/goodrain/rainbond/worker/gc"
) )
//Manager manager //Manager manager
@ -49,6 +50,7 @@ type Manager struct {
store store.Storer store store.Storer
dbmanager db.Manager dbmanager db.Manager
controllerManager *controller.Manager controllerManager *controller.Manager
garbageCollector *gc.GarbageCollector
startCh *channels.RingChannel startCh *channels.RingChannel
} }
@ -58,6 +60,7 @@ func NewManager(ctx context.Context,
config option.Config, config option.Config,
store store.Storer, store store.Storer,
controllerManager *controller.Manager, controllerManager *controller.Manager,
garbageCollector *gc.GarbageCollector,
startCh *channels.RingChannel) *Manager { startCh *channels.RingChannel) *Manager {
return &Manager{ return &Manager{
@ -66,6 +69,7 @@ func NewManager(ctx context.Context,
dbmanager: db.GetManager(), dbmanager: db.GetManager(),
store: store, store: store,
controllerManager: controllerManager, controllerManager: controllerManager,
garbageCollector: garbageCollector,
startCh: startCh, startCh: startCh,
} }
} }
@ -117,6 +121,9 @@ func (m *Manager) AnalystToExec(task *model.Task) error {
case "apply_plugin_config": case "apply_plugin_config":
logrus.Info("start a 'apply_plugin_config' task worker") logrus.Info("start a 'apply_plugin_config' task worker")
return m.applyPluginConfig(task) return m.applyPluginConfig(task)
case "service_gc":
logrus.Info("start the 'service_gc' task")
return m.ExecServiceGCTask(task)
case "delete_tenant": case "delete_tenant":
logrus.Info("start a 'delete_tenant' task worker") logrus.Info("start a 'delete_tenant' task worker")
return m.deleteTenant(task) return m.deleteTenant(task)
@ -474,6 +481,19 @@ func (m *Manager) applyPluginConfig(task *model.Task) error {
return nil return nil
} }
// ExecServiceGCTask executes the 'service_gc' task
func (m *Manager) ExecServiceGCTask(task *model.Task) error {
serviceGCReq, ok := task.Body.(model.ServiceGCTaskBody)
if !ok {
return fmt.Errorf("can not convert the request body to 'ServiceGCTaskBody'")
}
m.garbageCollector.DelLogFile(serviceGCReq)
m.garbageCollector.DelPvPvcByServiceID(serviceGCReq)
m.garbageCollector.DelVolumeData(serviceGCReq)
return nil
}
func (m *Manager) deleteTenant(task *model.Task) (err error) { func (m *Manager) deleteTenant(task *model.Task) (err error) {
body, ok := task.Body.(*model.DeleteTenantTaskBody) body, ok := task.Body.(*model.DeleteTenantTaskBody)
if !ok { if !ok {

View File

@ -17,18 +17,17 @@ limitations under the License.
package controller package controller
import ( import (
"bytes"
"encoding/json"
"fmt" "fmt"
"github.com/goodrain/rainbond/db/dao"
"io/ioutil" "io/ioutil"
"net/http"
"os" "os"
"strings" "strings"
"sync" "sync"
"time" "time"
"github.com/Sirupsen/logrus" "github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller/metrics"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/util"
"golang.org/x/time/rate" "golang.org/x/time/rate"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1" storage "k8s.io/api/storage/v1"
@ -47,6 +46,10 @@ import (
ref "k8s.io/client-go/tools/reference" ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
utilversion "k8s.io/kubernetes/pkg/util/version" utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/goodrain/rainbond/db/dao"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller/metrics"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/util"
) )
// annClass annotation represents the storage class associated with a resource: // annClass annotation represents the storage class associated with a resource:
@ -491,7 +494,51 @@ func NewProvisionController(
volumeHandler := cache.ResourceEventHandlerFuncs{ volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) }, AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) }, UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) }, DeleteFunc: func(obj interface{}) {
controller.forgetWork(controller.volumeQueue, obj)
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
logrus.Errorf("expected persistent volume but got %+v", obj)
return
}
// rainbondsssc
switch pv.Spec.StorageClassName {
case "rainbondsssc":
path := pv.Spec.PersistentVolumeSource.HostPath.Path
if err := os.RemoveAll(path); err != nil {
logrus.Errorf("path: %s; name: %s; remove pv hostpath: %v", path, pv.Name, err)
}
logrus.Infof("storage class: rainbondsssc; path: %s; successfully delete pv hsot path", path)
case "rainbondslsc":
nodeIP := func() string {
for _, me := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions {
if me.Key != "rainbond_node_ip" {
continue
}
return me.Values[0]
}
return ""
}()
if nodeIP == "" {
logrus.Errorf("storage class: rainbondslsc; name: %s; node ip not found", pv.Name)
return
}
path := pv.Spec.PersistentVolumeSource.HostPath.Path
if err := deletePath(nodeIP, path); err != nil {
logrus.Errorf("delete path: %v", err)
return
}
logrus.Infof("storage class: rainbondslsc; path: %s; successfully delete pv hsot path", path)
default:
logrus.Debugf("unsupported storage class: %s", pv.Spec.StorageClassName)
}
},
} }
if controller.volumeInformer != nil { if controller.volumeInformer != nil {
@ -1212,3 +1259,48 @@ func (ctrl *ProvisionController) supportsBlock(pr Provisioner) bool {
} }
return false return false
} }
func deletePath(nodeIP, path string) error {
logrus.Infof("node ip: %s; path: %s; delete pv hostpath", nodeIP, path)
reqOpts := map[string]string{
"path": path,
}
retry := 3
var err error
var statusCode int
for retry > 0 {
retry--
body := bytes.NewBuffer(nil)
if err := json.NewEncoder(body).Encode(reqOpts); err != nil {
return err
}
// create request
var req *http.Request
req, err = http.NewRequest("DELETE", fmt.Sprintf("http://%s:6100/v2/localvolumes", nodeIP), body)
if err != nil {
logrus.Warningf("remaining retry times: %d; path: %s; new request: %v", retry, path, err)
continue
}
var res *http.Response
res, err = http.DefaultClient.Do(req)
if err != nil {
logrus.Warningf("remaining retry times: %d; path: %s; do http request: %v", retry, path, err)
continue
}
defer res.Body.Close()
statusCode = res.StatusCode
if statusCode == 200 {
return nil
}
logrus.Warningf("remaining retry times: %d; path: %s; status code: %d; delete local volume", retry, path, res.StatusCode)
time.Sleep(1 * time.Second)
}
return fmt.Errorf("delete local path: %s; status code: %d; err: %v", path, statusCode, err)
}