prepare volume

This commit is contained in:
凡羊羊 2019-12-19 18:03:46 +08:00
parent 099a938119
commit 86b7cf7de3
15 changed files with 220 additions and 71 deletions

View File

@ -33,26 +33,6 @@ import (
// VolumeOptions list volume option
func VolumeOptions(w http.ResponseWriter, r *http.Request) {
// swagger:operation POST /v2/volume-options v2 volumeOptions
//
// 查询可用存储驱动模型列表
//
// get volume-options
//
// ---
// consumes:
// - application/json
// - application/x-protobuf
//
// produces:
// - application/json
// - application/xml
//
// responses:
// default:
// schema:
// description: 统一返回格式
volumetypeOptions, err := handler.GetVolumeTypeHandler().GetAllVolumeTypes()
if err != nil {
httputil.ReturnError(r, w, 500, err.Error())
@ -63,7 +43,7 @@ func VolumeOptions(w http.ResponseWriter, r *http.Request) {
// ListVolumeType list volume type list
func ListVolumeType(w http.ResponseWriter, r *http.Request) {
// swagger:operation POST /v2/volume-options v2 volumeOptions
// swagger:operation POST /v2/volume-options v2 volumeOptions TODO fanyangyang delete it
//
// 查询可用存储驱动模型列表
//

View File

@ -76,7 +76,6 @@ func (vta *VolumeTypeAction) VolumeTypeVar(action string, vtm *dbmodel.TenantSer
// GetAllVolumeTypes get all volume types
func (vta *VolumeTypeAction) GetAllVolumeTypes() ([]*api_model.VolumeTypeStruct, error) {
var optionList []*api_model.VolumeTypeStruct
volumeTypeMap := make(map[string]*dbmodel.TenantServiceVolumeType)
volumeTypes, err := db.GetManager().VolumeTypeDao().GetAllVolumeTypes()
@ -110,6 +109,7 @@ func (vta *VolumeTypeAction) GetAllVolumeTypes() ([]*api_model.VolumeTypeStruct,
optionList = append(optionList, &api_model.VolumeTypeStruct{
VolumeType: vt.VolumeType,
NameShow: vt.NameShow,
Provisioner: vt.Provisioner,
CapacityValidation: capacityValidation,
Description: vt.Description,
AccessMode: accessMode,
@ -234,7 +234,8 @@ func (vta *VolumeTypeAction) SetVolumeType(vol *api_model.VolumeTypeStruct) erro
dbVolume.SharePolicy = strings.Join(sharePolicy, ",")
dbVolume.BackupPolicy = strings.Join(backupPolicy, ",")
dbVolume.ReclaimPolicy = vol.ReclaimPolicy
dbVolume.StorageClassDetail = string(jsonStorageClassDetailStr)
dbVolume.StorageClassDetail = string(jsonStorageClassDetailStr) // TODO fanyangyang StorageClass规范性校验 并返回正确的结构将结构中的provisoner赋值
dbVolume.Provisioner = "provisioner" // TODO fanyangyang 根据StorageClass获取
dbVolume.Sort = vol.Sort
dbVolume.Enable = vol.Enable

View File

@ -28,6 +28,7 @@ type VolumeTypeStruct struct {
SharePolicy []string `json:"share_policy"` //共享模式
BackupPolicy []string `json:"backup_policy"` // 备份策略
ReclaimPolicy string `json:"reclaim_policy"` // 回收策略,delete, retain, recyle
Provisioner string `json:"provisioner"` //存储提供方
StorageClassDetail map[string]interface{} `json:"storage_class_detail" validate:"storage_class_detail|required"`
Sort int `json:"sort"` // 排序
Enable bool `json:"enable"` // 是否生效
@ -35,9 +36,8 @@ type VolumeTypeStruct struct {
// VolumeTypePageStruct volume option struct with page
type VolumeTypePageStruct struct {
list *VolumeTypeStruct
page int
pageSize int
count int
list *VolumeTypeStruct
page int
pageSize int
count int
}

View File

@ -69,7 +69,7 @@ type VolumeTypeDao interface {
GetAllVolumeTypes() ([]*model.TenantServiceVolumeType, error)
GetAllVolumeTypesByPage(page int, pageSize int) ([]*model.TenantServiceVolumeType, error)
GetVolumeTypeByType(vt string) (*model.TenantServiceVolumeType, error)
FindOrCreate(vt *model.TenantServiceVolumeType) (*model.TenantServiceVolumeType, error)
CreateOrUpdateVolumeType(vt *model.TenantServiceVolumeType) (*model.TenantServiceVolumeType, error)
}
//LicenseDao LicenseDao

View File

@ -402,13 +402,17 @@ func TestVolumeType(t *testing.T) {
t.Log("yes")
}
}
func TestGetVolumeType(t *testing.T) {
func initDBManager(t *testing.T) {
if err := CreateManager(dbconfig.Config{
MysqlConnectionInfo: "ieZoo9:Maigoed0@tcp(192.168.2.108:3306)/region",
DBType: "mysql",
}); err != nil {
t.Fatal(err)
}
}
func TestGetVolumeType(t *testing.T) {
initDBManager(t)
vts, err := GetManager().VolumeTypeDao().GetAllVolumeTypes()
if err != nil {
t.Fatal(err)
@ -417,7 +421,10 @@ func TestGetVolumeType(t *testing.T) {
t.Logf("%+v", vt)
}
t.Logf("volume type len is : %v", len(vts))
}
func TestGetVolumeTypeByType(t *testing.T) {
initDBManager(t)
vt, err := GetManager().VolumeTypeDao().GetVolumeTypeByType("ceph-rbd")
if err != nil {
t.Fatal("get volumeType by type error: ", err.Error())

View File

@ -21,26 +21,18 @@ package model
// TenantServiceVolumeType tenant service volume type
type TenantServiceVolumeType struct {
Model
// 存储类型
VolumeType string `gorm:"column:volume_type; size:64" json:"volume_type"`
// 别名
NameShow string `gorm:"column:name_show; size:64" json:"name_show"`
// 存储大小校验条件
VolumeType string `gorm:"column:volume_type; size:64" json:"volume_type"`
NameShow string `gorm:"column:name_show; size:64" json:"name_show"`
CapacityValidation string `gorm:"column:capacity_validation; size:1024" json:"capacity_validation"`
// 描述
Description string `gorm:"column:description; size:1024" json:"description"`
//读写模式
AccessMode string `gorm:"column:access_mode; size:128" json:"access_mode"`
// 备份策略
BackupPolicy string `gorm:"column:backup_policy; size:128" json:"backup_policy"`
// 回收策略
ReclaimPolicy string `gorm:"column:reclaim_policy; size:20" json:"reclaim_policy"`
// 分享策略
Description string `gorm:"column:description; size:1024" json:"description"`
AccessMode string `gorm:"column:access_mode; size:128" json:"access_mode"`
BackupPolicy string `gorm:"column:backup_policy; size:128" json:"backup_policy"`
ReclaimPolicy string `gorm:"column:reclaim_policy; size:20" json:"reclaim_policy"`
SharePolicy string `gorm:"share_policy; size:128" json:"share_policy"`
Provisioner string `gorm:"provisioner; size:128" json:"provisioner"`
StorageClassDetail string `gorm:"storage_class_detail; size:2048" json:"storage_class_detail"`
// 排序
Sort int `gorm:"sort; default:9999" json:"sort"`
Enable bool `gorm:"enable; default: false" json:"enable"`
Sort int `gorm:"sort; default:9999" json:"sort"`
Enable bool `gorm:"enable" json:"enable"`
}
// TableName 表名

View File

@ -21,6 +21,7 @@ package dao
import (
"fmt"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/db/model"
"github.com/jinzhu/gorm"
@ -31,16 +32,25 @@ type VolumeTypeDaoImpl struct {
DB *gorm.DB
}
// FindOrCreate find or create volumeType
func (vtd *VolumeTypeDaoImpl) FindOrCreate(vt *model.TenantServiceVolumeType) (*model.TenantServiceVolumeType, error) {
// CreateOrUpdateVolumeType find or create volumeType, !!! attentionjust for store sync storageclass from k8s
func (vtd *VolumeTypeDaoImpl) CreateOrUpdateVolumeType(vt *model.TenantServiceVolumeType) (*model.TenantServiceVolumeType, error) {
if vt.VolumeType == model.ShareFileVolumeType.String() || vt.VolumeType == model.LocalVolumeType.String() || vt.VolumeType == model.MemoryFSVolumeType.String() {
return vt, nil
}
volumeType, err := vtd.GetVolumeTypeByType(vt.VolumeType)
if err != nil && err != gorm.ErrRecordNotFound {
return nil, err
}
if err == gorm.ErrRecordNotFound || volumeType == nil {
return vt, vtd.AddModel(vt)
logrus.Debugf("volume type[%s] do not exists, create it", vt.VolumeType)
err = vtd.AddModel(vt)
} else {
logrus.Debugf("volume type[%s] already exists, update it", vt.VolumeType)
volumeType.Provisioner = vt.Provisioner
volumeType.StorageClassDetail = vt.StorageClassDetail
err = vtd.UpdateModel(volumeType)
}
return volumeType, nil
return volumeType, err
}
//AddModel AddModel
@ -78,7 +88,7 @@ func (vtd *VolumeTypeDaoImpl) GetAllVolumeTypes() ([]*model.TenantServiceVolumeT
// GetAllVolumeTypesByPage get all volumeTypes by page
func (vtd *VolumeTypeDaoImpl) GetAllVolumeTypesByPage(page int, pageSize int) ([]*model.TenantServiceVolumeType, error) {
var volumeTypes []*model.TenantServiceVolumeType
if err := vtd.DB.Limit(pageSize).Offset((page-1)*pageSize).Find(&volumeTypes).Error; err != nil {
if err := vtd.DB.Limit(pageSize).Offset((page - 1) * pageSize).Find(&volumeTypes).Error; err != nil {
return nil, err
}
return volumeTypes, nil
@ -86,14 +96,11 @@ func (vtd *VolumeTypeDaoImpl) GetAllVolumeTypesByPage(page int, pageSize int) ([
// GetVolumeTypeByType get volume type by type
func (vtd *VolumeTypeDaoImpl) GetVolumeTypeByType(vt string) (*model.TenantServiceVolumeType, error) {
var volumeTypes []*model.TenantServiceVolumeType
if err := vtd.DB.Where("volume_type=?", vt).Find(&volumeTypes).Error; err != nil {
var volumeType model.TenantServiceVolumeType
if err := vtd.DB.Where("volume_type=?", vt).Find(&volumeType).Error; err != nil {
return nil, err
}
if len(volumeTypes) == 0 {
return nil, nil
}
return volumeTypes[0], nil
return &volumeType, nil
}
// DeleteModelByVolumeTypes delete volume by type

View File

@ -30,6 +30,7 @@ import (
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
)
@ -142,6 +143,62 @@ func (s *upgradeController) upgradeService(newapp v1.AppService) {
}
}
}
func (s *upgradeController) upgradeClaim(newapp v1.AppService) {
nowApp := s.manager.store.GetAppService(newapp.ServiceID)
nowClaims := nowApp.GetClaims()
newClaims := newapp.GetClaims()
var nowClaimMaps = make(map[string]*corev1.PersistentVolumeClaim, len(nowClaims))
for i, now := range nowClaims {
nowClaimMaps[now.Name] = nowClaims[i]
}
for _, n := range newClaims {
if o, ok := nowClaimMaps[n.Name]; ok {
n.UID = o.UID
n.ResourceVersion = o.ResourceVersion
claim, err := s.manager.client.CoreV1().PersistentVolumeClaims(n.Namespace).Update(n)
if err != nil {
logrus.Errorf("error updating claim: %+v: err: %v", claim, err)
continue
}
nowApp.SetClaim(claim)
delete(nowClaimMaps, o.Name)
logrus.Debugf("ServiceID: %s; successfully update claim: %s", nowApp.ServiceID, claim.Name)
} else {
claim, err := s.manager.client.CoreV1().PersistentVolumeClaims(n.Namespace).Get(n.Name, metav1.GetOptions{})
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := s.manager.client.CoreV1().PersistentVolumeClaims(n.Namespace).Create(n)
if err != nil {
logrus.Errorf("error creating claim: %+v: err: %v", err)
continue
}
} else {
logrus.Errorf("err get claim[%s:%s], err: %+v", n.Namespace, n.Name, err)
}
}
if claim != nil {
logrus.Infof("claim is exists, do not create again, and can't update it", claim.Name)
} else {
claim, err = s.manager.client.CoreV1().PersistentVolumeClaims(n.Namespace).Update(n)
if err != nil {
logrus.Errorf("error update claim: %+v: err: %v", claim, err)
continue
}
logrus.Debugf("ServiceID: %s; successfully create claim: %s", nowApp.ServiceID, claim.Name)
}
nowApp.SetClaim(claim)
}
}
for _, claim := range nowClaimMaps {
if claim != nil {
if err := s.manager.client.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(claim.Name, &metav1.DeleteOptions{}); err != nil {
logrus.Errorf("error deleting claim: %+v: err: %v", claim, err)
continue
}
logrus.Debugf("ServiceID: %s; successfully delete claim: %s", nowApp.ServiceID, claim.Name)
}
}
}
func (s *upgradeController) upgradeOne(app v1.AppService) error {
//first: check and create namespace
@ -155,6 +212,7 @@ func (s *upgradeController) upgradeOne(app v1.AppService) error {
}
}
s.upgradeConfigMap(app)
s.upgradeClaim(app)
if deployment := app.GetDeployment(); deployment != nil {
_, err := s.manager.client.AppsV1().Deployments(deployment.Namespace).Patch(deployment.Name, types.MergePatchType, app.UpgradePatch["deployment"])
if err != nil {
@ -165,6 +223,7 @@ func (s *upgradeController) upgradeOne(app v1.AppService) error {
if statefulset := app.GetStatefulSet(); statefulset != nil {
_, err := s.manager.client.AppsV1().StatefulSets(statefulset.Namespace).Patch(statefulset.Name, types.MergePatchType, app.UpgradePatch["statefulset"])
if err != nil {
logrus.Errorf("patch statefulset error : %s", err.Error())
app.Logger.Error(fmt.Sprintf("upgrade statefulset %s failure %s", app.ServiceAlias, err.Error()), event.GetLoggerOption("failure"))
return fmt.Errorf("upgrade statefulset %s failure %s", app.ServiceAlias, err.Error())
}
@ -179,7 +238,6 @@ func (s *upgradeController) upgradeOne(app v1.AppService) error {
}
_ = f.UpgradeSecrets(s.manager.client, &app, oldApp.GetSecrets(), app.GetSecrets(), handleErr)
_ = f.UpgradeIngress(s.manager.client, &app, oldApp.GetIngress(), app.GetIngress(), handleErr)
_ = f.UpgradeClaims(s.manager.client, &app, oldApp.GetClaims(), app.GetClaims(), handleErr)
return s.WaitingReady(app)
}

View File

@ -316,15 +316,36 @@ func UpgradeClaims(clientset *kubernetes.Clientset, as *v1.AppService, old, new
delete(oldMap, o.Name)
logrus.Debugf("ServiceID: %s; successfully update claim: %s", as.ServiceID, claim.Name)
} else {
claim, err := clientset.CoreV1().PersistentVolumeClaims(n.Namespace).Create(n)
claim, err := clientset.CoreV1().PersistentVolumeClaims(n.Namespace).Get(n.Name, metav1.GetOptions{})
if err != nil {
if err := handleErr(fmt.Sprintf("error creating claim: %+v: err: %v", claim, err), err); err != nil {
return err
if k8sErrors.IsNotFound(err) {
_, err := clientset.CoreV1().PersistentVolumeClaims(n.Namespace).Create(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error creating claim: %+v: err: %v",
n, err), err); err != nil {
return err
}
continue
}
} else {
if e := handleErr(fmt.Sprintf("err get claim[%s:%s], err: %+v", n.Namespace, n.Name, err), err); err != nil {
return e
}
}
continue
}
if claim != nil {
logrus.Infof("claim is exists, do not create again, and can't update it", claim.Name)
} else {
claim, err = clientset.CoreV1().PersistentVolumeClaims(n.Namespace).Update(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error update claim: %+v: err: %v", claim, err), err); err != nil {
return err
}
continue
}
logrus.Debugf("ServiceID: %s; successfully create claim: %s", as.ServiceID, claim.Name)
}
as.SetClaim(claim)
logrus.Debugf("ServiceID: %s; successfully create claim: %s", as.ServiceID, claim.Name)
}
}
for _, claim := range oldMap {

View File

@ -271,7 +271,7 @@ func NewStore(clientset *kubernetes.Clientset,
store.informers.ReplicaSet.AddEventHandlerWithResyncPeriod(store, time.Second*10)
store.informers.Endpoints.AddEventHandlerWithResyncPeriod(epEventHandler, time.Second*10)
store.informers.Nodes.AddEventHandlerWithResyncPeriod(store, time.Second*10)
store.informers.StorageClass.AddEventHandlerWithResyncPeriod(store, time.Second*10)
store.informers.StorageClass.AddEventHandlerWithResyncPeriod(store, time.Second*300)
store.informers.Events.AddEventHandlerWithResyncPeriod(store.evtEventHandler(), time.Second*10)
store.informers.HorizontalPodAutoscaler.AddEventHandlerWithResyncPeriod(store, time.Second*10)
@ -576,8 +576,23 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) {
}
if sc, ok := obj.(*storagev1.StorageClass); ok {
vt := workerutil.TransStorageClass2RBDVolumeType(sc)
if _, err := a.dbmanager.VolumeTypeDao().FindOrCreate(vt); err != nil {
return
if _, err := db.GetManager().VolumeTypeDao().CreateOrUpdateVolumeType(vt); err != nil { // TODO update 时处理
logrus.Errorf("sync storageclass error : %s, ignore it", err.Error())
}
}
if claim, ok := obj.(*corev1.PersistentVolumeClaim); ok {
serviceID := claim.Labels["service_id"]
version := claim.Labels["version"]
createrID := claim.Labels["creater_id"]
if serviceID != "" && createrID != "" {
appservice, err := a.getAppService(serviceID, version, createrID, true)
if err == conversion.ErrServiceNotFound {
a.conf.KubeClient.CoreV1().PersistentVolumeClaims(claim.Namespace).Delete(claim.Name, &metav1.DeleteOptions{})
}
if appservice != nil {
appservice.SetClaim(claim)
return
}
}
}
}
@ -743,6 +758,22 @@ func (a *appRuntimeStore) OnDelete(obj interface{}) {
return
}
}
if claim, ok := obj.(*corev1.PersistentVolumeClaim); ok {
serviceID := claim.Labels["service_id"]
version := claim.Labels["version"]
createrID := claim.Labels["creater_id"]
if serviceID != "" && createrID != "" {
appservice, _ := a.getAppService(serviceID, version, createrID, false)
if appservice != nil {
appservice.DeleteClaim(claim)
if appservice.IsClosed() {
a.DeleteAppService(appservice)
}
return
}
}
}
}
//RegistAppService regist a app model to store.

View File

@ -19,12 +19,15 @@
package store
import (
"fmt"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
"testing"
"time"
"github.com/eapache/channels"
v2beta1 "k8s.io/api/autoscaling/v2beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
@ -194,3 +197,50 @@ func getStoreForTest(t *testing.T) Storer {
}
return storer
}
func TestPatch(t *testing.T) {
ocfg := option.Config{
DBType: "mysql",
MysqlConnectionInfo: "oc6Poh:noot6Mea@tcp(192.168.2.203:3306)/region",
EtcdEndPoints: []string{"http://192.168.2.203:2379"},
EtcdTimeout: 5,
KubeConfig: "/Users/fanyangyang/Documents/company/goodrain/admin.kubeconfig",
LeaderElectionNamespace: "rainbond",
}
storer := getStoreForTest(t)
c, err := clientcmd.BuildConfigFromFlags("", ocfg.KubeConfig)
if err != nil {
t.Fatalf("read kube config file error: %v", err)
}
clientset, err := kubernetes.NewForConfig(c)
if err != nil {
t.Fatalf("create kube api client error: %v", err)
}
app := storer.GetAppService("f55f7e28ec441ce5765998259756cc09")
if app == nil {
t.Fatal("app is niil")
}
statefulset := app.GetStatefulSet()
if statefulset == nil {
t.Fatal("stateful iis nil")
}
t.Logf("s is : %+v", statefulset)
claims := statefulset.Spec.VolumeClaimTemplates
if claims != nil {
statefulset.Spec.VolumeClaimTemplates = nil
for _, claim := range claims {
if _, err = clientset.CoreV1().PersistentVolumeClaims(app.TenantID).Get(claim.Name, metav1.GetOptions{}); err != nil {
if k8sErrors.IsNotFound(err) {
clientset.CoreV1().PersistentVolumeClaims(app.TenantID).Create(&claim)
} else {
t.Fatal(err)
}
}
}
}
_, err = clientset.AppsV1().StatefulSets(statefulset.Namespace).Patch(statefulset.Name, types.MergePatchType, app.UpgradePatch["statefulset"])
if err != nil {
t.Fatal(fmt.Sprintf("upgrade statefulset %s failure %s", app.ServiceAlias, err.Error()), nil)
}
time.Sleep(30 * time.Second)
}

View File

@ -576,6 +576,7 @@ func (a *AppService) GetClaims() []*corev1.PersistentVolumeClaim {
// SetClaim set claim
func (a *AppService) SetClaim(claim *corev1.PersistentVolumeClaim) {
claim.Namespace = a.TenantID
if len(a.claims) > 0 {
for i, c := range a.claims {
if c.GetName() == claim.GetName() {

View File

@ -59,17 +59,16 @@ func (v *OtherVolume) CreateVolume(define *Define) error {
return "linux"
}(),
}
v.as.SetClaim(claim) // store claim to appService
statefulset := v.as.GetStatefulSet() //有状态组件
vo := corev1.Volume{Name: volumeMountName}
if statefulset != nil {
statefulset.Spec.VolumeClaimTemplates = append(statefulset.Spec.VolumeClaimTemplates, *claim)
} else {
v.as.SetClaim(claim) // store claim to appService
vo := corev1.Volume{Name: volumeMountName}
vo.PersistentVolumeClaim = &corev1.PersistentVolumeClaimVolumeSource{ClaimName: claim.GetName(), ReadOnly: volumeReadOnly}
define.volumes = append(define.volumes, vo)
logrus.Warnf("service[%s] is not stateful, mount volume by k8s volume.PersistenVolumeClaim[%s]", v.svm.ServiceID, claim.GetName())
}
define.volumes = append(define.volumes, vo)
vm := corev1.VolumeMount{
Name: volumeMountName,

View File

@ -108,6 +108,7 @@ func newVolumeClaim(name, volumePath, accessMode, storageClassName string, capac
Name: name,
Labels: labels,
Annotations: annotations,
Namespace: "string",
},
Spec: corev1.PersistentVolumeClaimSpec{
AccessModes: []corev1.PersistentVolumeAccessMode{parseAccessMode(accessMode)},

View File

@ -55,6 +55,7 @@ func TransStorageClass2RBDVolumeType(sc *storagev1.StorageClass) *dbmodel.Tenant
NameShow: sc.GetName(),
CapacityValidation: string(cvbs),
StorageClassDetail: string(scbs),
Provisioner: sc.Provisioner,
AccessMode: strings.Join(defaultAccessMode, ","),
BackupPolicy: strings.Join(defaultBackupPolicy, ","),
SharePolicy: strings.Join(defaultSharePolicy, ","),