add queue protect

This commit is contained in:
s_jqzhang 2019-02-28 11:59:34 +08:00
parent b0b3e795ef
commit 2fe605f699

View File

@ -942,6 +942,7 @@ func (this *Server) RepairFileInfoFromFile() {
}
//log.Info(fileInfo)
log.Info(file_path, fi.Name())
//this.AppendToQueue(&fileInfo)
this.postFileToPeer(&fileInfo)
this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
}
@ -1870,6 +1871,7 @@ func (this *Server) SyncFileInfo(w http.ResponseWriter, r *http.Request) {
}
p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1)
downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+filename)
log.Info("SyncFileInfo: ", downloadUrl)
w.Write([]byte(downloadUrl))
}
func (this *Server) CheckScene(scene string) (bool, error) {
@ -2451,9 +2453,16 @@ func (this *Server) RegisterExit() {
}()
}
func (this *Server) AppendToQueue(fileInfo *FileInfo) {
for (len(this.queueToPeers) + CONST_QUEUE_SIZE/10) > CONST_QUEUE_SIZE {
time.Sleep(time.Second * 1)
}
this.queueToPeers <- *fileInfo
}
func (this *Server) AppendToDownloadQueue(fileInfo *FileInfo) {
for (len(this.queueFromPeers) + CONST_QUEUE_SIZE/10) > CONST_QUEUE_SIZE {
time.Sleep(time.Second * 1)
}
this.queueFromPeers <- *fileInfo
}
func (this *Server) ConsumerDownLoad() {
@ -2678,7 +2687,8 @@ func (this *Server) LoadQueue() {
log.Error(err)
} else {
for fileInfo := range queue.Iter() {
this.queueFromPeers <- *fileInfo.(*FileInfo)
//this.queueFromPeers <- *fileInfo.(*FileInfo)
this.AppendToDownloadQueue(fileInfo.(*FileInfo))
}
}
}
@ -2920,6 +2930,7 @@ func (this *Server) Status(w http.ResponseWriter, r *http.Request) {
sts = make(map[string]interface{})
sts["Fs.QueueFromPeers"] = len(this.queueFromPeers)
sts["Fs.QueueToPeers"] = len(this.queueToPeers)
sts["Fs.QueueFileLog"] = len(this.queueFileLog)
for _, k := range []string{CONST_FILE_Md5_FILE_NAME, CONST_Md5_ERROR_FILE_NAME, CONST_Md5_QUEUE_FILE_NAME} {
k2 := fmt.Sprintf("%s_%s", today, k)
if v, ok = this.sumMap.GetValue(k2); ok {
@ -3028,7 +3039,7 @@ func (this *Server) Index(w http.ResponseWriter, r *http.Request) {
uppy = string(data)
}
} else {
this.util.WriteFile(uppyFileName,uppy)
this.util.WriteFile(uppyFileName, uppy)
}
fmt.Fprintf(w,
fmt.Sprintf(uppy, uploadUrl, Config().DefaultScene, uploadBigUrl))
@ -3269,6 +3280,7 @@ func (this *Server) initTus() {
log.Error(err)
continue
}
log.Info(fileInfo)
os.Remove(infoFullPath)
if _, err = this.SaveFileInfoToLevelDB(info.ID, fileInfo, this.ldb); err != nil { //assosiate file id
log.Error(err)