add md5 sync

This commit is contained in:
s_jqzhang 2019-01-18 19:05:18 +08:00
parent e81e73c91b
commit 985896c4f1

View File

@ -156,6 +156,7 @@ type Server struct {
queueToPeers chan FileInfo
fileset mapset.Set
errorset mapset.Set
curDate string
}
@ -235,8 +236,9 @@ func NewServer() *Server {
queueToPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
fileset: mapset.NewSet(),
errorset:mapset.NewSet(),
}
server.curDate=server.util.GetToDay()
return server
@ -376,6 +378,28 @@ func (this *Common) GetToDay() string {
}
func (this *Common) StrToMapSet(str string,sep string) mapset.Set {
result:=mapset.NewSet()
for _,v:=range strings.Split(str,sep) {
result.Add(v)
}
return result
}
func (this *Common) MapSetToStr(set mapset.Set,sep string) string{
var (
ret []string
)
for v:=range set.Iter() {
ret=append(ret,v.(string))
}
return strings.Join(ret,sep)
}
func (this *Common) GetPulicIP() string {
conn, _ := net.Dial("udp", "8.8.8.8:80")
defer conn.Close()
@ -610,10 +634,12 @@ func (this *Server) RepairStat() {
if count > 1 {
count = count - 1
}
count=0
for _, line = range lines {
cols = strings.Split(line, "|")
if len(cols) > 2 {
count=count+1
if size, err = strconv.ParseInt(cols[1], 10, 64); err != nil {
size = 0
continue
@ -978,6 +1004,11 @@ func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
outname = fileInfo.Name
if this.curDate!=this.util.GetToDay() {
this.errorset.Clear()
this.fileset.Clear()
}
if fileInfo.ReName != "" {
outname = fileInfo.ReName
}
@ -1081,6 +1112,10 @@ func (this *Server) CheckFileExist(w http.ResponseWriter, r *http.Request) {
func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
var (
filename string
)
r.ParseForm()
date := ""
@ -1101,17 +1136,25 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
return
}
date = strings.Replace(date, ".", "", -1)
filename := DATA_DIR + "/" + date + "/" + CONST_Md5_ERROR_FILE_NAME
if this.util.FileExists(filename) {
if is_force_upload {
go this.CheckFileAndSendToPeer(filename, is_force_upload)
}
filename = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
filename = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
if this.util.FileExists(filename) {
go this.CheckFileAndSendToPeer(filename, is_force_upload)
}
} else {
filename = DATA_DIR + "/" + date + "/" + CONST_Md5_ERROR_FILE_NAME
if this.util.FileExists(filename) {
go this.CheckFileAndSendToPeer(filename, is_force_upload)
}
if this.util.FileExists(filename) {
go this.CheckFileAndSendToPeer(filename, is_force_upload)
}
w.Write([]byte("job is running"))
}
@ -1202,9 +1245,16 @@ func (this *Server) IsPeer(r *http.Request) bool {
bflag bool
)
ip = this.util.GetClientIp(r)
if ip=="127.0.0.1" || ip==this.util.GetPulicIP() {
return true
}
ip = "http://" + ip
bflag = false
for _, peer = range Config().Peers {
if strings.HasPrefix(peer, ip) {
bflag = true
@ -1214,11 +1264,150 @@ func (this *Server) IsPeer(r *http.Request) bool {
return bflag
}
func (this *Server) ReceiveMd5s(w http.ResponseWriter, r *http.Request) {
var (
err error
md5s string
mdfs mapset.Set
fileInfo *FileInfo
)
if !this.IsPeer(r) {
log.Warn(fmt.Sprintf("ReceiveMd5s %s",this.util.GetClientIp(r)))
return
}
mdfs=mapset.NewSet()
r.ParseForm()
md5s=r.FormValue("md5s")
for _,v:= range strings.Split(md5s,",") {
mdfs.Add(v)
}
for m:=range mdfs.Iter() {
if fileInfo,err =this.GetFileInfoByMd5(m.(string));err!=nil {
log.Error(err)
continue
}
this.queueToPeers<- *fileInfo
}
}
func (this *Server) GetMd5sForWeb(w http.ResponseWriter, r *http.Request) {
var (
date string
err error
result mapset.Set
lines []string
)
if !this.IsPeer(r) {
return
}
date = r.FormValue("date")
if result,err=this.GetMd5sByDate(date);err!=nil {
log.Error(err)
return
}
for line:=range result.Iter() {
lines=append(lines,line.(string))
}
w.Write([]byte( strings.Join(lines,",") ))
}
func (this *Server) GetMd5File(w http.ResponseWriter, r *http.Request) {
var (
date string
fpath string
data []byte
err error
)
if !this.IsPeer(r) {
return
}
fpath=DATA_DIR+"/"+date+"/"+CONST_FILE_Md5_FILE_NAME
if !this.util.FileExists(fpath) {
w.WriteHeader(404)
return
}
if data,err=ioutil.ReadFile(fpath);err!=nil {
w.WriteHeader(500)
return
}
w.Write(data)
}
func (this *Server) GetMd5sByDate(date string) (mapset.Set,error) {
var (
err error
result mapset.Set
fpath string
content string
lines []string
line string
cols []string
data []byte
)
result=mapset.NewSet()
fpath=DATA_DIR+"/"+date+"/"+CONST_FILE_Md5_FILE_NAME
if !this.util.FileExists(fpath) {
return result,errors.New("not found")
}
if data,err=ioutil.ReadFile(fpath);err!=nil {
return result,err
}
content=string(data)
lines=strings.Split(content,"\n")
for _, line = range lines {
cols = strings.Split(line, "|")
if len(cols) > 2 {
if _, err = strconv.ParseInt(cols[1], 10, 64); err != nil {
continue
}
result.Add(cols[0])
}
}
return result,nil
}
func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
var (
err error
outPath string
outname string
//outname string
// timestamp string
fileInfo FileInfo
tmpFile *os.File
@ -1250,19 +1439,19 @@ func (this *Server) SyncFile(w http.ResponseWriter, r *http.Request) {
defer uploadFile.Close()
if v, _ := this.GetFileInfoFromLevelDB(fileInfo.Md5); v != nil && v.Md5 != "" {
outname = v.Name
if v.ReName != "" {
outname = v.ReName
}
p := strings.Replace(v.Path, STORE_DIR+"/", "", 1)
download_url := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+outname)
w.Write([]byte(download_url))
return
}
//if v, _ := this.GetFileInfoFromLevelDB(fileInfo.Md5); v != nil && v.Md5 != "" {
// outname = v.Name
// if v.ReName != "" {
// outname = v.ReName
// }
//
// p := strings.Replace(v.Path, STORE_DIR+"/", "", 1)
//
// download_url := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+outname)
// w.Write([]byte(download_url))
//
// return
//}
os.MkdirAll(fileInfo.Path, 0666)
@ -1875,6 +2064,10 @@ func (this *Server) AutoRepair() {
dateStats []StatDateFileInfo
err error
countKey string
md5s string
localSet mapset.Set
remoteSet mapset.Set
allSet mapset.Set
)
@ -1922,8 +2115,30 @@ func (this *Server) AutoRepair() {
if v,ok:= this.statMap.GetValue(countKey);ok {
switch v.(type) {
case int64:
if v.(int64)<dateStat.FileCount {
Update(peer,dateStat)
if v.(int64)!=dateStat.FileCount {//不相等,找差异
//TODO
req:= httplib.Post(fmt.Sprintf("%s/get_md5s_by_date",peer))
req.Param("date",dateStat.Date)
if md5s,err=req.String();err!=nil {
continue
}
if localSet,err= this.GetMd5sByDate(dateStat.Date);err!=nil {
log.Error(err)
continue
}
remoteSet=this.util.StrToMapSet(md5s,",")
allSet=localSet.Union(remoteSet)
md5s= this.util.MapSetToStr(allSet.Difference(localSet),",")
req=httplib.Post(fmt.Sprintf("%s/receive_md5s",peer))
req.SetTimeout(time.Second*5,time.Second*5)
req.Param("md5s",md5s)
req.String()
//Update(peer,dateStat)
}
}
} else {
@ -1944,7 +2159,7 @@ func (this *Server) AutoRepair() {
}
AutoRepairFunc()
}
@ -1960,7 +2175,7 @@ func (this *Server) Check() {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("postFileToPeer")
log.Error("Check")
log.Error(re)
log.Error(string(buffer))
}
@ -2250,6 +2465,8 @@ func (this *Server) Main() {
http.HandleFunc("/repair_stat", this.RepairStatWeb)
http.HandleFunc("/status", this.Status)
http.HandleFunc("/syncfile", this.SyncFile)
http.HandleFunc("/get_md5s_by_date", this.GetMd5sForWeb)
http.HandleFunc("/receive_md5s", this.ReceiveMd5s)
http.HandleFunc("/"+Config().Group+"/", this.Download)
fmt.Println("Listen on " + Config().Addr)
err := http.ListenAndServe(Config().Addr, new(HttpHandler))
@ -2258,9 +2475,6 @@ func (this *Server) Main() {
}
func main() {
server.Main()
}