del path assosiated with pv

This commit is contained in:
glyasai 2019-11-06 16:24:20 +08:00
parent eef9135f14
commit 88f1124513
6 changed files with 116 additions and 77 deletions

View File

@ -65,3 +65,20 @@ func CreateLocalVolume(w http.ResponseWriter, r *http.Request) {
}
httputil.ReturnSuccess(r, w, map[string]string{"path": volumeHostPath})
}
// DeleteLocalVolume delete local volume dir
func DeleteLocalVolume(w http.ResponseWriter, r *http.Request) {
var requestopt = make(map[string]string)
if err := json.NewDecoder(r.Body).Decode(&requestopt); err != nil {
w.WriteHeader(400)
return
}
path := requestopt["path"]
if err := os.RemoveAll(path); err != nil {
logrus.Errorf("path: %s; remove pv path: %v", path, err)
w.WriteHeader(500)
}
httputil.ReturnSuccess(r, w, nil)
}

View File

@ -54,6 +54,7 @@ func Routers(mode string) *chi.Mux {
})
r.Route("/localvolumes", func(r chi.Router) {
r.Post("/create", controller.CreateLocalVolume)
r.Delete("/", controller.DeleteLocalVolume)
})
//以下只有管理节点具有的API
if mode == "master" {

View File

@ -151,13 +151,6 @@ func NewTaskBody(taskType string, body []byte) TaskBody {
return nil
}
return b
case "volume_gc":
b := VolumeGCTaskBody{}
err := ffjson.Unmarshal(body, &b)
if err != nil {
return nil
}
return b
default:
return DefaultTaskBody{}
}
@ -331,13 +324,5 @@ type ServiceGCTaskBody struct {
EventIDs []string `json:"event_ids"`
}
// VolumeGCTaskBody holds the request body to execute volume gc task.
type VolumeGCTaskBody struct {
TenantID string `json:"tenant_id"`
ServiceID string `json:"service_id"`
VolumeID int `json:"volume_id"`
VolumePath string `json:"volume_path"`
}
//DefaultTaskBody 默认操作任务主体
type DefaultTaskBody map[string]interface{}

View File

@ -22,7 +22,6 @@ import (
"fmt"
"os"
"path"
"strings"
"github.com/Sirupsen/logrus"
eventutil "github.com/goodrain/rainbond/eventlog/util"
@ -74,7 +73,6 @@ func (g *GarbageCollector) DelVolumeData(serviceGCReq model.ServiceGCTaskBody) {
}
}
f("/grdata")
f("/grlocaldata")
}
// DelPvPvcByServiceID -
@ -92,41 +90,3 @@ func (g *GarbageCollector) DelPvPvcByServiceID(serviceGCReq model.ServiceGCTaskB
logrus.Warningf("service id: %s; delete a collection fo PVC: %v", serviceGCReq.ServiceID, err)
}
}
// DelVolumeDataByVolumeID -
func (g *GarbageCollector) DelVolumeDataByVolumeID(volumeGCReq model.VolumeGCTaskBody) {
f := func(prefix string) {
dir := path.Join(prefix, fmt.Sprintf("tenant/%s/service/%s", volumeGCReq.TenantID, volumeGCReq.ServiceID), volumeGCReq.VolumePath)
logrus.Infof("volume data. delete %s", dir)
if err := os.RemoveAll(dir); err != nil {
logrus.Warningf("dir: %s; remove volume data: %v", dir, err)
}
}
f("/grdata")
f("/grlocaldata")
}
// DelPvPvcByVolumeID -
func (g *GarbageCollector) DelPvPvcByVolumeID(volumeGCReq model.VolumeGCTaskBody) {
logrus.Infof("volume id: %d; delete PV/PVC.", volumeGCReq.VolumeID)
listOpts := metav1.ListOptions{
LabelSelector: fmt.Sprintf("service_id=%s", volumeGCReq.ServiceID),
}
pvs, err := g.clientset.CoreV1().PersistentVolumes().List(listOpts)
if err != nil {
logrus.Warningf("list pvc: %v", err)
}
namePrefix := fmt.Sprintf("manual%d", volumeGCReq.VolumeID)
for _, pv := range pvs.Items {
claimRef := pv.Spec.ClaimRef
if !strings.HasPrefix(claimRef.Name, namePrefix) {
continue
}
if err := g.clientset.CoreV1().PersistentVolumeClaims(volumeGCReq.TenantID).Delete(claimRef.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Warningf("volume id: %d; delete pvc: %v", volumeGCReq.VolumeID, err)
}
if err := g.clientset.CoreV1().PersistentVolumes().Delete(pv.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Warningf("volume id: %d; delete pv: %v", volumeGCReq.VolumeID, err)
}
}
}

View File

