optimization logic

This commit is contained in:
s_jqzhang 2019-01-20 11:05:22 +08:00
parent 9482388288
commit a7bb23f799

View File

@ -156,6 +156,7 @@ type Server struct {
fileset mapset.Set
errorset mapset.Set
curDate string
}
type FileInfo struct {
@ -223,11 +224,8 @@ type GloablConfig struct {
func NewServer() *Server {
var (
server *Server
)
server = &Server{
util: &Common{},
statMap: &CommonMap{m: make(map[string]interface{})},
@ -244,9 +242,12 @@ func NewServer() *Server {
}
httplib.SetDefaultSetting(settins)
server.statMap.Put(CONST_STAT_FILE_COUNT_KEY, int64(0))
server.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_COUNT_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_COUNT_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.errorset,_=server.GetMd5sByDate(server.util.GetToDay(),CONST_Md5_ERROR_FILE_NAME)
server.curDate = server.util.GetToDay()
@ -865,6 +866,11 @@ func (this *Server) GetServerURI(r *http.Request) string {
func (this *Server) CheckFileAndSendToPeer(date string, filename string, is_force_upload bool) {
var (
fpath string
md5s []string
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
@ -874,35 +880,43 @@ func (this *Server) CheckFileAndSendToPeer(date string, filename string, is_forc
}
}()
filename = DATA_DIR + "/" +date+ "/" + filename
if date == this.util.GetToDay() && filename == CONST_Md5_ERROR_FILE_NAME {
if date==this.util.GetToDay() && filename==CONST_Md5_ERROR_FILE_NAME {
for md:=range this.errorset.Iter() {
for md := range this.errorset.Iter() {
if fileInfo, _ := this.GetFileInfoByMd5(md.(string)); fileInfo != nil && fileInfo.Md5 != "" {
if is_force_upload {
fileInfo.Peers = []string{}
}
this.queueToPeers <- *fileInfo
if fileInfo, _ := this.GetFileInfoFromLevelDB(md.(string)); fileInfo != nil && fileInfo.Md5 != "" {
if is_force_upload {
fileInfo.Peers = []string{}
}
if len(fileInfo.Peers) > len(Config().Peers) {
md5s=append(md5s,md.(string))
continue
}
this.queueToPeers <- *fileInfo
}
return
}
for _,md:=range md5s {
this.errorset.Remove(md)
}
if data, err := ioutil.ReadFile(filename); err == nil {
return
}
fpath = DATA_DIR + "/" + date + "/" + filename
if data, err := ioutil.ReadFile(fpath); err == nil {
content := string(data)
lines := strings.Split(content, "\n")
for _, line := range lines {
cols := strings.Split(line, "|")
if fileInfo, _ := this.GetFileInfoByMd5(cols[0]); fileInfo != nil && fileInfo.Md5 != "" {
if fileInfo, _ := this.GetFileInfoFromLevelDB(cols[0]); fileInfo != nil && fileInfo.Md5 != "" {
if is_force_upload {
fileInfo.Peers = []string{}
}
@ -925,7 +939,6 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) {
info *FileInfo
postURL string
result string
data []byte
fi os.FileInfo
i int
)
@ -969,6 +982,12 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) {
if info, err = this.checkPeerFileExist(peer, fileInfo.Md5); info.Md5 != "" {
fileInfo.Peers = append(fileInfo.Peers, peer)
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo); err != nil {
log.Error(err)
}
continue
}
@ -980,27 +999,28 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) {
b.Param("name", filename)
b.Param("md5", fileInfo.Md5)
b.Param("scene", fileInfo.Scene)
b.Param("size", fmt.Sprintf("%d",fileInfo.Size))
b.Param("size", fmt.Sprintf("%d", fileInfo.Size))
b.Param("timestamp", fmt.Sprintf("%d", fileInfo.TimeStamp))
b.PostFile("file", fileInfo.Path+"/"+filename)
b.Debug(true) //fuck this is a bug
result, err = b.String()
if !strings.HasPrefix(result, "http://") {
if !strings.HasPrefix(result, "http://") || err != nil {
this.SaveFileMd5Log(fileInfo, CONST_Md5_ERROR_FILE_NAME)
this.SaveFileMd5Log(fileInfo, CONST_Md5_ERROR_FILE_NAME)
} else {
}
if strings.HasPrefix(result, "http://") {
log.Info(result)
if !this.util.Contains(peer, fileInfo.Peers) {
fileInfo.Peers = append(fileInfo.Peers, peer)
if data, err = json.Marshal(fileInfo); err != nil {
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo); err != nil {
log.Error(err)
return
}
this.ldb.Put([]byte(fileInfo.Md5), data, nil)
}
}
@ -1035,13 +1055,13 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
this.fileset.Clear()
}
if logDate == toDay && filename == CONST_Md5_ERROR_FILE_NAME &&
this.errorset.Cardinality() == 0 &&
this.util.IsExist(DATA_DIR+"/"+toDay+"/"+CONST_Md5_ERROR_FILE_NAME) {
this.errorset, err = this.GetMd5sByDate(logDate, CONST_Md5_ERROR_FILE_NAME)
}
//if logDate == toDay && filename == CONST_Md5_ERROR_FILE_NAME &&
// this.errorset.Cardinality() == 0 &&
// this.util.IsExist(DATA_DIR+"/"+toDay+"/"+CONST_Md5_ERROR_FILE_NAME) {
//
// this.errorset, err = this.GetMd5sByDate(logDate, CONST_Md5_ERROR_FILE_NAME)
//
//}
if logDate != this.util.GetToDay() && filename == CONST_FILE_Md5_FILE_NAME {
@ -1094,24 +1114,6 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
}
func (this *Server) GetFileInfoByMd5(md5sum string) (*FileInfo, error) {
var (
data []byte
err error
fileInfo FileInfo
)
if data, err = this.ldb.Get([]byte(md5sum), nil); err != nil {
return nil, err
} else {
if err = json.Unmarshal(data, &fileInfo); err == nil {
return &fileInfo, nil
} else {
return nil, err
}
}
}
func (this *Server) checkPeerFileExist(peer string, md5sum string) (*FileInfo, error) {
var (
@ -1149,7 +1151,7 @@ func (this *Server) CheckFileExist(w http.ResponseWriter, r *http.Request) {
return
}
if fileInfo, err = this.GetFileInfoByMd5(md5sum); fileInfo != nil {
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); fileInfo != nil {
if data, err = json.Marshal(fileInfo); err == nil {
w.Write(data)
@ -1165,7 +1167,6 @@ func (this *Server) CheckFileExist(w http.ResponseWriter, r *http.Request) {
func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
r.ParseForm()
if !this.IsPeer(r) {
@ -1194,14 +1195,11 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
if is_force_upload {
go this.CheckFileAndSendToPeer(date, CONST_FILE_Md5_FILE_NAME, is_force_upload)
go this.CheckFileAndSendToPeer(date, CONST_FILE_Md5_FILE_NAME, is_force_upload)
} else {
go this.CheckFileAndSendToPeer(date, CONST_Md5_ERROR_FILE_NAME, is_force_upload)
go this.CheckFileAndSendToPeer(date, CONST_Md5_ERROR_FILE_NAME, is_force_upload)
}
@ -1233,6 +1231,7 @@ func (this *Server) GetFileInfoFromLevelDB(key string) (*FileInfo, error) {
fileInfo FileInfo
)
if data, err = this.ldb.Get([]byte(key), nil); err != nil {
return nil, err
}
@ -1240,6 +1239,7 @@ func (this *Server) GetFileInfoFromLevelDB(key string) (*FileInfo, error) {
if err = json.Unmarshal(data, &fileInfo); err != nil {
return nil, err
}
return &fileInfo, nil
}
@ -1260,7 +1260,7 @@ func (this *Server) SaveStat() {
stat := this.statMap.Get()
if v, ok := stat[CONST_STAT_FILE_COUNT_KEY]; ok {
switch v.(type) {
case int64,int32,int,float64,float32:
case int64, int32, int, float64, float32:
if v.(int64) >= 0 {
if data, err := json.Marshal(stat); err != nil {
@ -1275,11 +1275,9 @@ func (this *Server) SaveStat() {
}
}
SaveStatFunc()
}
func (this *Server) SaveFileInfoToLevelDB(key string, fileInfo *FileInfo) (*FileInfo, error) {
@ -1288,6 +1286,9 @@ func (this *Server) SaveFileInfoToLevelDB(key string, fileInfo *FileInfo) (*File
data []byte
)
if data, err = json.Marshal(fileInfo); err != nil {
return fileInfo, err
@ -1350,7 +1351,7 @@ func (this *Server) ReceiveMd5s(w http.ResponseWriter, r *http.Request) {
}
for m := range mdfs.Iter() {
if fileInfo, err = this.GetFileInfoByMd5(m.(string)); err != nil {
if fileInfo, err = this.GetFileInfoFromLevelDB(m.(string)); err != nil {
log.Error(err)
continue
}
@ -1478,12 +1479,10 @@ func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
fileInfo.Path = r.Header.Get("Sync-Path")
fileInfo.Md5 = r.PostFormValue("md5")
fileInfo.Name = r.PostFormValue("name")
fileInfo.Scene= r.PostFormValue("scene")
fileInfo.Size,err = strconv.ParseInt(r.PostFormValue("size"), 10, 64)
fileInfo.Scene = r.PostFormValue("scene")
fileInfo.Size, err = strconv.ParseInt(r.PostFormValue("size"), 10, 64)
fileInfo.TimeStamp, err = strconv.ParseInt(r.PostFormValue("timestamp"), 10, 64)
if err != nil {
fileInfo.TimeStamp = time.Now().Unix()
log.Error(err)
@ -1553,8 +1552,6 @@ func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
}
if fileInfo.Peers == nil {
fileInfo.Peers = []string{fmt.Sprintf("http://%s", r.Host)}
}
@ -1606,7 +1603,7 @@ func (this *Server) RemoveFile(w http.ResponseWriter, r *http.Request) {
w.Write([]byte("md5 unvalid"))
return
}
if fileInfo, err = this.GetFileInfoByMd5(md5sum); err != nil {
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); err != nil {
w.Write([]byte(err.Error()))
return
}
@ -1712,6 +1709,7 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
err error
outFile *os.File
folder string
fi os.FileInfo
)
defer file.Close()
@ -1775,6 +1773,12 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
return fileInfo, errors.New("(error)fail," + err.Error())
}
if fi, err = outFile.Stat(); err != nil {
log.Error(err)
} else {
fileInfo.Size = fi.Size()
}
v := this.util.GetFileMd5(outFile)
fileInfo.Md5 = v
fileInfo.Path = folder
@ -1877,7 +1881,6 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
// Path: fileInfo.Path, Md5: fileInfo.Md5, ReName: fileInfo.ReName,
// Size: fileInfo.Size, Scene: fileInfo.Scene}
this.queueToPeers <- *fileInfo
}
@ -1905,7 +1908,6 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1)
p = Config().Group + "/" + p + "/" + outname
download_url := fmt.Sprintf("http://%s/%s", r.Host, p)
@ -1991,7 +1993,7 @@ func (this *Server) BenchMark(w http.ResponseWriter, r *http.Request) {
}
//fmt.Println(server.GetFileInfoByMd5(s))
//fmt.Println(server.GetFileInfoFromLevelDB(s))
}
@ -2124,8 +2126,8 @@ func (this *Server) AutoRepair(force_repair bool) {
localSet mapset.Set
remoteSet mapset.Set
allSet mapset.Set
tmpSet mapset.Set
fileInfo *FileInfo
tmpSet mapset.Set
fileInfo *FileInfo
)
defer func() {
@ -2175,7 +2177,7 @@ func (this *Server) AutoRepair(force_repair bool) {
if md5s, err = req.String(); err != nil {
continue
}
if localSet, err = this.GetMd5sByDate(dateStat.Date,CONST_FILE_Md5_FILE_NAME); err != nil {
if localSet, err = this.GetMd5sByDate(dateStat.Date, CONST_FILE_Md5_FILE_NAME); err != nil {
log.Error(err)
continue
}
@ -2186,12 +2188,12 @@ func (this *Server) AutoRepair(force_repair bool) {
req.SetTimeout(time.Second*5, time.Second*5)
req.Param("md5s", md5s)
req.String()
tmpSet=allSet.Difference(remoteSet)
for v:=range tmpSet.Iter() {
if fileInfo,err=this.GetFileInfoByMd5(v.(string));err!=nil {
tmpSet = allSet.Difference(remoteSet)
for v := range tmpSet.Iter() {
if fileInfo, err = this.GetFileInfoFromLevelDB(v.(string)); err != nil {
continue
}
this.queueToPeers<-*fileInfo
this.queueToPeers <- *fileInfo
}
//Update(peer,dateStat)
@ -2287,13 +2289,13 @@ func (this *Server) Check() {
func (this *Server) Repair(w http.ResponseWriter, r *http.Request) {
var (
force string
force string
force_repair bool
)
r.ParseForm()
force= r.FormValue("force")
if force=="1" {
force_repair=true
force = r.FormValue("force")
if force == "1" {
force_repair = true
}
if this.IsPeer(r) {
this.AutoRepair(force_repair)
@ -2415,7 +2417,7 @@ func (this *Server) initComponent(is_reload bool) {
var peers []string
for _, peer := range Config().Peers {
if this.util.Contains(ip, ex.FindAllString(peer, -1)) ||
this.util.Contains("127.0.0.1", ex.FindAllString(peer, -1)) {
this.util.Contains("127.0.0.1", ex.FindAllString(peer, -1)) {
continue
}
if strings.HasPrefix(peer, "http") {
@ -2512,9 +2514,11 @@ func (HttpHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
func (this *Server) Main() {
go func() {
for {
this.CheckFileAndSendToPeer( this.util.GetToDay(),CONST_Md5_ERROR_FILE_NAME, false)
this.CheckFileAndSendToPeer(this.util.GetToDay(), CONST_Md5_ERROR_FILE_NAME, false)
time.Sleep(time.Second * time.Duration(Config().RefreshInterval))
this.util.RemoveEmptyDir(STORE_DIR)
}
@ -2525,9 +2529,9 @@ func (this *Server) Main() {
if Config().AutoRepair {
go func() {
for {
time.Sleep(time.Minute*3)
time.Sleep(time.Minute * 3)
this.AutoRepair(false)
time.Sleep(time.Minute*60)
time.Sleep(time.Minute * 60)
}
}()