add search for low capacity

This commit is contained in:
s_jqzhang 2019-04-24 10:17:20 +08:00
parent 877a575d51
commit 2252680872

View File

@ -1,6 +1,7 @@
package main
import (
"bufio"
"bytes"
"crypto/md5"
"crypto/rand"
@ -71,6 +72,7 @@ var (
CONST_LOG_LEVELDB_FILE_NAME = DATA_DIR + "/log.db"
CONST_STAT_FILE_NAME = DATA_DIR + "/stat.json"
CONST_CONF_FILE_NAME = CONF_DIR + "/cfg.json"
CONST_SEARCH_FILE_NAME = DATA_DIR + "/search.txt"
logConfigStr = `
<seelog type="asynctimer" asyncinterval="1000" minlevel="trace" maxlevel="error">
<outputs formatid="common">
@ -196,6 +198,7 @@ type Server struct {
queueFileLog chan *FileLog
lockMap *CommonMap
sceneMap *CommonMap
searchMap *CommonMap
curDate string
host string
}
@ -294,6 +297,7 @@ func NewServer() *Server {
statMap: NewCommonMap(0),
lockMap: NewCommonMap(0),
sceneMap: NewCommonMap(0),
searchMap: NewCommonMap(0),
queueToPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFromPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFileLog: make(chan *FileLog, CONST_QUEUE_SIZE),
@ -1737,6 +1741,7 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
fullpath = fileInfo.Path + "/" + outname
logKey = fmt.Sprintf("%s_%s_%s", logDate, filename, fileInfo.Md5)
if filename == CONST_FILE_Md5_FILE_NAME {
this.searchMap.Put(fileInfo.Md5, fileInfo.Name)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); !ok {
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileInfo.Size)
@ -1754,6 +1759,7 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
return
}
if filename == CONST_REMOME_Md5_FILE_NAME {
this.searchMap.Remove(fileInfo.Md5)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); ok {
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, -1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, -fileInfo.Size)
@ -2777,6 +2783,49 @@ func (this *Server) ConsumerLog() {
}
}()
}
func (this *Server) LoadSearchDict() {
go func() {
log.Info("Load search dict ....")
f, err := os.Open(CONST_SEARCH_FILE_NAME)
if err != nil {
log.Error(err)
return
}
defer f.Close()
r := bufio.NewReader(f)
for {
line, isprefix, err := r.ReadLine()
for isprefix && err == nil {
kvs := strings.Split(string(line), "\t")
if len(kvs) == 2 {
this.searchMap.Put(kvs[0], kvs[1])
}
}
}
log.Info("finish load search dict")
}()
}
func (this *Server) SaveSearchDict() {
var (
err error
fp *os.File
searchDict map[string]interface{}
k string
v interface{}
)
this.lockMap.LockKey(CONST_SEARCH_FILE_NAME)
defer this.lockMap.UnLockKey(CONST_SEARCH_FILE_NAME)
searchDict = this.searchMap.Get()
fp, err = os.OpenFile(CONST_SEARCH_FILE_NAME, os.O_RDWR, 0755)
if err != nil {
log.Error(err)
return
}
defer fp.Close()
for k, v = range searchDict {
fp.WriteString(fmt.Sprintf("%s\t%s", k, v.(string)))
}
}
func (this *Server) ConsumerPostToPeer() {
ConsumerFunc := func() {
for {
@ -3136,6 +3185,48 @@ func (this *Server) BackUp(w http.ResponseWriter, r *http.Request) {
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
}
// Notice: performance is poor,just for low capacity,but low memory , if you want to high performance,use searchMap for search,but memory ....
func (this *Server) Search(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
err error
kw string
count int
fileInfos []FileInfo
md5s []string
)
kw = r.FormValue("kw")
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
iter := this.ldb.NewIterator(nil, nil)
for iter.Next() {
var fileInfo FileInfo
value := iter.Value()
if err = json.Unmarshal(value, &fileInfo); err != nil {
log.Error(err)
continue
}
if strings.Contains(fileInfo.Name, kw) && !this.util.Contains(fileInfo.Md5, md5s) {
count = count + 1
fileInfos = append(fileInfos, fileInfo)
md5s = append(md5s, fileInfo.Md5)
}
if count >= 100 {
break
}
}
iter.Release()
err = iter.Error()
if err != nil {
log.Error()
}
result.Status = "ok"
result.Data = fileInfos
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) ListDir(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
@ -3178,6 +3269,7 @@ func (this *Server) ListDir(w http.ResponseWriter, r *http.Request) {
}
filesResult = append(filesResult, fi)
}
result.Status = "ok"
result.Data = filesResult
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
@ -3441,6 +3533,7 @@ func init() {
CONST_LOG_LEVELDB_FILE_NAME = DATA_DIR + "/log.db"
CONST_STAT_FILE_NAME = DATA_DIR + "/stat.json"
CONST_CONF_FILE_NAME = CONF_DIR + "/cfg.json"
CONST_SEARCH_FILE_NAME = DATA_DIR + "/search.txt"
FOLDERS = []string{DATA_DIR, STORE_DIR, CONF_DIR, STATIC_DIR}
logAccessConfigStr = strings.Replace(logAccessConfigStr, "{DOCKER_DIR}", DOCKER_DIR, -1)
logConfigStr = strings.Replace(logConfigStr, "{DOCKER_DIR}", DOCKER_DIR, -1)
@ -3792,6 +3885,7 @@ func (this *Server) Main() {
go this.ConsumerPostToPeer()
go this.ConsumerLog()
go this.ConsumerDownLoad()
go this.LoadSearchDict()
if Config().EnableMigrate {
go this.RepairFileInfoFromFile()
}
@ -3826,6 +3920,7 @@ func (this *Server) Main() {
http.HandleFunc(fmt.Sprintf("%s/repair", groupRoute), this.Repair)
http.HandleFunc(fmt.Sprintf("%s/report", groupRoute), this.Report)
http.HandleFunc(fmt.Sprintf("%s/backup", groupRoute), this.BackUp)
http.HandleFunc(fmt.Sprintf("%s/search", groupRoute), this.Search)
http.HandleFunc(fmt.Sprintf("%s/list_dir", groupRoute), this.ListDir)
http.HandleFunc(fmt.Sprintf("%s/remove_empty_dir", groupRoute), this.RemoveEmptyDir)
http.HandleFunc(fmt.Sprintf("%s/repair_fileinfo", groupRoute), this.RepairFileInfo)