@ -119,9 +119,6 @@ func (m *Manager) AnalystToExec(task *model.Task) error {
case "service_gc":
logrus.Info("start the 'service_gc' task")
return m.ExecServiceGCTask(task)
case "volume_gc":
logrus.Info("start the 'volume_gc' task")
return m.ExecVolumeGCTask(task)
default:
logrus.Warning("task can not execute because no type is identified")
return nil
@ -452,21 +449,8 @@ func (m *Manager) ExecServiceGCTask(task *model.Task) error {
}
m.garbageCollector.DelLogFile(serviceGCReq)
m.garbageCollector.DelVolumeData(serviceGCReq)
m.garbageCollector.DelPvPvcByServiceID(serviceGCReq)
return nil
}
// ExecVolumeGCTask executes the 'volume_gc' task
func (m *Manager) ExecVolumeGCTask(task *model.Task) error {
volumeGCReq, ok := task.Body.(model.VolumeGCTaskBody)
if !ok {
return fmt.Errorf("can not convert the request body to 'VolumeGCTaskBody'")
}
m.garbageCollector.DelPvPvcByVolumeID(volumeGCReq)
m.garbageCollector.DelVolumeDataByVolumeID(volumeGCReq)
m.garbageCollector.DelVolumeData(serviceGCReq)
return nil
}

View File

@ -17,18 +17,17 @@ limitations under the License.
package controller
import (
"bytes"
"encoding/json"
"fmt"
"github.com/goodrain/rainbond/db/dao"
"io/ioutil"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller/metrics"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/util"
"golang.org/x/time/rate"
"k8s.io/api/core/v1"
storage "k8s.io/api/storage/v1"
@ -47,6 +46,10 @@ import (
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/goodrain/rainbond/db/dao"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/controller/metrics"
"github.com/goodrain/rainbond/worker/master/volumes/provider/lib/util"
)
// annClass annotation represents the storage class associated with a resource:
@ -491,7 +494,51 @@ func NewProvisionController(
volumeHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
DeleteFunc: func(obj interface{}) { controller.forgetWork(controller.volumeQueue, obj) },
DeleteFunc: func(obj interface{}) {
controller.forgetWork(controller.volumeQueue, obj)
pv, ok := obj.(*v1.PersistentVolume)
if !ok {
logrus.Errorf("expected persistent volume but got %+v", obj)
return
}
// rainbondsssc
switch pv.Spec.StorageClassName {
case "rainbondsssc":
path := pv.Spec.PersistentVolumeSource.HostPath.Path
if err := os.RemoveAll(path); err != nil {
logrus.Errorf("path: %s; name: %s; remove pv hostpath: %v", path, pv.Name, err)
}
logrus.Infof("storage class: rainbondsssc; path: %s; successfully delete pv hsot path", path)
case "rainbondslsc":
nodeIP := func() string {
for _, me := range pv.Spec.NodeAffinity.Required.NodeSelectorTerms[0].MatchExpressions {
if me.Key != "rainbond_node_ip" {
continue
}
return me.Values[0]
}
return ""
}()
if nodeIP == "" {
logrus.Errorf("storage class: rainbondslsc; name: %s; node ip not found", pv.Name)
return
}
path := pv.Spec.PersistentVolumeSource.HostPath.Path
if err := deletePath(nodeIP, path); err != nil {
logrus.Errorf("delete path: %v", err)
return
}
logrus.Infof("storage class: rainbondslsc; path: %s; successfully delete pv hsot path", path)
default:
logrus.Debugf("unsupported storage class: %s", pv.Spec.StorageClassName)
}
},
}
if controller.volumeInformer != nil {
@ -1212,3 +1259,48 @@ func (ctrl *ProvisionController) supportsBlock(pr Provisioner) bool {
}
return false
}
func deletePath(nodeIP, path string) error {
logrus.Infof("node ip: %s; path: %s; delete pv hostpath", nodeIP, path)
reqOpts := map[string]string{
"path": path,
}
retry := 3
var err error
var statusCode int
for retry > 0 {
retry--
body := bytes.NewBuffer(nil)
if err := json.NewEncoder(body).Encode(reqOpts); err != nil {
return err
}
// create request
var req *http.Request
req, err = http.NewRequest("DELETE", fmt.Sprintf("http://%s:6100/v2/localvolumes", nodeIP), body)
if err != nil {
logrus.Warningf("remaining retry times: %d; path: %s; new request: %v", retry, path, err)
continue
}
var res *http.Response
res, err = http.DefaultClient.Do(req)
if err != nil {
logrus.Warningf("remaining retry times: %d; path: %s; do http request: %v", retry, path, err)
continue
}
defer res.Body.Close()
statusCode = res.StatusCode
if statusCode == 200 {
return nil
}
logrus.Warningf("remaining retry times: %d; path: %s; status code: %d; delete local volume", retry, path, res.StatusCode)
time.Sleep(1 * time.Second)
}
return fmt.Errorf("delete local path: %s; status code: %d; err: %v", path, statusCode, err)
}