Fixed data loss during stateful component recovery

This commit is contained in:
barnettZQG 2020-03-18 16:55:01 +08:00
parent 45ea5caea2
commit 4684e98de9
4 changed files with 70 additions and 23 deletions

View File

@ -277,7 +277,7 @@ func (b *BackupAPPNew) backupServiceInfo(serviceInfos []*RegionServiceSnapshot)
b.Logger.Info(fmt.Sprintf("Start backup application(%s) persistent data", app.Service.ServiceAlias), map[string]string{"step": "backup_builder", "status": "starting"})
//backup app data,The overall data of the direct backup service
if len(app.ServiceVolume) > 0 {
dstDir := fmt.Sprintf("%s/data_%s/%s.zip", b.SourceDir, "all", "data")
dstDir := fmt.Sprintf("%s/data_%s/%s.zip", b.SourceDir, app.Service.ServiceID, "__all_data")
_, sharepath := GetVolumeDir()
serviceVolumeData := path.Join(sharepath, "tenant", app.Service.TenantID, "service", app.Service.ServiceID)
if !util.DirIsEmpty(serviceVolumeData) {

View File

@ -25,6 +25,7 @@ import (
"os"
"path"
"path/filepath"
"strconv"
"strings"
"time"
@ -58,6 +59,7 @@ type BackupAPPRestore struct {
cacheDir string
//serviceChange key: oldServiceID
serviceChange map[string]*Info
volumeIDMap map[uint]uint
etcdcli *clientv3.Client
S3Config struct {
@ -91,6 +93,7 @@ func BackupAPPRestoreCreater(in []byte, m *exectorManager) (TaskWorker, error) {
DockerClient: m.DockerClient,
etcdcli: m.EtcdCli,
serviceChange: make(map[string]*Info, 0),
volumeIDMap: make(map[uint]uint),
}
if err := ffjson.Unmarshal(in, &backupRestore); err != nil {
return nil, err
@ -197,22 +200,38 @@ func (b *BackupAPPRestore) restoreVersionAndData(backup *dbmodel.AppBackup, appS
b.Logger.Info(fmt.Sprintf("开始恢复应用(%s)持久化数据", app.Service.ServiceAlias), map[string]string{"step": "restore_builder", "status": "starting"})
//restore app data
//if all data backup file exist, restore all data directly
allDataFilePath := fmt.Sprintf("%s/data_%s/%s.zip", b.cacheDir, b.getOldServiceID(app.ServiceID), "__all_data")
allDataRestore := false
allTmpDir := fmt.Sprintf("/grdata/tmp/%s", app.ServiceID)
if exist, _ := util.FileExists(allDataFilePath); exist {
if err := util.Unzip(allDataFilePath, allTmpDir); err != nil {
logrus.Errorf("unzip all data file failure %s", err.Error())
} else {
allDataRestore = true
}
}
for _, volume := range app.ServiceVolume {
if volume.HostPath == "" {
continue
}
dstDir := fmt.Sprintf("%s/data_%s/%s.zip", b.cacheDir, b.getOldServiceID(app.ServiceID), strings.Replace(volume.VolumeName, "/", "", -1))
tmpDir := fmt.Sprintf("/grdata/tmp/%s_%d", volume.ServiceID, volume.ID)
if err := util.Unzip(dstDir, tmpDir); err != nil {
if !strings.Contains(err.Error(), "no such file") {
logrus.Errorf("restore service(%s) volume(%s) data error.%s", app.ServiceID, volume.VolumeName, err.Error())
return err
var tmpDir string
if !allDataRestore {
dstDir := fmt.Sprintf("%s/data_%s/%s.zip", b.cacheDir, b.getOldServiceID(app.ServiceID), strings.Replace(volume.VolumeName, "/", "", -1))
tmpDir = fmt.Sprintf("/grdata/tmp/%s_%d", volume.ServiceID, volume.ID)
if err := util.Unzip(dstDir, tmpDir); err != nil {
if !strings.Contains(err.Error(), "no such file") {
logrus.Errorf("restore service(%s) volume(%s) data error.%s", app.ServiceID, volume.VolumeName, err.Error())
return err
}
//backup data is not exist because dir is empty.
//so create host path and continue
os.MkdirAll(volume.HostPath, 0777)
continue
}
//backup data is not exist because dir is empty.
//so create host path and continue
os.MkdirAll(volume.HostPath, 0777)
continue
} else {
tmpDir = fmt.Sprintf("%s/", allTmpDir)
}
//if app type is statefulset, change pod hostpath
@ -225,7 +244,18 @@ func (b *BackupAPPRestore) restoreVersionAndData(backup *dbmodel.AppBackup, appS
}
for _, path := range list {
newNameTmp := strings.Split(filepath.Base(path), "-")
newNameTmp[0] = b.serviceChange[b.getOldServiceID(app.ServiceID)].ServiceAlias
// before use PVC, path name is pod name. eg gr123456-0
if len(newNameTmp) == 2 {
newNameTmp[0] = b.serviceChange[b.getOldServiceID(app.ServiceID)].ServiceAlias
}
//pvc name in path , manual16-grcaa708-0
if len(newNameTmp) == 3 {
newNameTmp[1] = b.serviceChange[b.getOldServiceID(app.ServiceID)].ServiceAlias
oldVolumeID, _ := strconv.Atoi(newNameTmp[0][6:])
if oldVolumeID > 0 {
newNameTmp[0] = fmt.Sprintf("manual%d", b.volumeIDMap[uint(oldVolumeID)])
}
}
newName := strings.Join(newNameTmp, "-")
newpath := filepath.Join(util.GetParentDirectory(path), newName)
err := util.Rename(path, newpath)
@ -243,18 +273,26 @@ func (b *BackupAPPRestore) restoreVersionAndData(backup *dbmodel.AppBackup, appS
}
}
}
err := util.Rename(tmpDir, util.GetParentDirectory(volume.HostPath))
if err != nil {
if strings.Contains(err.Error(), "file exists") {
if err := util.MergeDir(tmpDir, util.GetParentDirectory(volume.HostPath)); err != nil {
if !allDataRestore {
err := util.Rename(tmpDir, util.GetParentDirectory(volume.HostPath))
if err != nil {
if strings.Contains(err.Error(), "file exists") {
if err := util.MergeDir(tmpDir, util.GetParentDirectory(volume.HostPath)); err != nil {
return err
}
} else {
return err
}
} else {
}
if err := os.Chmod(volume.HostPath, 0777); err != nil {
return err
}
}
if err := os.Chmod(volume.HostPath, 0777); err != nil {
return err
}
if allDataRestore {
err := util.Rename(path.Join(allTmpDir, b.getOldServiceID(app.ServiceID)), fmt.Sprintf("/grdata/tenant/%s/service/%s", app.Service.TenantID, app.Service.ServiceID))
if err != nil {
logrus.Errorf("rename %s to %s failure %s", path.Join(allTmpDir, b.getOldServiceID(app.ServiceID)), fmt.Sprintf("/grdata/tenant/%s/service/%s", app.Service.TenantID, app.Service.ServiceID), err.Error())
}
}
b.Logger.Info(fmt.Sprintf("完成恢复应用(%s)持久化数据", app.Service.ServiceAlias), map[string]string{"step": "restore_builder", "status": "running"})
@ -479,6 +517,7 @@ func (b *BackupAPPRestore) restoreMetadata(appSnapshot *AppSnapshot) error {
}
localPath, sharePath := GetVolumeDir()
for _, a := range app.ServiceVolume {
oldVolumeID := a.ID
a.ID = 0
switch a.VolumeType {
//nfs
@ -501,11 +540,11 @@ func (b *BackupAPPRestore) restoreMetadata(appSnapshot *AppSnapshot) error {
a.HostPath = fmt.Sprintf("%s/tenant/%s/service/%s%s", sharePath, b.TenantID, a.ServiceID, a.VolumePath)
}
}
if err := db.GetManager().TenantServiceVolumeDaoTransactions(tx).AddModel(a); err != nil {
tx.Rollback()
return fmt.Errorf("create app volume when restore backup error. %s", err.Error())
}
b.volumeIDMap[oldVolumeID] = a.ID
}
for _, a := range app.ServicePort {
a.ID = 0

View File

@ -21,10 +21,12 @@ package exector
import (
"testing"
"github.com/Sirupsen/logrus"
"github.com/pquerna/ffjson/ffjson"
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/util"
)
func TestModify(t *testing.T) {
@ -71,3 +73,11 @@ func TestModify(t *testing.T) {
re, _ := ffjson.Marshal(appSnapshot)
t.Log(string(re))
}
func TestUnzipAllDataFile(t *testing.T) {
allDataFilePath := "/tmp/__all_data.zip"
allTmpDir := "/tmp/4f25c53e864744ec95d037528acaa708"
if err := util.Unzip(allDataFilePath, allTmpDir); err != nil {
logrus.Errorf("unzip all data file failure %s", err.Error())
}
}

View File

@ -68,8 +68,6 @@ func Run() error {
logrus.Infof("rewrite hosts file success")
}
oldHosts = hosts
} else {
logrus.Infof("hosts not change %+v", hosts)
}
}
}