From 578a6d94ddc7a444aa580f37ee143a16d8c78066 Mon Sep 17 00:00:00 2001 From: glyasai Date: Fri, 1 Nov 2019 19:09:45 +0800 Subject: [PATCH] volume gc --- api/handler/service.go | 25 ++++++++++++++-- worker/discover/model/model.go | 17 ++++++++++- worker/gc/gc.go | 52 +++++++++++++++++++++++++++++++--- worker/handle/manager.go | 16 +++++++++++ 4 files changed, 103 insertions(+), 7 deletions(-) diff --git a/api/handler/service.go b/api/handler/service.go index 3ddf9556e..f2ec25502 100644 --- a/api/handler/service.go +++ b/api/handler/service.go @@ -1385,11 +1385,32 @@ func (s *ServiceAction) VolumnVar(tsv *dbmodel.TenantServiceVolume, tenantID, fi } }() if tsv.VolumeName != "" { - err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteModel(tsv.ServiceID, tsv.VolumeName) - if err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { + volume, err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).GetVolumeByServiceIDAndName(tsv.ServiceID, tsv.VolumeName) + if err != nil { + tx.Rollback() + return util.CreateAPIHandleErrorFromDBError("find volume", err) + } + + if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteModel(tsv.ServiceID, tsv.VolumeName); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { tx.Rollback() return util.CreateAPIHandleErrorFromDBError("delete volume", err) } + + err = s.MQClient.SendBuilderTopic(gclient.TaskStruct{ + Topic: gclient.WorkerTopic, + TaskType: "volume_gc", + TaskBody: map[string]interface{}{ + "tenant_id": tenantID, + "service_id": volume.ServiceID, + "volume_id": volume.ID, + "volume_path": volume.VolumePath, + }, + }) + if err != nil { + logrus.Errorf("send 'volume_gc' task: %v", err) + tx.Rollback() + return util.CreateAPIHandleErrorFromDBError("send 'volume_gc' task", err) + } } else { if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).DeleteByServiceIDAndVolumePath(tsv.ServiceID, tsv.VolumePath); err != nil && err.Error() != gorm.ErrRecordNotFound.Error() { tx.Rollback() diff --git a/worker/discover/model/model.go b/worker/discover/model/model.go index 25fb9808c..3e5673bfe 100644 --- a/worker/discover/model/model.go +++ b/worker/discover/model/model.go @@ -151,6 +151,13 @@ func NewTaskBody(taskType string, body []byte) TaskBody { return nil } return b + case "volume_gc": + b := VolumeGCTaskBody{} + err := ffjson.Unmarshal(body, &b) + if err != nil { + return nil + } + return b default: return DefaultTaskBody{} } @@ -317,12 +324,20 @@ type GroupStopTaskBody struct { Strategy []string `json:"strategy"` } -// ServiceGCTaskBody holds the request body to execute volume gc task. +// ServiceGCTaskBody holds the request body to execute service gc task. type ServiceGCTaskBody struct { TenantID string `json:"tenant_id"` ServiceID string `json:"service_id"` EventIDs []string `json:"event_ids"` } +// VolumeGCTaskBody holds the request body to execute volume gc task. +type VolumeGCTaskBody struct { + TenantID string `json:"tenant_id"` + ServiceID string `json:"service_id"` + VolumeID int `json:"volume_id"` + VolumePath string `json:"volume_path"` +} + //DefaultTaskBody 默认操作任务主体 type DefaultTaskBody map[string]interface{} diff --git a/worker/gc/gc.go b/worker/gc/gc.go index 4ea6270bd..371f20b6f 100644 --- a/worker/gc/gc.go +++ b/worker/gc/gc.go @@ -21,6 +21,8 @@ package gc import ( "fmt" "os" + "path" + "strings" "github.com/Sirupsen/logrus" eventutil "github.com/goodrain/rainbond/eventlog/util" @@ -64,11 +66,15 @@ func (g *GarbageCollector) DelLogFile(serviceGCReq model.ServiceGCTaskBody) { // DelVolumeData - func (g *GarbageCollector) DelVolumeData(serviceGCReq model.ServiceGCTaskBody) { - dir := fmt.Sprintf("/grdata/tenant/%s/service/%s", serviceGCReq.TenantID, serviceGCReq.ServiceID) - logrus.Infof("volume data. delete %s", dir) - if err := os.RemoveAll(dir); err != nil { - logrus.Warningf("dir: %s; remove volume data: %v", dir, err) + f := func(prefix string) { + dir := path.Join(prefix, fmt.Sprintf("tenant/%s/service/%s", serviceGCReq.TenantID, serviceGCReq.ServiceID)) + logrus.Infof("volume data. delete %s", dir) + if err := os.RemoveAll(dir); err != nil { + logrus.Warningf("dir: %s; remove volume data: %v", dir, err) + } } + f("/grdata") + f("/grlocaldata") } // DelPvPvcByServiceID - @@ -86,3 +92,41 @@ func (g *GarbageCollector) DelPvPvcByServiceID(serviceGCReq model.ServiceGCTaskB logrus.Warningf("service id: %s; delete a collection fo PVC: %v", serviceGCReq.ServiceID, err) } } + +// DelVolumeDataByVolumeID - +func (g *GarbageCollector) DelVolumeDataByVolumeID(volumeGCReq model.VolumeGCTaskBody) { + f := func(prefix string) { + dir := path.Join(prefix, fmt.Sprintf("tenant/%s/service/%s", volumeGCReq.TenantID, volumeGCReq.ServiceID), volumeGCReq.VolumePath) + logrus.Infof("volume data. delete %s", dir) + if err := os.RemoveAll(dir); err != nil { + logrus.Warningf("dir: %s; remove volume data: %v", dir, err) + } + } + f("/grdata") + f("/grlocaldata") +} + +// DelPvPvcByVolumeID - +func (g *GarbageCollector) DelPvPvcByVolumeID(volumeGCReq model.VolumeGCTaskBody) { + logrus.Infof("volume id: %d; delete PV/PVC.", volumeGCReq.VolumeID) + listOpts := metav1.ListOptions{ + LabelSelector: fmt.Sprintf("service_id=%s", volumeGCReq.ServiceID), + } + pvs, err := g.clientset.CoreV1().PersistentVolumes().List(listOpts) + if err != nil { + logrus.Warningf("list pvc: %v", err) + } + namePrefix := fmt.Sprintf("manual%d", volumeGCReq.VolumeID) + for _, pv := range pvs.Items { + claimRef := pv.Spec.ClaimRef + if !strings.HasPrefix(claimRef.Name, namePrefix) { + continue + } + if err := g.clientset.CoreV1().PersistentVolumeClaims(volumeGCReq.TenantID).Delete(claimRef.Name, &metav1.DeleteOptions{}); err != nil { + logrus.Warningf("volume id: %d; delete pvc: %v", volumeGCReq.VolumeID, err) + } + if err := g.clientset.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{}); err != nil { + logrus.Warningf("volume id: %d; delete pv: %v", volumeGCReq.VolumeID, err) + } + } +} diff --git a/worker/handle/manager.go b/worker/handle/manager.go index 32b594d3d..9bafe3773 100644 --- a/worker/handle/manager.go +++ b/worker/handle/manager.go @@ -119,6 +119,9 @@ func (m *Manager) AnalystToExec(task *model.Task) error { case "service_gc": logrus.Info("start the 'service_gc' task") return m.ExecServiceGCTask(task) + case "volume_gc": + logrus.Info("start the 'volume_gc' task") + return m.ExecVolumeGCTask(task) default: logrus.Warning("task can not execute because no type is identified") return nil @@ -454,3 +457,16 @@ func (m *Manager) ExecServiceGCTask(task *model.Task) error { return nil } + +// ExecVolumeGCTask executes the 'volume_gc' task +func (m *Manager) ExecVolumeGCTask(task *model.Task) error { + volumeGCReq, ok := task.Body.(model.VolumeGCTaskBody) + if !ok { + return fmt.Errorf("can not convert the request body to 'VolumeGCTaskBody'") + } + + m.garbageCollector.DelPvPvcByVolumeID(volumeGCReq) + m.garbageCollector.DelVolumeDataByVolumeID(volumeGCReq) + + return nil +}