mirror of
https://gitee.com/sjqzhang/go-fastdfs.git
synced 2024-11-30 02:07:45 +08:00
format code and GetAndSendToPeer optimlier
This commit is contained in:
parent
893ad9d88a
commit
149cc42cd9
421
fileserver.go
421
fileserver.go
@ -42,8 +42,7 @@ import (
|
||||
|
||||
var staticHandler http.Handler
|
||||
|
||||
var server =NewServer()
|
||||
|
||||
var server = NewServer()
|
||||
|
||||
var logacc log.LoggerInterface
|
||||
|
||||
@ -156,13 +155,9 @@ type Server struct {
|
||||
queueToPeers chan FileInfo
|
||||
fileset mapset.Set
|
||||
errorset mapset.Set
|
||||
curDate string
|
||||
|
||||
|
||||
curDate string
|
||||
}
|
||||
|
||||
|
||||
|
||||
type FileInfo struct {
|
||||
Name string
|
||||
ReName string
|
||||
@ -172,7 +167,7 @@ type FileInfo struct {
|
||||
Peers []string
|
||||
Scene string
|
||||
TimeStamp int64
|
||||
writeLog bool
|
||||
writeLog bool
|
||||
}
|
||||
|
||||
type Status struct {
|
||||
@ -223,25 +218,25 @@ type GloablConfig struct {
|
||||
DownloadUseToken bool `json:"download_use_token"`
|
||||
DownloadTokenExpire int `json:"download_token_expire"`
|
||||
QueueSize int `json:"queue_size"`
|
||||
AutoRepair bool `json:"auto_repair"`
|
||||
AutoRepair bool `json:"auto_repair"`
|
||||
}
|
||||
|
||||
func NewServer() *Server {
|
||||
func NewServer() *Server {
|
||||
var (
|
||||
server *Server
|
||||
size int64
|
||||
size int64
|
||||
)
|
||||
|
||||
size=0
|
||||
size = 0
|
||||
|
||||
server=&Server{
|
||||
server = &Server{
|
||||
util: &Common{},
|
||||
statMap: &CommonMap{m: make(map[string]interface{})},
|
||||
queueToPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
|
||||
fileset: mapset.NewSet(),
|
||||
errorset:mapset.NewSet(),
|
||||
errorset: mapset.NewSet(),
|
||||
}
|
||||
settins:=httplib.BeegoHTTPSettings{
|
||||
settins := httplib.BeegoHTTPSettings{
|
||||
UserAgent: "go-fastdfs",
|
||||
ConnectTimeout: 10 * time.Second,
|
||||
ReadWriteTimeout: 10 * time.Second,
|
||||
@ -249,17 +244,15 @@ func NewServer() *Server {
|
||||
DumpBody: true,
|
||||
}
|
||||
httplib.SetDefaultSetting(settins)
|
||||
server.statMap.Put(CONST_STAT_FILE_COUNT_KEY,size)
|
||||
server.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY,size)
|
||||
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_COUNT_KEY,size)
|
||||
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY,size)
|
||||
server.statMap.Put(CONST_STAT_FILE_COUNT_KEY, size)
|
||||
server.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, size)
|
||||
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_COUNT_KEY, size)
|
||||
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, size)
|
||||
|
||||
|
||||
server.curDate=server.util.GetToDay()
|
||||
server.curDate = server.util.GetToDay()
|
||||
|
||||
return server
|
||||
|
||||
|
||||
}
|
||||
|
||||
type CommonMap struct {
|
||||
@ -323,7 +316,7 @@ func (s *CommonMap) Add(key string) {
|
||||
func (s *CommonMap) Zero() {
|
||||
s.Lock()
|
||||
defer s.Unlock()
|
||||
for k, _ := range s.m {
|
||||
for k := range s.m {
|
||||
|
||||
s.m[k] = 0
|
||||
}
|
||||
@ -398,30 +391,28 @@ func (this *Common) GetToDay() string {
|
||||
|
||||
func (this *Common) GetDayFromTimeStamp(timeStamp int64) string {
|
||||
|
||||
return time.Unix(timeStamp,0).Format("20060102")
|
||||
return time.Unix(timeStamp, 0).Format("20060102")
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
func (this *Common) StrToMapSet(str string,sep string) mapset.Set {
|
||||
result:=mapset.NewSet()
|
||||
for _,v:=range strings.Split(str,sep) {
|
||||
func (this *Common) StrToMapSet(str string, sep string) mapset.Set {
|
||||
result := mapset.NewSet()
|
||||
for _, v := range strings.Split(str, sep) {
|
||||
result.Add(v)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func (this *Common) MapSetToStr(set mapset.Set,sep string) string{
|
||||
func (this *Common) MapSetToStr(set mapset.Set, sep string) string {
|
||||
|
||||
var (
|
||||
ret []string
|
||||
)
|
||||
|
||||
for v:=range set.Iter() {
|
||||
ret=append(ret,v.(string))
|
||||
for v := range set.Iter() {
|
||||
ret = append(ret, v.(string))
|
||||
}
|
||||
return strings.Join(ret,sep)
|
||||
return strings.Join(ret, sep)
|
||||
|
||||
}
|
||||
|
||||
@ -619,7 +610,6 @@ func (this *Server) RepairStat() {
|
||||
this.statMap.Put(CONST_STAT_FILE_COUNT_KEY, int64(0))
|
||||
this.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
|
||||
|
||||
|
||||
handlefunc := func(file_path string, f os.FileInfo, err error) error {
|
||||
|
||||
var (
|
||||
@ -659,12 +649,12 @@ func (this *Server) RepairStat() {
|
||||
if count > 1 {
|
||||
count = count - 1
|
||||
}
|
||||
count=0
|
||||
count = 0
|
||||
for _, line = range lines {
|
||||
|
||||
cols = strings.Split(line, "|")
|
||||
if len(cols) > 2 {
|
||||
count=count+1
|
||||
count = count + 1
|
||||
if size, err = strconv.ParseInt(cols[1], 10, 64); err != nil {
|
||||
size = 0
|
||||
continue
|
||||
@ -687,19 +677,12 @@ func (this *Server) RepairStat() {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
filepath.Walk(DATA_DIR, handlefunc)
|
||||
|
||||
|
||||
this.SaveStat()
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
func (this *Server) DownloadFromPeer(peer string, fileInfo *FileInfo) {
|
||||
var (
|
||||
err error
|
||||
@ -721,13 +704,8 @@ func (this *Server) DownloadFromPeer(peer string, fileInfo *FileInfo) {
|
||||
|
||||
fpath = fileInfo.Path + "/" + filename
|
||||
|
||||
|
||||
|
||||
req.SetTimeout(time.Second*5, time.Second*5)
|
||||
|
||||
|
||||
|
||||
|
||||
if err = req.ToFile(fpath); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
@ -759,17 +737,14 @@ func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
|
||||
ts int64
|
||||
md5sum string
|
||||
fp *os.File
|
||||
isPeer bool
|
||||
isPeer bool
|
||||
)
|
||||
|
||||
r.ParseForm()
|
||||
|
||||
isPeer = this.IsPeer(r)
|
||||
|
||||
isPeer=this.IsPeer(r)
|
||||
|
||||
|
||||
|
||||
if Config().DownloadUseToken &&!isPeer {
|
||||
if Config().DownloadUseToken && !isPeer {
|
||||
|
||||
token = r.FormValue("token")
|
||||
timestamp = r.FormValue("timestamp")
|
||||
@ -803,7 +778,7 @@ func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
|
||||
log.Error(err)
|
||||
} else {
|
||||
|
||||
for k, _ := range pathval {
|
||||
for k := range pathval {
|
||||
if k != "" {
|
||||
fullpath = k
|
||||
break
|
||||
@ -819,7 +794,7 @@ func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
|
||||
return true
|
||||
}
|
||||
|
||||
if Config().DownloadUseToken &&!isPeer {
|
||||
if Config().DownloadUseToken && !isPeer {
|
||||
fullpath = strings.Split(fullpath, "?")[0]
|
||||
pathMd5 = this.util.MD5(fullpath)
|
||||
if fileInfo, err = this.GetFileInfoFromLevelDB(pathMd5); err != nil {
|
||||
@ -858,7 +833,7 @@ func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
if fileInfo.Md5 != "" {
|
||||
|
||||
if Config().DownloadUseToken &&!isPeer {
|
||||
if Config().DownloadUseToken && !isPeer {
|
||||
if !CheckToken(token, fileInfo.Md5, timestamp) {
|
||||
w.Write([]byte("unvalid request,error token"))
|
||||
return
|
||||
@ -889,7 +864,7 @@ func (this *Server) GetServerURI(r *http.Request) string {
|
||||
return fmt.Sprintf("http://%s/", r.Host)
|
||||
}
|
||||
|
||||
func (this *Server) CheckFileAndSendToPeer(filename string, is_force_upload bool) {
|
||||
func (this *Server) CheckFileAndSendToPeer(date string, filename string, is_force_upload bool) {
|
||||
|
||||
defer func() {
|
||||
if re := recover(); re != nil {
|
||||
@ -900,9 +875,28 @@ func (this *Server) CheckFileAndSendToPeer(filename string, is_force_upload bool
|
||||
}
|
||||
}()
|
||||
|
||||
if filename == "" {
|
||||
filename = DATA_DIR + "/" + time.Now().Format("20060102") + "/" + CONST_Md5_ERROR_FILE_NAME
|
||||
}
|
||||
|
||||
filename = DATA_DIR + "/" +date+ "/" + filename
|
||||
|
||||
|
||||
if date==this.util.GetToDay() && filename==CONST_Md5_ERROR_FILE_NAME {
|
||||
|
||||
for md:=range this.errorset.Iter() {
|
||||
|
||||
if fileInfo, _ := this.GetFileInfoByMd5(md.(string)); fileInfo != nil && fileInfo.Md5 != "" {
|
||||
if is_force_upload {
|
||||
fileInfo.Peers = []string{}
|
||||
}
|
||||
//this.postFileToPeer(fileInfo, false)
|
||||
fileInfo.writeLog = false
|
||||
this.queueToPeers <- *fileInfo
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
if data, err := ioutil.ReadFile(filename); err == nil {
|
||||
content := string(data)
|
||||
@ -916,8 +910,8 @@ func (this *Server) CheckFileAndSendToPeer(filename string, is_force_upload bool
|
||||
fileInfo.Peers = []string{}
|
||||
}
|
||||
//this.postFileToPeer(fileInfo, false)
|
||||
fileInfo.writeLog=false
|
||||
this.queueToPeers<-*fileInfo
|
||||
fileInfo.writeLog = false
|
||||
this.queueToPeers <- *fileInfo
|
||||
}
|
||||
}
|
||||
|
||||
@ -981,17 +975,9 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) {
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
postURL = fmt.Sprintf("%s/%s", peer, "syncfile")
|
||||
b := httplib.Post(postURL)
|
||||
|
||||
|
||||
|
||||
b.SetTimeout(time.Second*1, time.Second*1)
|
||||
b.Header("Sync-Path", fileInfo.Path)
|
||||
b.Param("name", filename)
|
||||
@ -1036,40 +1022,48 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
|
||||
logpath string
|
||||
outname string
|
||||
logDate string
|
||||
logSet mapset.Set
|
||||
logSet mapset.Set
|
||||
toDay string
|
||||
)
|
||||
|
||||
|
||||
logDate=this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)
|
||||
toDay = this.util.GetToDay()
|
||||
logDate = this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)
|
||||
|
||||
outname = fileInfo.Name
|
||||
|
||||
if this.curDate!=this.util.GetToDay() {
|
||||
if this.curDate != toDay {
|
||||
this.errorset.Clear()
|
||||
this.fileset.Clear()
|
||||
}
|
||||
|
||||
if logDate == toDay && filename == CONST_Md5_ERROR_FILE_NAME &&
|
||||
this.errorset.Cardinality() == 0 &&
|
||||
this.util.IsExist(DATA_DIR+"/"+toDay+"/"+CONST_Md5_ERROR_FILE_NAME) {
|
||||
|
||||
if logDate!=this.util.GetToDay() && filename==CONST_FILE_Md5_FILE_NAME {
|
||||
this.errorset, err = this.GetMd5sByDate(logDate, CONST_Md5_ERROR_FILE_NAME)
|
||||
|
||||
if logSet,err=this.GetMd5sByDate(logDate);err!=nil {
|
||||
log.Error(err)
|
||||
}
|
||||
if logSet.Contains(fileInfo.Md5) {
|
||||
log.Info(fmt.Sprintf("date log %s contain md5 %s",logDate,fileInfo.Md5))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if logDate != this.util.GetToDay() && filename == CONST_FILE_Md5_FILE_NAME {
|
||||
|
||||
if logSet, err = this.GetMd5sByDate(logDate, CONST_FILE_Md5_FILE_NAME); err != nil {
|
||||
log.Error(err)
|
||||
}
|
||||
if logSet.Contains(fileInfo.Md5) {
|
||||
log.Info(fmt.Sprintf("date log %s contain md5 %s", logDate, fileInfo.Md5))
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if fileInfo.ReName != "" {
|
||||
outname = fileInfo.ReName
|
||||
}
|
||||
|
||||
if filename==CONST_Md5_ERROR_FILE_NAME && this.errorset.Contains(fileInfo.Md5) {
|
||||
if filename == CONST_Md5_ERROR_FILE_NAME && this.errorset.Contains(fileInfo.Md5) {
|
||||
return
|
||||
}
|
||||
|
||||
if filename==CONST_FILE_Md5_FILE_NAME && this.fileset.Contains(fileInfo.Md5) {
|
||||
if filename == CONST_FILE_Md5_FILE_NAME && this.fileset.Contains(fileInfo.Md5) {
|
||||
return
|
||||
}
|
||||
|
||||
@ -1084,10 +1078,10 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
|
||||
}
|
||||
defer tmpFile.Close()
|
||||
tmpFile.WriteString(msg)
|
||||
if filename==CONST_FILE_Md5_FILE_NAME {
|
||||
if filename == CONST_FILE_Md5_FILE_NAME {
|
||||
this.fileset.Add(fileInfo.Md5)
|
||||
}
|
||||
if filename==CONST_Md5_ERROR_FILE_NAME {
|
||||
if filename == CONST_Md5_ERROR_FILE_NAME {
|
||||
this.errorset.Add(fileInfo.Md5)
|
||||
}
|
||||
|
||||
@ -1166,13 +1160,10 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
var (
|
||||
filename string
|
||||
|
||||
)
|
||||
|
||||
|
||||
r.ParseForm()
|
||||
|
||||
|
||||
if !this.IsPeer(r) {
|
||||
w.Write([]byte("client must be in cluster"))
|
||||
return
|
||||
@ -1183,14 +1174,14 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
|
||||
force := ""
|
||||
is_force_upload := false
|
||||
|
||||
force=r.FormValue("force")
|
||||
date=r.FormValue("date")
|
||||
force = r.FormValue("force")
|
||||
date = r.FormValue("date")
|
||||
|
||||
if force == "1" {
|
||||
is_force_upload = true
|
||||
}
|
||||
|
||||
if date=="" {
|
||||
if date == "" {
|
||||
|
||||
w.Write([]byte("require paramete date &force , date?=20181230"))
|
||||
return
|
||||
@ -1199,28 +1190,25 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
if is_force_upload {
|
||||
|
||||
filename = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
|
||||
|
||||
if this.util.FileExists(filename) {
|
||||
|
||||
go this.CheckFileAndSendToPeer(filename, is_force_upload)
|
||||
go this.CheckFileAndSendToPeer(date, CONST_FILE_Md5_FILE_NAME, is_force_upload)
|
||||
} else {
|
||||
w.Write([]byte(fmt.Sprintf( "%s not found",filename)))
|
||||
w.Write([]byte(fmt.Sprintf("%s not found", filename)))
|
||||
return
|
||||
}
|
||||
} else {
|
||||
} else {
|
||||
|
||||
filename = DATA_DIR + "/" + date + "/" + CONST_Md5_ERROR_FILE_NAME
|
||||
|
||||
if this.util.FileExists(filename) {
|
||||
|
||||
go this.CheckFileAndSendToPeer(filename, is_force_upload)
|
||||
go this.CheckFileAndSendToPeer(date, CONST_Md5_ERROR_FILE_NAME, is_force_upload)
|
||||
} else {
|
||||
w.Write([]byte(fmt.Sprintf( "%s not found",filename)))
|
||||
w.Write([]byte(fmt.Sprintf("%s not found", filename)))
|
||||
return
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
//for _,peer:=range Config().Peers {
|
||||
@ -1279,7 +1267,7 @@ func (this *Server) SaveStat() {
|
||||
if v, ok := stat[CONST_STAT_FILE_TOTAL_SIZE_KEY]; ok {
|
||||
switch v.(type) {
|
||||
case int64:
|
||||
if v.(int64) >=0 {
|
||||
if v.(int64) >= 0 {
|
||||
|
||||
if data, err := json.Marshal(stat); err != nil {
|
||||
log.Error(err)
|
||||
@ -1332,15 +1320,12 @@ func (this *Server) IsPeer(r *http.Request) bool {
|
||||
)
|
||||
ip = this.util.GetClientIp(r)
|
||||
|
||||
if ip=="127.0.0.1" || ip==this.util.GetPulicIP() {
|
||||
if ip == "127.0.0.1" || ip == this.util.GetPulicIP() {
|
||||
return true
|
||||
}
|
||||
ip = "http://" + ip
|
||||
bflag = false
|
||||
|
||||
|
||||
|
||||
|
||||
for _, peer = range Config().Peers {
|
||||
if strings.HasPrefix(peer, ip) {
|
||||
bflag = true
|
||||
@ -1353,47 +1338,44 @@ func (this *Server) IsPeer(r *http.Request) bool {
|
||||
func (this *Server) ReceiveMd5s(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
var (
|
||||
err error
|
||||
md5s string
|
||||
mdfs mapset.Set
|
||||
err error
|
||||
md5s string
|
||||
mdfs mapset.Set
|
||||
fileInfo *FileInfo
|
||||
|
||||
)
|
||||
|
||||
if !this.IsPeer(r) {
|
||||
log.Warn(fmt.Sprintf("ReceiveMd5s %s",this.util.GetClientIp(r)))
|
||||
log.Warn(fmt.Sprintf("ReceiveMd5s %s", this.util.GetClientIp(r)))
|
||||
return
|
||||
}
|
||||
|
||||
mdfs=mapset.NewSet()
|
||||
mdfs = mapset.NewSet()
|
||||
|
||||
r.ParseForm()
|
||||
md5s=r.FormValue("md5s")
|
||||
for _,v:= range strings.Split(md5s,",") {
|
||||
md5s = r.FormValue("md5s")
|
||||
for _, v := range strings.Split(md5s, ",") {
|
||||
|
||||
mdfs.Add(v)
|
||||
|
||||
}
|
||||
|
||||
|
||||
for m:=range mdfs.Iter() {
|
||||
if fileInfo,err =this.GetFileInfoByMd5(m.(string));err!=nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
this.queueToPeers<- *fileInfo
|
||||
for m := range mdfs.Iter() {
|
||||
if fileInfo, err = this.GetFileInfoByMd5(m.(string)); err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
this.queueToPeers <- *fileInfo
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
func (this *Server) GetMd5sForWeb(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
var (
|
||||
date string
|
||||
err error
|
||||
date string
|
||||
err error
|
||||
result mapset.Set
|
||||
lines []string
|
||||
lines []string
|
||||
)
|
||||
|
||||
if !this.IsPeer(r) {
|
||||
@ -1401,27 +1383,26 @@ func (this *Server) GetMd5sForWeb(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
|
||||
}
|
||||
date = r.FormValue("date")
|
||||
|
||||
if result,err=this.GetMd5sByDate(date);err!=nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
for line:=range result.Iter() {
|
||||
lines=append(lines,line.(string))
|
||||
}
|
||||
w.Write([]byte( strings.Join(lines,",") ))
|
||||
date = r.FormValue("date")
|
||||
|
||||
if result, err = this.GetMd5sByDate(date, CONST_FILE_Md5_FILE_NAME); err != nil {
|
||||
log.Error(err)
|
||||
return
|
||||
}
|
||||
for line := range result.Iter() {
|
||||
lines = append(lines, line.(string))
|
||||
}
|
||||
w.Write([]byte( strings.Join(lines, ",") ))
|
||||
|
||||
}
|
||||
|
||||
func (this *Server) GetMd5File(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
var (
|
||||
date string
|
||||
date string
|
||||
fpath string
|
||||
data []byte
|
||||
err error
|
||||
data []byte
|
||||
err error
|
||||
)
|
||||
if !this.IsPeer(r) {
|
||||
|
||||
@ -1429,13 +1410,13 @@ func (this *Server) GetMd5File(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
fpath=DATA_DIR+"/"+date+"/"+CONST_FILE_Md5_FILE_NAME
|
||||
fpath = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
|
||||
|
||||
if !this.util.FileExists(fpath) {
|
||||
w.WriteHeader(404)
|
||||
return
|
||||
}
|
||||
if data,err=ioutil.ReadFile(fpath);err!=nil {
|
||||
if data, err = ioutil.ReadFile(fpath); err != nil {
|
||||
w.WriteHeader(500)
|
||||
return
|
||||
}
|
||||
@ -1443,33 +1424,35 @@ func (this *Server) GetMd5File(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
func (this *Server) GetMd5sByDate(date string) (mapset.Set,error) {
|
||||
func (this *Server) GetMd5sByDate(date string, filename string) (mapset.Set, error) {
|
||||
|
||||
var (
|
||||
err error
|
||||
result mapset.Set
|
||||
fpath string
|
||||
err error
|
||||
result mapset.Set
|
||||
fpath string
|
||||
content string
|
||||
lines []string
|
||||
line string
|
||||
cols []string
|
||||
data []byte
|
||||
lines []string
|
||||
line string
|
||||
cols []string
|
||||
data []byte
|
||||
)
|
||||
|
||||
result=mapset.NewSet()
|
||||
|
||||
fpath=DATA_DIR+"/"+date+"/"+CONST_FILE_Md5_FILE_NAME
|
||||
result = mapset.NewSet()
|
||||
if filename == "" {
|
||||
fpath = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
|
||||
} else {
|
||||
fpath = DATA_DIR + "/" + date + "/" + filename
|
||||
}
|
||||
|
||||
if !this.util.FileExists(fpath) {
|
||||
return result,errors.New("not found")
|
||||
return result, errors.New(fmt.Sprintf("fpath %s not found", fpath))
|
||||
}
|
||||
|
||||
if data,err=ioutil.ReadFile(fpath);err!=nil {
|
||||
return result,err
|
||||
if data, err = ioutil.ReadFile(fpath); err != nil {
|
||||
return result, err
|
||||
}
|
||||
content=string(data)
|
||||
lines=strings.Split(content,"\n")
|
||||
content = string(data)
|
||||
lines = strings.Split(content, "\n")
|
||||
for _, line = range lines {
|
||||
|
||||
cols = strings.Split(line, "|")
|
||||
@ -1481,14 +1464,9 @@ func (this *Server) GetMd5sByDate(date string) (mapset.Set,error) {
|
||||
|
||||
}
|
||||
}
|
||||
return result,nil
|
||||
return result, nil
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
|
||||
var (
|
||||
err error
|
||||
@ -1583,7 +1561,7 @@ func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
|
||||
log.Error(err)
|
||||
} else {
|
||||
fileInfo.Size = fi.Size()
|
||||
date:=time.Unix(fileInfo.TimeStamp,0).Format("20060102")
|
||||
date := time.Unix(fileInfo.TimeStamp, 0).Format("20060102")
|
||||
this.statMap.AddCountInt64(date+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
|
||||
this.statMap.AddCountInt64(date+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fi.Size())
|
||||
this.statMap.AddCountInt64(CONST_STAT_FILE_TOTAL_SIZE_KEY, fi.Size())
|
||||
@ -1912,8 +1890,8 @@ func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
|
||||
// Path: fileInfo.Path, Md5: fileInfo.Md5, ReName: fileInfo.ReName,
|
||||
// Size: fileInfo.Size, Scene: fileInfo.Scene}
|
||||
|
||||
fileInfo.writeLog=true
|
||||
this.queueToPeers<-*fileInfo
|
||||
fileInfo.writeLog = true
|
||||
this.queueToPeers <- *fileInfo
|
||||
}
|
||||
|
||||
// go this.postFileToPeer(fileInfo, true)
|
||||
@ -2043,24 +2021,23 @@ func (this *Server) RepairStatWeb(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
func (this *Server) Stat(w http.ResponseWriter, r *http.Request) {
|
||||
data:=this.util.JsonEncodePretty(this.GetStat())
|
||||
data := this.util.JsonEncodePretty(this.GetStat())
|
||||
w.Write([]byte(data))
|
||||
}
|
||||
|
||||
func (this *Server) GetStat() []StatDateFileInfo {
|
||||
var (
|
||||
min int64
|
||||
max int64
|
||||
err error
|
||||
i int64
|
||||
min int64
|
||||
max int64
|
||||
err error
|
||||
i int64
|
||||
|
||||
rows []StatDateFileInfo
|
||||
)
|
||||
min = 20190101
|
||||
max = 20190101
|
||||
for k, _ := range this.statMap.Get() {
|
||||
for k := range this.statMap.Get() {
|
||||
ks := strings.Split(k, "_")
|
||||
if len(ks) == 2 {
|
||||
if i, err = strconv.ParseInt(ks[0], 10, 64); err != nil {
|
||||
@ -2113,7 +2090,6 @@ func (this *Server) GetStat() []StatDateFileInfo {
|
||||
|
||||
return rows
|
||||
|
||||
|
||||
}
|
||||
|
||||
func (this *Server) RegisterExit() {
|
||||
@ -2156,13 +2132,12 @@ func (this *Server) AutoRepair() {
|
||||
|
||||
var (
|
||||
dateStats []StatDateFileInfo
|
||||
err error
|
||||
countKey string
|
||||
md5s string
|
||||
localSet mapset.Set
|
||||
err error
|
||||
countKey string
|
||||
md5s string
|
||||
localSet mapset.Set
|
||||
remoteSet mapset.Set
|
||||
allSet mapset.Set
|
||||
|
||||
allSet mapset.Set
|
||||
)
|
||||
|
||||
defer func() {
|
||||
@ -2174,69 +2149,61 @@ func (this *Server) AutoRepair() {
|
||||
}
|
||||
}()
|
||||
|
||||
Update:= func(peer string, dateStat StatDateFileInfo) {
|
||||
Update := func(peer string, dateStat StatDateFileInfo) {
|
||||
|
||||
req:=httplib.Get(fmt.Sprintf("%s/sync?date=%s&force=%s",peer, dateStat.Date,"1"))
|
||||
req.SetTimeout(time.Second*5,time.Second*5)
|
||||
if _,err=req.String();err!=nil {
|
||||
req := httplib.Get(fmt.Sprintf("%s/sync?date=%s&force=%s", peer, dateStat.Date, "1"))
|
||||
req.SetTimeout(time.Second*5, time.Second*5)
|
||||
if _, err = req.String(); err != nil {
|
||||
log.Error(err)
|
||||
|
||||
}
|
||||
log.Info(fmt.Sprintf("syn file from %s date %s",peer,dateStat.Date))
|
||||
|
||||
|
||||
log.Info(fmt.Sprintf("syn file from %s date %s", peer, dateStat.Date))
|
||||
|
||||
}
|
||||
|
||||
for _, peer := range Config().Peers {
|
||||
|
||||
for _,peer:=range Config().Peers {
|
||||
|
||||
|
||||
|
||||
req:=httplib.Get(fmt.Sprintf("%s/%s",peer,"stat"))
|
||||
req.SetTimeout(time.Second*5,time.Second*5)
|
||||
if err=req.ToJSON(&dateStats);err!=nil {
|
||||
req := httplib.Get(fmt.Sprintf("%s/%s", peer, "stat"))
|
||||
req.SetTimeout(time.Second*5, time.Second*5)
|
||||
if err = req.ToJSON(&dateStats); err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
|
||||
|
||||
for _,dateStat:=range dateStats {
|
||||
if dateStat.Date=="all" {
|
||||
for _, dateStat := range dateStats {
|
||||
if dateStat.Date == "all" {
|
||||
continue
|
||||
}
|
||||
countKey=dateStat.Date+"_"+ CONST_STAT_FILE_COUNT_KEY
|
||||
if v,ok:= this.statMap.GetValue(countKey);ok {
|
||||
countKey = dateStat.Date + "_" + CONST_STAT_FILE_COUNT_KEY
|
||||
if v, ok := this.statMap.GetValue(countKey); ok {
|
||||
switch v.(type) {
|
||||
case int64:
|
||||
if v.(int64)!=dateStat.FileCount {//不相等,找差异
|
||||
//TODO
|
||||
req:= httplib.Post(fmt.Sprintf("%s/get_md5s_by_date",peer))
|
||||
if v.(int64) != dateStat.FileCount { //不相等,找差异
|
||||
//TODO
|
||||
req := httplib.Post(fmt.Sprintf("%s/get_md5s_by_date", peer))
|
||||
|
||||
req.Param("date",dateStat.Date)
|
||||
req.Param("date", dateStat.Date)
|
||||
|
||||
if md5s,err=req.String();err!=nil {
|
||||
continue
|
||||
if md5s, err = req.String(); err != nil {
|
||||
continue
|
||||
}
|
||||
if localSet,err= this.GetMd5sByDate(dateStat.Date);err!=nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
if localSet, err = this.GetMd5sByDate(dateStat.Date,CONST_FILE_Md5_FILE_NAME); err != nil {
|
||||
log.Error(err)
|
||||
continue
|
||||
}
|
||||
remoteSet=this.util.StrToMapSet(md5s,",")
|
||||
allSet=localSet.Union(remoteSet)
|
||||
md5s= this.util.MapSetToStr(allSet.Difference(localSet),",")
|
||||
req=httplib.Post(fmt.Sprintf("%s/receive_md5s",peer))
|
||||
req.SetTimeout(time.Second*5,time.Second*5)
|
||||
req.Param("md5s",md5s)
|
||||
req.String()
|
||||
|
||||
|
||||
remoteSet = this.util.StrToMapSet(md5s, ",")
|
||||
allSet = localSet.Union(remoteSet)
|
||||
md5s = this.util.MapSetToStr(allSet.Difference(localSet), ",")
|
||||
req = httplib.Post(fmt.Sprintf("%s/receive_md5s", peer))
|
||||
req.SetTimeout(time.Second*5, time.Second*5)
|
||||
req.Param("md5s", md5s)
|
||||
req.String()
|
||||
|
||||
//Update(peer,dateStat)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
Update(peer,dateStat)
|
||||
Update(peer, dateStat)
|
||||
|
||||
}
|
||||
|
||||
@ -2246,22 +2213,15 @@ func (this *Server) AutoRepair() {
|
||||
|
||||
}
|
||||
|
||||
//for {
|
||||
// time.Sleep(time.Second*10)
|
||||
//for {
|
||||
// time.Sleep(time.Second*10)
|
||||
// AutoRepairFunc()
|
||||
// time.Sleep(time.Minute*60)
|
||||
// time.Sleep(time.Minute*60)
|
||||
//}
|
||||
|
||||
|
||||
AutoRepairFunc()
|
||||
}
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
func (this *Server) Check() {
|
||||
|
||||
check := func() {
|
||||
@ -2437,7 +2397,7 @@ func init() {
|
||||
server.initComponent(false)
|
||||
}
|
||||
|
||||
func (this *Server)initComponent(is_reload bool) {
|
||||
func (this *Server) initComponent(is_reload bool) {
|
||||
var (
|
||||
err error
|
||||
ldb *leveldb.DB
|
||||
@ -2547,10 +2507,9 @@ func (HttpHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
||||
|
||||
func (this *Server) Main() {
|
||||
|
||||
|
||||
go func() {
|
||||
for {
|
||||
this.CheckFileAndSendToPeer("", false)
|
||||
this.CheckFileAndSendToPeer( this.util.GetToDay(),CONST_Md5_ERROR_FILE_NAME, false)
|
||||
time.Sleep(time.Second * time.Duration(Config().RefreshInterval))
|
||||
this.util.RemoveEmptyDir(STORE_DIR)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user