add fastdfs migrate

This commit is contained in:
s_jqzhang 2019-02-26 18:51:39 +08:00
parent 1324a2ed53
commit 8070314f11
2 changed files with 82 additions and 18 deletions

View File

@ -208,15 +208,13 @@ go-fastdfs的文件定位与其它分布式系统不同它的寻址是直接
- 已经使用fastdfs存储的文件可以迁移到go fastdfs下么
```
答案是可以的,你担心的问题是路径改变,go fastdfs为你考虑了这一点
curl -F file=@data/00/00/_78HAFwyvN2AK6ChAAHg8gw80FQ213.jpg -F path=M00/00/00/ http://127.0.0.1:8080/upload
同理可以用一行命令迁移所有文件
cd fastdfs/data && find -type f |xargs -n 1 -I {} curl -F file=@data/{} -F path=M00/00/00/ http://127.0.0.1:8080/
以上命令可以过迁粗糙
可以写一些简单脚本进行迁移
步骤:
一、下载最新版的go-fastdfs
二、将原来的fastdfs目录复制到go-fastdfs的 files目录下
三、将配置enable_migrate设为true
四、调用 http://10.1.5.9:8080/repair_fileinfo
注意迁移过程中会扫描整下files目录下的所有文件
速度较慢迁移完成后请将enable_migrate设为false
```

View File

@ -157,7 +157,9 @@ const (
"是否支持按组(集群)管理,主要用途是Nginx支持多集群": "默认不支持,不支持时路径为http://10.1.5.4:8080/action,支持时为http://10.1.5.4:8080/group(配置中的group参数)/action,action为动作名如status,delete,sync等",
"support_group_manage": false,
"管理ip列表": "用于管理集的ip白名单,",
"admin_ips": ["127.0.0.1"]
"admin_ips": ["127.0.0.1"],
"是否启用迁移": "默认不启用",
"enable_migrate": false
}
`
)
@ -244,6 +246,7 @@ type GloablConfig struct {
SupportGroupManage bool `json:"support_group_manage"`
AdminIps []string `json:"admin_ips"`
EnableMergeSmallFile bool `json:"enable_merge_small_file"`
EnableMigrate bool `json:"enable_migrate"`
}
func NewServer() *Server {
@ -875,6 +878,66 @@ func (this *Server) BackUpMetaDataByDate(date string) {
os.Remove(metaFileName)
}
}
func (this *Server) RepairFileInfoFromFile() {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("RepairFileInfoFromFile")
log.Error(re)
log.Error(string(buffer))
}
}()
if this.lockMap.IsLock("RepairFileInfoFromFile") {
log.Warn("Lock RepairFileInfoFromFile")
return
}
this.lockMap.LockKey("RepairFileInfoFromFile")
defer this.lockMap.UnLockKey("RepairFileInfoFromFile")
handlefunc := func(file_path string, f os.FileInfo, err error) error {
var (
files []os.FileInfo
fi os.FileInfo
fileInfo FileInfo
sum string
pathMd5 string
)
if f.IsDir() {
files, err = ioutil.ReadDir(file_path)
if err != nil {
return err
}
for _, fi = range files {
if fi.IsDir() || fi.Size() == 0 {
continue
}
pathMd5 = this.util.MD5(file_path + "/" + fi.Name())
if finfo, _ := this.GetFileInfoFromLevelDB(pathMd5); finfo != nil && finfo.Md5 != "" {
continue
}
sum, err = this.util.GetFileSumByName(file_path+"/"+fi.Name(), Config().FileSumArithmetic)
if err != nil {
log.Error(err)
continue
}
fileInfo = FileInfo{
Size: fi.Size(),
Name: fi.Name(),
Path: strings.Replace(file_path, "\\", "/", -1),
Md5: sum,
TimeStamp: fi.ModTime().Unix(),
}
this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
}
}
return nil
}
pathname := STORE_DIR
fi, _ := os.Stat(pathname)
if fi.IsDir() {
filepath.Walk(pathname, handlefunc)
}
log.Info("RepairFileInfoFromFile is finish.")
}
func (this *Server) RepairStatByDate(date string) StatDateFileInfo {
defer func() {
if re := recover(); re != nil {
@ -2652,10 +2715,13 @@ func (this *Server) RepairFileInfo(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
if !Config().EnableMigrate {
w.Write([]byte("please set enable_migrate=true"))
return
}
result.Status = "ok"
//result.Message = "repair job start,don't try again"
result.Message = "very danger , disable by system"
//go this.RepairFileInfoFromFile()
result.Message = "repair job start,don't try again,very danger "
go this.RepairFileInfoFromFile()
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) Reload(w http.ResponseWriter, r *http.Request) {
@ -3310,14 +3376,14 @@ func (this *Server) Main() {
}
}()
}
groupRoute:=""
groupRoute := ""
if Config().SupportGroupManage {
groupRoute="/"+Config().Group
groupRoute = "/" + Config().Group
}
if groupRoute=="" {
http.HandleFunc(fmt.Sprintf("%s", "/" ), this.Index)
if groupRoute == "" {
http.HandleFunc(fmt.Sprintf("%s", "/"), this.Index)
} else {
http.HandleFunc(fmt.Sprintf("%s", groupRoute ), this.Index)
http.HandleFunc(fmt.Sprintf("%s", groupRoute), this.Index)
}
http.HandleFunc(fmt.Sprintf("%s/check_file_exist", groupRoute), this.CheckFileExist)
http.HandleFunc(fmt.Sprintf("%s/upload", groupRoute), this.Upload)