add logDB

This commit is contained in:
s_jqzhang 2019-02-24 14:09:06 +08:00
parent 39c668f3bc
commit aaee635c0d

View File

@ -76,7 +76,8 @@ var (
LARGE_DIR = STORE_DIR + "/haystack"
CONST_LEVELDB_FILE_NAME = DATA_DIR + "/fileserver.db"
CONST_LEVELDB_FILE_NAME = DATA_DIR + "/fileserver.db"
CONST_LOG_LEVELDB_FILE_NAME = DATA_DIR + "/log.db"
CONST_STAT_FILE_NAME = DATA_DIR + "/stat.json"
@ -188,6 +189,7 @@ type Common struct {
type Server struct {
ldb *leveldb.DB
logDB *leveldb.DB
util *Common
statMap *CommonMap
sumMap *CommonMap //map[string]mapset.Set
@ -278,7 +280,6 @@ type GloablConfig struct {
func NewServer() *Server {
var (
ldb *leveldb.DB
server *Server
err error
)
@ -307,18 +308,19 @@ func NewServer() *Server {
server.curDate = server.util.GetToDay()
//o := &opt.Options{
// Filter: filter.NewBloomFilter(160),
//
//}
ldb, err = leveldb.OpenFile(CONST_LEVELDB_FILE_NAME, nil)
server.ldb, err = leveldb.OpenFile(CONST_LEVELDB_FILE_NAME, nil)
if err != nil {
fmt.Println(err)
panic(err)
log.Error(err)
}
server.logDB, err = leveldb.OpenFile(CONST_LOG_LEVELDB_FILE_NAME, nil)
if err != nil {
fmt.Println(err)
panic(err)
log.Error(err)
}
server.ldb = ldb
return server
@ -947,7 +949,7 @@ func (this *Server) RepairStatByDate(date string) {
)
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, CONST_FILE_Md5_FILE_NAME)
iter := server.ldb.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
@ -964,14 +966,46 @@ func (this *Server) RepairStatByDate(date string) {
func (this *Server) CheckFileExistByMd5(md5s string, fileInfo *FileInfo) bool {
var (
err error
info *FileInfo
err error
info *FileInfo
fn string
name string
offset int64
data []byte
)
if info, err = this.GetFileInfoFromLevelDB(md5s); err != nil {
return false
}
if info == nil || info.Md5 == "" {
return false
}
fn = info.Name
if info.ReName != "" {
fn = info.ReName
}
if info.OffSet == -1 {
if this.util.FileExists(DOCKER_DIR + info.Path + "/" + fn) {
return true
}
} else { //small file
if name, offset, _, err = this.ParseSmallFile(fn); err != nil {
return false
}
if !this.util.FileExists(DOCKER_DIR + info.Path + "/" + name) {
return false
}
if data, err = this.util.ReadFileByOffSet(DOCKER_DIR+info.Path+"/"+name, offset, 1); err != nil {
return false
}
if data[0] == '1' {
return true
}
}
if info != nil && info.Md5 != "" {
if fileInfo != nil {
if fileInfo.Path != info.Path {
@ -985,6 +1019,40 @@ func (this *Server) CheckFileExistByMd5(md5s string, fileInfo *FileInfo) bool {
}
func (this *Server) ParseSmallFile(filename string) (string, int64, int, error) {
var (
err error
offset int64
length int
)
err = errors.New("unvalid small file")
if len(filename) < 3 {
return filename, -1, -1, err
}
if strings.Contains(filename, "/") {
filename = filename[strings.LastIndex(filename, "/")+1:]
}
pos := strings.Split(filename, ",")
if len(pos) < 3 {
return filename, -1, -1, err
}
offset, err = strconv.ParseInt(pos[1], 10, 64)
if err != nil {
return filename, -1, -1, err
}
if length, err = strconv.Atoi(pos[2]); err != nil {
return filename, offset, -1, err
}
if length > CONST_SMALL_FILE_SIZE || offset < 0 {
err = errors.New("invalid filesize or offset")
return filename, -1, -1, err
}
return pos[0], offset, length, nil
}
func (this *Server) DownloadFromPeer(peer string, fileInfo *FileInfo) {
var (
err error
@ -1000,7 +1068,7 @@ func (this *Server) DownloadFromPeer(peer string, fileInfo *FileInfo) {
filename = fileInfo.ReName
}
if this.util.FileExists(DOCKER_DIR+fileInfo.Path+"/"+filename) && this.CheckFileExistByMd5(fileInfo.Md5, fileInfo) {
if this.CheckFileExistByMd5(fileInfo.Md5, fileInfo) {
return
}
@ -1203,23 +1271,9 @@ func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
}
if isSmallFile {
pos := strings.Split(r.RequestURI, ",")
if len(pos) < 3 {
w.Write([]byte("(error) uri invalid"))
return
}
offset, err = strconv.ParseInt(pos[1], 10, 64)
if err != nil {
log.Error(err)
return
}
if length, err = strconv.Atoi(pos[2]); err != nil {
log.Error(err)
return
}
if length > CONST_SMALL_FILE_SIZE || offset < 0 {
log.Warn("invalid filesize or offset")
if _, offset, length, err = this.ParseSmallFile(r.RequestURI); err != nil {
log.Error(err)
return
}
@ -1248,7 +1302,7 @@ func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
}
}
NotFound:
if info, err = os.Stat(fullpath); err != nil || info.Size() == 0 {
if info, err = os.Stat(fullpath); err != nil || info.Size() == 0 || notFound {
log.Error(err)
if isSmallFile && notFound {
pathMd5 = this.util.MD5(smallPath)
@ -1411,7 +1465,7 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) {
fileInfo.Peers = append(fileInfo.Peers, peer)
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo); err != nil {
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error(err)
}
@ -1443,7 +1497,7 @@ func (this *Server) postFileToPeer(fileInfo *FileInfo) {
if !this.util.Contains(peer, fileInfo.Peers) {
fileInfo.Peers = append(fileInfo.Peers, peer)
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo); err != nil {
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error(err)
}
}
@ -1498,14 +1552,14 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
logKey = fmt.Sprintf("%s_%s_%s", logDate, filename, fileInfo.Md5)
if filename == CONST_FILE_Md5_FILE_NAME {
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5); !ok {
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); !ok {
this.SaveFileInfoToLevelDB(logKey, fileInfo)
this.SaveFileInfoToLevelDB(logKey, fileInfo, this.logDB)
if _, err := this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo); err != nil {
if _, err := this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
if _, err = this.SaveFileInfoToLevelDB(this.util.MD5(fullpath), fileInfo); err != nil {
if _, err = this.SaveFileInfoToLevelDB(this.util.MD5(fullpath), fileInfo, this.ldb); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
@ -1519,13 +1573,13 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
return
}
if filename == CONST_REMOME_Md5_FILE_NAME {
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5); ok {
this.RemoveKeyFromLevelDB(logKey)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); ok {
this.RemoveKeyFromLevelDB(logKey, this.logDB)
md5Path = this.util.MD5(fullpath)
if err := this.RemoveKeyFromLevelDB(fileInfo.Md5); err != nil {
if err := this.RemoveKeyFromLevelDB(fileInfo.Md5, this.ldb); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
if err = this.RemoveKeyFromLevelDB(md5Path); err != nil {
if err = this.RemoveKeyFromLevelDB(md5Path, this.ldb); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, -1)
@ -1536,8 +1590,11 @@ func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
}
return
}
this.SaveFileInfoToLevelDB(logKey, fileInfo)
this.SaveFileInfoToLevelDB(logKey, fileInfo, this.logDB)
if filename == CONST_Md5_QUEUE_FILE_NAME {
this.queueFromPeers <- *fileInfo
return
}
}
@ -1597,7 +1654,7 @@ func (this *Server) CheckFileExist(w http.ResponseWriter, r *http.Request) {
}
} else {
if fileInfo.OffSet == -1 {
this.RemoveKeyFromLevelDB(md5sum) // when file delete,delete from leveldb
this.RemoveKeyFromLevelDB(md5sum, this.ldb) // when file delete,delete from leveldb
}
}
}
@ -1657,9 +1714,9 @@ func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
}
func (this *Server) IsExistFromLevelDB(key string) (bool, error) {
func (this *Server) IsExistFromLevelDB(key string, db *leveldb.DB) (bool, error) {
return this.ldb.Has([]byte(key), nil)
return db.Has([]byte(key), nil)
}
func (this *Server) GetFileInfoFromLevelDB(key string) (*FileInfo, error) {
@ -1718,17 +1775,17 @@ func (this *Server) SaveStat() {
}
func (this *Server) RemoveKeyFromLevelDB(key string) (error) {
func (this *Server) RemoveKeyFromLevelDB(key string, db *leveldb.DB) (error) {
var (
err error
)
err = this.ldb.Delete([]byte(key), nil)
err = db.Delete([]byte(key), nil)
return err
}
func (this *Server) SaveFileInfoToLevelDB(key string, fileInfo *FileInfo) (*FileInfo, error) {
func (this *Server) SaveFileInfoToLevelDB(key string, fileInfo *FileInfo, db *leveldb.DB) (*FileInfo, error) {
var (
err error
data []byte
@ -1739,7 +1796,7 @@ func (this *Server) SaveFileInfoToLevelDB(key string, fileInfo *FileInfo) (*File
return fileInfo, err
}
if err = this.ldb.Put([]byte(key), data, nil); err != nil {
if err = db.Put([]byte(key), data, nil); err != nil {
return fileInfo, err
}
@ -1928,15 +1985,15 @@ func (this *Server) GetMd5sByDate(date string, filename string) (mapset.Set, err
var (
keyPrefix string
md5set mapset.Set
keys []string
keys []string
)
md5set = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.ldb.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
keys=strings.Split(string(iter.Key()),"_")
if len(keys)>=3 {
keys = strings.Split(string(iter.Key()), "_")
if len(keys) >= 3 {
md5set.Add(keys[2])
}
}
@ -2529,11 +2586,11 @@ func (this *Server) RepairStatWeb(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
date string
date string
)
date=r.FormValue("date")
if date=="" {
date=this.util.GetToDay()
date = r.FormValue("date")
if date == "" {
date = this.util.GetToDay()
}
this.RepairStatByDate(date)
result.Status = "ok"
@ -2830,47 +2887,51 @@ func (this *Server) AutoRepair(forceRepair bool) {
AutoRepairFunc(forceRepair)
}
func (this *Server) CleanMd5SumCache() {
func (this *Server) CleanLevelDBByDate(date string, filename string) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("CleanLevelDBByDate")
log.Error(re)
log.Error(string(buffer))
Clean := func() {
}
}()
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("Check")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
today string
memstat *runtime.MemStats
keys []string
)
memstat = new(runtime.MemStats)
runtime.ReadMemStats(memstat)
_ = memstat
today = this.util.GetToDay()
_ = today
keys = this.sumMap.Keys()
for _, k := range keys {
if strings.HasPrefix(k, today) {
continue
}
if v, ok := this.sumMap.GetValue(k); ok {
v.(mapset.Set).Clear()
}
var (
err error
keyPrefix string
keys mapset.Set
)
keys = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.ldb.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
keys.Add(string(iter.Value()))
}
iter.Release()
for key := range keys.Iter() {
err = this.RemoveKeyFromLevelDB(key.(string), this.logDB)
if err != nil {
log.Error(err)
}
}
}
func (this *Server) CleanYesterdayLog() {
Clean := func() {
var (
keys []string
yesterday string
)
keys = []string{CONST_Md5_QUEUE_FILE_NAME, CONST_Md5_ERROR_FILE_NAME, CONST_REMOME_Md5_FILE_NAME}
yesterday = this.util.GetDayFromTimeStamp(time.Now().AddDate(0, 0, -1).Unix())
for _, key := range keys {
this.CleanLevelDBByDate(yesterday, key)
}
}
go func() {
for {
time.Sleep(time.Minute * 10)
@ -2881,6 +2942,50 @@ func (this *Server) CleanMd5SumCache() {
}
func (this *Server) LoadFileInfoByDate(date string, filename string) (mapset.Set, error) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("LoadFileInfoByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
fileInfos mapset.Set
)
fileInfos = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
var fileInfo FileInfo
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
fileInfos.Add(&fileInfo)
}
iter.Release()
return fileInfos, nil
}
func (this *Server) LoadQueue() {
if queue, err := this.LoadFileInfoByDate(this.util.GetToDay(), CONST_Md5_QUEUE_FILE_NAME); err != nil {
log.Error(err)
} else {
for fileInfo := range queue.Iter() {
this.queueFromPeers <- *fileInfo.(*FileInfo)
}
}
}
func (this *Server) Check() {
check := func() {
@ -3216,6 +3321,7 @@ func init() {
LARGE_DIR_NAME = "haystack"
LARGE_DIR = STORE_DIR + "/haystack"
CONST_LEVELDB_FILE_NAME = DATA_DIR + "/fileserver.db"
CONST_LOG_LEVELDB_FILE_NAME = DATA_DIR + "/log.db"
CONST_STAT_FILE_NAME = DATA_DIR + "/stat.json"
CONST_CONF_FILE_NAME = CONF_DIR + "/cfg.json"
FOLDERS = []string{DATA_DIR, STORE_DIR, CONF_DIR}
@ -3455,7 +3561,7 @@ func (this *Server) initTus() {
log.Error(err)
} else {
if fi.Md5 != "" {
if _, err := this.SaveFileInfoToLevelDB(info.ID, fi); err != nil {
if _, err := this.SaveFileInfoToLevelDB(info.ID, fi, this.ldb); err != nil {
log.Error(err)
}
log.Info(fmt.Sprintf("file is found md5:%s", fi.Md5))
@ -3485,7 +3591,7 @@ func (this *Server) initTus() {
}
os.Remove(infoFullPath)
this.postFileToPeer(fileInfo)
this.SaveFileInfoToLevelDB(info.ID, fileInfo) //add fileId to ldb
this.SaveFileInfoToLevelDB(info.ID, fileInfo, this.ldb) //add fileId to ldb
this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
}
}
@ -3627,8 +3733,9 @@ func (this *Server) Main() {
}
}()
go this.CleanMd5SumCache()
go this.CleanYesterdayLog()
go this.Check()
go this.LoadQueue()
go this.Consumer()
go this.ConsumerLog()
go this.ConsumerDownLoad()