remove auto repair

This commit is contained in:
s_jqzhang 2019-01-18 22:49:09 +08:00
parent 2bdd667def
commit 7908f4c3be

View File

@ -172,6 +172,7 @@ type FileInfo struct {
Peers []string
Scene string
TimeStamp int64
writeLog bool
}
type Status struct {
@ -228,8 +229,11 @@ type GloablConfig struct {
func NewServer() *Server {
var (
server *Server
size int64
)
size=0
server=&Server{
util: &Common{},
statMap: &CommonMap{m: make(map[string]interface{})},
@ -237,6 +241,20 @@ func NewServer() *Server {
fileset: mapset.NewSet(),
errorset:mapset.NewSet(),
}
settins:=httplib.BeegoHTTPSettings{
UserAgent: "go-fastdfs",
ConnectTimeout: 10 * time.Second,
ReadWriteTimeout: 10 * time.Second,
Gzip: true,
DumpBody: true,
}
httplib.SetDefaultSetting(settins)
server.statMap.Put(CONST_STAT_FILE_COUNT_KEY,size)
server.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY,size)
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_COUNT_KEY,size)
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY,size)
server.curDate=server.util.GetToDay()
return server
@ -581,10 +599,6 @@ func (this *Common) GetClientIp(r *http.Request) string {
func (this *Server) RepairStat() {
var (
size int64
count int64
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
@ -595,6 +609,7 @@ func (this *Server) RepairStat() {
}
}()
handlefunc := func(file_path string, f os.FileInfo, err error) error {
var (
@ -662,11 +677,13 @@ func (this *Server) RepairStat() {
return nil
}
this.statMap.Put(CONST_STAT_FILE_COUNT_KEY, count)
this.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, size)
filepath.Walk(DATA_DIR, handlefunc)
this.SaveStat()
}
@ -882,7 +899,9 @@ func (this *Server) CheckFileAndSendToPeer(filename string, is_force_upload bool
if is_force_upload {
fileInfo.Peers = []string{}
}
this.postFileToPeer(fileInfo, false)
//this.postFileToPeer(fileInfo, false)
fileInfo.writeLog=false
this.queueToPeers<-*fileInfo
}
}
@ -890,7 +909,7 @@ func (this *Server) CheckFileAndSendToPeer(filename string, is_force_upload bool
}
func (this *Server) postFileToPeer(fileInfo *FileInfo, write_log bool) {
func (this *Server) postFileToPeer(fileInfo *FileInfo) {
var (
err error
@ -967,7 +986,7 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo, write_log bool) {
result, err = b.String()
if !strings.HasPrefix(result, "http://") {
if write_log {
if fileInfo.writeLog {
this.SaveFileMd5Log(fileInfo, CONST_Md5_ERROR_FILE_NAME)
}
} else {
@ -1114,10 +1133,18 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
var (
filename string
)
r.ParseForm()
if !this.IsPeer(r) {
w.Write([]byte("client must be in cluster"))
return
}
date := ""
force := ""
@ -1156,6 +1183,24 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
}
//for _,peer:=range Config().Peers {
//
// fmt.Println(peer)
//
// req:=httplib.Post(fmt.Sprintf("%s/sync",peer))
// req.Param("date",date)
// if is_force_upload {
// req.Param("force","1")
// }
// if _,err:=req.String();err!=nil {
// log.Error(err)
// }
//
//
//
//}
w.Write([]byte("job is running"))
}
@ -1195,7 +1240,7 @@ func (this *Server) SaveStat() {
if v, ok := stat[CONST_STAT_FILE_TOTAL_SIZE_KEY]; ok {
switch v.(type) {
case int64:
if v.(int64) > 0 {
if v.(int64) >=0 {
if data, err := json.Marshal(stat); err != nil {
log.Error(err)
@ -1210,11 +1255,13 @@ func (this *Server) SaveStat() {
}
for {
time.Sleep(time.Minute * 1)
SaveStatFunc()
}
SaveStatFunc()
//
//for {
//
// time.Sleep(time.Minute * 1)
// SaveStatFunc()
//}
}
@ -1497,6 +1544,9 @@ func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
log.Error(err)
} else {
fileInfo.Size = fi.Size()
date:=time.Unix(fileInfo.TimeStamp,0).Format("20060102")
this.statMap.AddCountInt64(date+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
this.statMap.AddCountInt64(date+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fi.Size())
this.statMap.AddCountInt64(CONST_STAT_FILE_TOTAL_SIZE_KEY, fi.Size())
this.statMap.AddCountInt64(CONST_STAT_FILE_COUNT_KEY, 1)
@ -1818,10 +1868,13 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
}
if len(this.queueToPeers) < CONST_QUEUE_SIZE {
this.queueToPeers <- FileInfo{Name: fileInfo.Name,
Peers: []string{}, TimeStamp: fileInfo.TimeStamp,
Path: fileInfo.Path, Md5: fileInfo.Md5, ReName: fileInfo.ReName,
Size: fileInfo.Size, Scene: fileInfo.Scene}
//this.queueToPeers <- FileInfo{Name: fileInfo.Name,
// Peers: []string{}, TimeStamp: fileInfo.TimeStamp,
// Path: fileInfo.Path, Md5: fileInfo.Md5, ReName: fileInfo.ReName,
// Size: fileInfo.Size, Scene: fileInfo.Scene}
fileInfo.writeLog=true
this.queueToPeers<-*fileInfo
}
// go this.postFileToPeer(fileInfo, true)
@ -1848,6 +1901,8 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
this.SaveStat()
p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1)
p = Config().Group + "/" + p + "/" + outname
download_url := fmt.Sprintf("http://%s/%s", r.Host, p)
@ -2043,7 +2098,7 @@ func (this *Server) Consumer() {
for {
fileInfo := <-this.queueToPeers
this.postFileToPeer(&fileInfo, true)
this.postFileToPeer(&fileInfo)
}
}
@ -2152,11 +2207,11 @@ func (this *Server) AutoRepair() {
}
for {
time.Sleep(time.Second*10)
AutoRepairFunc()
time.Sleep(time.Minute*60)
}
//for {
// time.Sleep(time.Second*10)
// AutoRepairFunc()
// time.Sleep(time.Minute*60)
//}
AutoRepairFunc()
@ -2235,6 +2290,16 @@ func (this *Server) Check() {
}
//func (this *Server) Repair(w http.ResponseWriter, r *http.Request) {
//
// if this.IsPeer(r) {
// this.AutoRepair()
// } else {
// w.Write([]byte("not permit"))
// }
//
//}
func (this *Server) Status(w http.ResponseWriter, r *http.Request) {
var (
@ -2441,6 +2506,8 @@ func (HttpHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
func (this *Server) Main() {
server.RepairStat()
go func() {
for {
this.CheckFileAndSendToPeer("", false)
@ -2448,13 +2515,19 @@ func (this *Server) Main() {
this.util.RemoveEmptyDir(STORE_DIR)
}
}()
go server.RepairStat()
go this.SaveStat()
go this.Check()
go this.Consumer()
if Config().AutoRepair {
go this.AutoRepair()
}
//if Config().AutoRepair {
// go func() {
// for {
// time.Sleep(time.Second*10)
// this.AutoRepair()
// time.Sleep(time.Minute*60)
// }
// }()
//
//}
http.HandleFunc("/", this.Index)
http.HandleFunc("/check_file_exist", this.CheckFileExist)
@ -2464,6 +2537,7 @@ func (this *Server) Main() {
http.HandleFunc("/stat", this.Stat)
http.HandleFunc("/repair_stat", this.RepairStatWeb)
http.HandleFunc("/status", this.Status)
//http.HandleFunc("/repair", this.Repair)
http.HandleFunc("/syncfile", this.SyncFile)
http.HandleFunc("/get_md5s_by_date", this.GetMd5sForWeb)
http.HandleFunc("/receive_md5s", this.ReceiveMd5s)