justleveldb20190223

This commit is contained in:
s_jqzhang 2019-02-23 23:01:40 +08:00
parent 842f0ef94c
commit 39c668f3bc

View File

@ -17,6 +17,7 @@ import (
"github.com/sjqzhang/tusd"
"github.com/sjqzhang/tusd/filestore"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/util"
"io"
"io/ioutil"
slog "log"
@ -925,7 +926,7 @@ func (this *Common) GetClientIp(r *http.Request) string {
}
func (this *Server) RepairStat() {
func (this *Server) RepairStatByDate(date string) {
defer func() {
if re := recover(); re != nil {
@ -937,86 +938,26 @@ func (this *Server) RepairStat() {
}
}()
if this.lockMap.IsLock("RepairStat") {
log.Warn("Lock RepairStat")
return
}
this.lockMap.LockKey("RepairStat")
defer this.lockMap.UnLockKey("RepairStat")
this.statMap.Put(CONST_STAT_FILE_COUNT_KEY, int64(0))
this.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
handlefunc := func(file_path string, f os.FileInfo, err error) error {
var (
files []os.FileInfo
date []string
data []byte
content string
lines []string
count int64
totalSize int64
line string
cols []string
size int64
)
if f.IsDir() {
if files, err = ioutil.ReadDir(file_path); err != nil {
return err
}
for _, file := range files {
count = 0
size = 0
if file.Name() == CONST_FILE_Md5_FILE_NAME {
if data, err = ioutil.ReadFile(file_path + "/" + file.Name()); err != nil {
log.Error(err)
continue
}
date = this.util.Match("\\d{8}", file_path)
if len(date) < 1 {
continue
}
content = string(data)
lines = strings.Split(content, "\n")
count = int64(len(lines))
if count > 1 {
count = count - 1
}
count = 0
for _, line = range lines {
cols = strings.Split(line, "|")
if len(cols) > 2 {
count = count + 1
if size, err = strconv.ParseInt(cols[1], 10, 64); err != nil {
size = 0
continue
}
totalSize = totalSize + size
}
}
this.statMap.Put(date[0]+"_"+CONST_STAT_FILE_COUNT_KEY, count)
this.statMap.Put(date[0]+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, totalSize)
this.statMap.AddCountInt64(CONST_STAT_FILE_COUNT_KEY, count)
this.statMap.AddCountInt64(CONST_STAT_FILE_TOTAL_SIZE_KEY, totalSize)
}
}
var (
err error
keyPrefix string
fileInfo FileInfo
fileCount int64
fileSize int64
)
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, CONST_FILE_Md5_FILE_NAME)
iter := server.ldb.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
return nil
fileCount = fileCount + 1
fileSize = fileSize + fileInfo.Size
}
filepath.Walk(DATA_DIR, handlefunc)
iter.Release()
this.statMap.Put(date+"_"+CONST_STAT_FILE_COUNT_KEY, fileCount)
this.statMap.Put(date+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileSize)
this.SaveStat()
}
@ -1044,73 +985,6 @@ func (this *Server) CheckFileExistByMd5(md5s string, fileInfo *FileInfo) bool {
}
func (this *Server) RepairFileInfoFromFile() {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("RepairFileInfoFromFile")
log.Error(re)
log.Error(string(buffer))
}
}()
if this.lockMap.IsLock("RepairFileInfoFromFile") {
log.Warn("Lock RepairFileInfoFromFile")
return
}
this.lockMap.LockKey("RepairFileInfoFromFile")
defer this.lockMap.UnLockKey("RepairFileInfoFromFile")
handlefunc := func(file_path string, f os.FileInfo, err error) error {
var (
files []os.FileInfo
fi os.FileInfo
fileInfo FileInfo
sum string
)
if f.IsDir() {
files, err = ioutil.ReadDir(file_path)
if err != nil {
return err
}
for _, fi = range files {
if fi.IsDir() || fi.Size() == 0 {
continue
}
sum, err = this.util.GetFileSumByName(file_path+"/"+fi.Name(), Config().FileSumArithmetic)
if err != nil {
log.Error(err)
continue
}
fileInfo = FileInfo{
Size: fi.Size(),
Name: fi.Name(),
Path: strings.Replace(file_path, "\\", "/", -1),
Md5: sum,
TimeStamp: fi.ModTime().Unix(),
}
this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
}
}
return nil
}
pathname := STORE_DIR
fi, _ := os.Stat(pathname)
if fi.IsDir() {
filepath.Walk(pathname, handlefunc)
}
}
func (this *Server) DownloadFromPeer(peer string, fileInfo *FileInfo) {
var (
err error
@ -1435,12 +1309,6 @@ func (this *Server) CheckFileAndSendToPeer(date string, filename string, isForce
}
}()
//iter:=server.ldb.NewIterator(util.BytesPrefix([]byte(date+"_")),nil)
//for iter.Next() {
// fmt.Println(string(iter.Key()),string(iter.Value()))
//}
//iter.Release()
if md5set, err = this.GetMd5sByDate(date, filename); err != nil {
log.Error(err)
return
@ -1598,16 +1466,10 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
var (
err error
msg string
tmpFile *os.File
logpath string
outname string
logDate string
ok bool
sumKey string
sumset mapset.Set
fullpath string
v interface{}
md5Path string
logKey string
)
@ -1627,49 +1489,13 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
logDate = this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)
sumKey = fmt.Sprintf("%s_%s", logDate, filename)
this.lockMap.LockKey(sumKey)
defer this.lockMap.UnLockKey(sumKey)
if v, ok = this.sumMap.GetValue(sumKey); !ok {
if sumset, err = this.GetMd5sByDate(logDate, filename); err != nil {
log.Error(err)
}
if sumset != nil {
this.sumMap.Put(sumKey, sumset)
}
} else {
sumset = v.(mapset.Set)
if sumset.Cardinality() == 0 {
sumset, err = this.GetMd5sByDate(logDate, filename)
}
}
//if sumset.Contains(fileInfo.Md5) { // remove from xxx
// return
//}
outname = fileInfo.Name
if fileInfo.ReName != "" {
outname = fileInfo.ReName
}
fullpath = fileInfo.Path + "/" + outname
logpath = DATA_DIR + "/" + time.Unix(fileInfo.TimeStamp, 0).Format("20060102")
if _, err = os.Stat(logpath); err != nil {
os.MkdirAll(logpath, 0775)
}
if !sumset.Contains(fileInfo.Md5) {
msg = fmt.Sprintf("%s|%d|%d|%s\n", fileInfo.Md5, fileInfo.Size, fileInfo.TimeStamp, fullpath)
if tmpFile, err = os.OpenFile(logpath+"/"+filename, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0644); err != nil {
log.Error(err)
return
}
defer tmpFile.Close()
tmpFile.WriteString(msg)
}
logKey = fmt.Sprintf("%s_%s", logDate, fileInfo.Md5)
logKey = fmt.Sprintf("%s_%s_%s", logDate, filename, fileInfo.Md5)
if filename == CONST_FILE_Md5_FILE_NAME {
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5); !ok {
@ -1690,6 +1516,7 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
this.SaveStat()
}
return
}
if filename == CONST_REMOME_Md5_FILE_NAME {
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5); ok {
@ -1707,9 +1534,10 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
this.statMap.AddCountInt64(CONST_STAT_FILE_COUNT_KEY, -1)
this.SaveStat()
}
return
}
sumset.Add(fileInfo.Md5)
this.SaveFileInfoToLevelDB(logKey, fileInfo)
}
@ -2097,70 +1925,23 @@ func (this *Server) GetMd5sMapByDate(date string, filename string) (*CommonMap,
}
func (this *Server) GetMd5sByDate(date string, filename string) (mapset.Set, error) {
var (
err error
result mapset.Set
fpath string
content string
lines []string
line string
cols []string
data []byte
sumkey string
ok bool
mds []interface{}
v interface{}
keyPrefix string
md5set mapset.Set
keys []string
)
sumkey = fmt.Sprintf("%s_%s", date, filename)
if v, ok = this.sumMap.GetValue(sumkey); ok {
result = v.(mapset.Set)
if result.Cardinality() > 0 {
return result, nil
}
}
result = mapset.NewSet()
if filename == "" {
fpath = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
} else {
fpath = DATA_DIR + "/" + date + "/" + filename
}
if !this.util.FileExists(fpath) {
os.MkdirAll(DATA_DIR+"/"+date, 0775)
log.Warn(fmt.Sprintf("fpath %s not found", fpath))
return result, nil
}
if data, err = ioutil.ReadFile(fpath); err != nil {
return result, err
}
content = string(data)
lines = strings.Split(content, "\n")
if len(lines) > 0 {
mds = make([]interface{}, len(lines)-1)
} else {
return result, nil
}
for _, line = range lines {
cols = strings.Split(line, "|")
if len(cols) > 2 {
if _, err = strconv.ParseInt(cols[1], 10, 64); err != nil {
continue
}
mds = append(mds, cols[0])
md5set = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.ldb.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
keys=strings.Split(string(iter.Key()),"_")
if len(keys)>=3 {
md5set.Add(keys[2])
}
}
result = mapset.NewSetFromSlice(mds)
this.sumMap.Put(sumkey, result)
return result, nil
iter.Release()
return md5set, nil
}
func (this *Server) SyncFileInfo(w http.ResponseWriter, r *http.Request) {
@ -2202,115 +1983,6 @@ func (this *Server) SyncFileInfo(w http.ResponseWriter, r *http.Request) {
}
func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
var (
err error
outPath string
//outname string
// timestamp string
fileInfo FileInfo
tmpFile *os.File
uploadFile multipart.File
)
if !this.IsPeer(r) {
log.Error(fmt.Sprintf(" not is peer,ip:%s", this.util.GetClientIp(r)))
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
if r.Method == "POST" {
fileInfo.Path = r.Header.Get("Sync-Path")
fileInfo.Md5 = r.PostFormValue("md5")
fileInfo.Name = r.PostFormValue("name")
fileInfo.Scene = r.PostFormValue("scene")
fileInfo.Size, err = strconv.ParseInt(r.PostFormValue("size"), 10, 64)
fileInfo.TimeStamp, err = strconv.ParseInt(r.PostFormValue("timestamp"), 10, 64)
if err != nil {
fileInfo.TimeStamp = time.Now().Unix()
log.Error(err)
}
if uploadFile, _, err = r.FormFile("file"); err != nil {
w.Write([]byte(err.Error()))
log.Error(err)
return
}
fileInfo.Peers = []string{}
defer uploadFile.Close()
os.MkdirAll(DOCKER_DIR+fileInfo.Path, 0775)
outPath = fileInfo.Path + "/" + fileInfo.Name
sum := ""
if this.util.FileExists(outPath) {
if tmpFile, err = os.Open(outPath); err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
sum = this.util.GetFileSum(tmpFile, Config().FileSumArithmetic)
if sum != fileInfo.Md5 {
tmpFile.Close()
log.Error("md5 !=fileInfo.Md5 ")
w.Write([]byte("md5 !=fileInfo.Md5 "))
return
}
}
if tmpFile, err = os.Create(outPath); err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
defer tmpFile.Close()
if _, err = io.Copy(tmpFile, uploadFile); err != nil {
w.Write([]byte(err.Error()))
log.Error(err)
return
}
sum = this.util.GetFileSum(tmpFile, Config().FileSumArithmetic)
if sum != fileInfo.Md5 {
log.Error("md5 error")
w.Write([]byte("md5 error"))
tmpFile.Close()
os.Remove(outPath)
return
}
if fileInfo.Peers == nil {
fileInfo.Peers = []string{fmt.Sprintf("http://%s", r.Host)}
}
if _, err = this.SaveFileInfoToLevelDB(this.util.MD5(outPath), &fileInfo); err != nil {
log.Error(err)
}
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, &fileInfo); err != nil {
log.Error(err)
}
if this.util.IsExist(outPath) {
this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
}
p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1)
downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+fileInfo.Name)
w.Write([]byte(downloadUrl))
}
}
func (this *Server) CheckScene(scene string) (bool, error) {
if len(Config().Scenes) == 0 {
@ -2857,8 +2529,13 @@ func (this *Server) RepairStatWeb(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
date string
)
this.RepairStat()
date=r.FormValue("date")
if date=="" {
date=this.util.GetToDay()
}
this.RepairStatByDate(date)
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
@ -3861,7 +3538,7 @@ func (this *Server) FormatStatInfo() {
}
} else {
this.RepairStat()
this.RepairStatByDate(this.util.GetToDay())
}
}
@ -3978,7 +3655,6 @@ func (this *Server) Main() {
http.HandleFunc(fmt.Sprintf("/%s/repair", Config().Group), this.Repair)
http.HandleFunc(fmt.Sprintf("/%s/repair_fileinfo", Config().Group), this.RepairFileInfo)
http.HandleFunc(fmt.Sprintf("/%s/reload", Config().Group), this.Reload)
http.HandleFunc(fmt.Sprintf("/%s/syncfile", Config().Group), this.SyncFile)
http.HandleFunc(fmt.Sprintf("/%s/syncfile_info", Config().Group), this.SyncFileInfo)
http.HandleFunc(fmt.Sprintf("/%s/get_md5s_by_date", Config().Group), this.GetMd5sForWeb)
http.HandleFunc(fmt.Sprintf("/%s/receive_md5s", Config().Group), this.ReceiveMd5s)
@ -3995,7 +3671,6 @@ func (this *Server) Main() {
http.HandleFunc("/repair", this.Repair)
http.HandleFunc("/repair_fileinfo", this.RepairFileInfo)
http.HandleFunc("/reload", this.Reload)
http.HandleFunc("/syncfile", this.SyncFile)
http.HandleFunc("/syncfile_info", this.SyncFileInfo)
http.HandleFunc("/get_md5s_by_date", this.GetMd5sForWeb)
http.HandleFunc("/receive_md5s", this.ReceiveMd5s)