go-fastdfs/fileserver.go

4528 lines
126 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"bufio"
"bytes"
"errors"
"flag"
"fmt"
"image"
"image/jpeg"
"image/png"
"io"
"io/ioutil"
slog "log"
random "math/rand"
"mime/multipart"
"net"
"net/http"
_ "net/http/pprof"
"net/smtp"
"net/url"
"os"
"os/signal"
"path"
"path/filepath"
"regexp"
"runtime"
"runtime/debug"
"strconv"
"strings"
"sync"
"sync/atomic"
"syscall"
"time"
"unsafe"
"github.com/astaxie/beego/httplib"
mapset "github.com/deckarep/golang-set"
_ "github.com/eventials/go-tus"
"github.com/garyburd/redigo/redis"
jsoniter "github.com/json-iterator/go"
"github.com/nfnt/resize"
"github.com/radovskyb/watcher"
"github.com/shirou/gopsutil/disk"
"github.com/shirou/gopsutil/mem"
"github.com/sjqzhang/googleAuthenticator"
"github.com/sjqzhang/goutil"
log "github.com/sjqzhang/seelog"
"github.com/sjqzhang/tusd"
"github.com/sjqzhang/tusd/filestore"
"github.com/syndtr/goleveldb/leveldb"
"github.com/syndtr/goleveldb/leveldb/opt"
"github.com/syndtr/goleveldb/leveldb/util"
)
var staticHandler http.Handler
var json = jsoniter.ConfigCompatibleWithStandardLibrary
var server *Server = nil
var logacc log.LoggerInterface
var FOLDERS = []string{DATA_DIR, STORE_DIR, CONF_DIR, STATIC_DIR}
var CONST_QUEUE_SIZE = 10000
var (
VERSION string
BUILD_TIME string
GO_VERSION string
GIT_VERSION string
v = flag.Bool("v", false, "display version")
)
var (
FileName string
ptr unsafe.Pointer
DOCKER_DIR = ""
STORE_DIR = STORE_DIR_NAME
CONF_DIR = CONF_DIR_NAME
LOG_DIR = LOG_DIR_NAME
DATA_DIR = DATA_DIR_NAME
STATIC_DIR = STATIC_DIR_NAME
LARGE_DIR_NAME = "haystack"
LARGE_DIR = STORE_DIR + "/haystack"
CONST_LEVELDB_FILE_NAME = DATA_DIR + "/fileserver.db"
CONST_LOG_LEVELDB_FILE_NAME = DATA_DIR + "/log.db"
CONST_STAT_FILE_NAME = DATA_DIR + "/stat.json"
CONST_CONF_FILE_NAME = CONF_DIR + "/cfg.json"
CONST_SERVER_CRT_FILE_NAME = CONF_DIR + "/server.crt"
CONST_SERVER_KEY_FILE_NAME = CONF_DIR + "/server.key"
CONST_SEARCH_FILE_NAME = DATA_DIR + "/search.txt"
CONST_UPLOAD_COUNTER_KEY = "__CONST_UPLOAD_COUNTER_KEY__"
logConfigStr = `
<seelog type="asynctimer" asyncinterval="1000" minlevel="trace" maxlevel="error">
<outputs formatid="common">
<buffered formatid="common" size="1048576" flushperiod="1000">
<rollingfile type="size" filename="{DOCKER_DIR}log/fileserver.log" maxsize="104857600" maxrolls="10"/>
</buffered>
</outputs>
<formats>
<format id="common" format="%Date %Time [%LEV] [%File:%Line] [%Func] %Msg%n" />
</formats>
</seelog>
`
logAccessConfigStr = `
<seelog type="asynctimer" asyncinterval="1000" minlevel="trace" maxlevel="error">
<outputs formatid="common">
<buffered formatid="common" size="1048576" flushperiod="1000">
<rollingfile type="size" filename="{DOCKER_DIR}log/access.log" maxsize="104857600" maxrolls="10"/>
</buffered>
</outputs>
<formats>
<format id="common" format="%Date %Time [%LEV] [%File:%Line] [%Func] %Msg%n" />
</formats>
</seelog>
`
)
const (
STORE_DIR_NAME = "files"
LOG_DIR_NAME = "log"
DATA_DIR_NAME = "data"
CONF_DIR_NAME = "conf"
STATIC_DIR_NAME = "static"
CONST_STAT_FILE_COUNT_KEY = "fileCount"
CONST_BIG_UPLOAD_PATH_SUFFIX = "/big/upload/"
CONST_STAT_FILE_TOTAL_SIZE_KEY = "totalSize"
CONST_Md5_ERROR_FILE_NAME = "errors.md5"
CONST_Md5_QUEUE_FILE_NAME = "queue.md5"
CONST_FILE_Md5_FILE_NAME = "files.md5"
CONST_REMOME_Md5_FILE_NAME = "removes.md5"
CONST_SMALL_FILE_SIZE = 1024 * 1024
CONST_MESSAGE_CLUSTER_IP = "Can only be called by the cluster ip or 127.0.0.1 or admin_ips(cfg.json),current ip:%s"
cfgJson = `{
"绑定端号": "端口",
"addr": ":8080",
"是否开启https": "默认不开启如需启开启请在conf目录中增加证书文件 server.crt 私钥 文件 server.key",
"enable_https": false,
"PeerID": "集群内唯一,请使用0-9的单字符默认自动生成",
"peer_id": "%s",
"本主机地址": "本机http地址,默认自动生成(注意端口必须与addr中的端口一致必段为内网自动生成不为内网请自行修改下同",
"host": "%s",
"集群": "集群列表,注意为了高可用IP必须不能是同一个,同一不会自动备份且不能为127.0.0.1,且必须为内网IP默认自动生成",
"peers": ["%s"],
"组号": "用于区别不同的集群(上传或下载)与support_group_manage配合使用,带在下载路径中",
"group": "group1",
"是否支持按组(集群)管理,主要用途是Nginx支持多集群": "默认支持,不支持时路径为http://10.1.5.4:8080/action,支持时为http://10.1.5.4:8080/group(配置中的group参数)/action,action为动作名如status,delete,sync等",
"support_group_manage": true,
"是否合并小文件": "默认不合并,合并可以解决inode不够用的情况当前对于小于1M文件进行合并",
"enable_merge_small_file": false,
"允许后缀名": "允许可以上传的文件后缀名如jpg,jpeg,png等。留空允许所有。",
"extensions": [],
"重试同步失败文件的时间": "单位秒",
"refresh_interval": 1800,
"是否自动重命名": "默认不自动重命名,使用原文件名",
"rename_file": false,
"是否支持web上传,方便调试": "默认支持web上传",
"enable_web_upload": true,
"是否支持非日期路径": "默认支持非日期路径,也即支持自定义路径,需要上传文件时指定path",
"enable_custom_path": true,
"下载域名": "用于外网下载文件的域名,不包含http://",
"download_domain": "",
"场景列表": "当设定后,用户指的场景必项在列表中,默认不做限制(注意:如果想开启场景认功能,格式如下:'场景名:googleauth_secret' 如 default:N7IET373HB2C5M6D ",
"scenes": [],
"默认场景": "默认default",
"default_scene": "default",
"是否显示目录": "默认显示,方便调试用,上线时请关闭",
"show_dir": true,
"redis配置": "用于保存文件元信息",
"邮件配置": "",
"mail": {
"user": "abc@163.com",
"password": "abc",
"host": "smtp.163.com:25"
},
"告警接收邮件列表": "接收人数组",
"alarm_receivers": [],
"告警接收URL": "方法post,参数:subject,message",
"alarm_url": "",
"下载是否需带token": "真假",
"download_use_token": false,
"下载token过期时间": "单位秒",
"download_token_expire": 600,
"是否自动修复": "在超过1亿文件时出现性能问题取消此选项请手动按天同步请查看FAQ",
"auto_repair": true,
"文件去重算法md5可能存在冲突默认md5": "sha1|md5",
"file_sum_arithmetic": "md5",
"管理ip列表": "用于管理集的ip白名单,",
"admin_ips": ["127.0.0.1"],
"是否启用迁移": "默认不启用",
"enable_migrate": false,
"文件是否去重": "默认去重",
"enable_distinct_file": true,
"图片是否缩放": "默认是",
"enable_image_resize": true,
"是否开启跨站访问": "默认开启",
"enable_cross_origin": true,
"是否开启Google认证实现安全的上传、下载": "默认不开启",
"enable_google_auth": false,
"认证url": "当url不为空时生效,注意:普通上传中使用http参数 auth_token 作为认证参数, 在断点续传中通过HTTP头Upload-Metadata中的auth_token作为认证参数,认证流程参考认证架构图",
"auth_url": "",
"下载是否认证": "默认不认证(注意此选项是在auth_url不为空的情况下生效)",
"enable_download_auth": false,
"默认是否下载": "默认下载",
"default_download": true,
"本机是否只读": "默认可读可写",
"read_only": false,
"是否开启断点续传": "默认开启",
"enable_tus": true,
"同步单一文件超时时间(单位秒)": "默认为0,程序自动计算,在特殊情况下,自已设定",
"sync_timeout": 0
}
`
)
type Server struct {
ldb *leveldb.DB
logDB *leveldb.DB
util *goutil.Common
statMap *goutil.CommonMap
sumMap *goutil.CommonMap
rtMap *goutil.CommonMap
queueToPeers chan FileInfo
queueFromPeers chan FileInfo
queueFileLog chan *FileLog
queueUpload chan WrapReqResp
lockMap *goutil.CommonMap
sceneMap *goutil.CommonMap
searchMap *goutil.CommonMap
rp *redis.Pool
curDate string
host string
routers map[*regexp.Regexp]func(http.ResponseWriter, *http.Request)
}
type Redis struct {
Address string `json:"address"`
Pwd string `json:"pwd"`
MaxIdle int `json:"maxIdle"`
MaxActive int `json:"maxActive"`
IdleTimeout int `json:"idleTimeout"`
ConnectTimeout int `json:"connectTimeout"`
DB int `json:"db"`
}
type FileInfo struct {
Name string `json:"name"`
ReName string `json:"rename"`
Path string `json:"path"`
Md5 string `json:"md5"`
Size int64 `json:"size"`
Peers []string `json:"peers"`
Scene string `json:"scene"`
TimeStamp int64 `json:"timeStamp"`
OffSet int64 `json:"offset"`
retry int
op string
}
type FileLog struct {
FileInfo *FileInfo
FileName string
}
type WrapReqResp struct {
w *http.ResponseWriter
r *http.Request
done chan bool
}
type JsonResult struct {
Message string `json:"message"`
Status string `json:"status"`
Data interface{} `json:"data"`
}
type FileResult struct {
Url string `json:"url"`
Md5 string `json:"md5"`
Path string `json:"path"`
Domain string `json:"domain"`
Scene string `json:"scene"`
Size int64 `json:"size"`
ModTime int64 `json:"mtime"`
//Just for Compatibility
Scenes string `json:"scenes"`
Retmsg string `json:"retmsg"`
Retcode int `json:"retcode"`
Src string `json:"src"`
}
type Mail struct {
User string `json:"user"`
Password string `json:"password"`
Host string `json:"host"`
}
type StatDateFileInfo struct {
Date string `json:"date"`
TotalSize int64 `json:"totalSize"`
FileCount int64 `json:"fileCount"`
}
type GloablConfig struct {
Addr string `json:"addr"`
Peers []string `json:"peers"`
EnableHttps bool `json:"enable_https"`
Group string `json:"group"`
RenameFile bool `json:"rename_file"`
ShowDir bool `json:"show_dir"`
Extensions []string `json:"extensions"`
RefreshInterval int `json:"refresh_interval"`
EnableWebUpload bool `json:"enable_web_upload"`
DownloadDomain string `json:"download_domain"`
EnableCustomPath bool `json:"enable_custom_path"`
Scenes []string `json:"scenes"`
AlarmReceivers []string `json:"alarm_receivers"`
DefaultScene string `json:"default_scene"`
Mail Mail `json:"mail"`
AlarmUrl string `json:"alarm_url"`
DownloadUseToken bool `json:"download_use_token"`
DownloadTokenExpire int `json:"download_token_expire"`
QueueSize int `json:"queue_size"`
AutoRepair bool `json:"auto_repair"`
Host string `json:"host"`
FileSumArithmetic string `json:"file_sum_arithmetic"`
PeerId string `json:"peer_id"`
SupportGroupManage bool `json:"support_group_manage"`
AdminIps []string `json:"admin_ips"`
EnableMergeSmallFile bool `json:"enable_merge_small_file"`
EnableMigrate bool `json:"enable_migrate"`
EnableDistinctFile bool `json:"enable_distinct_file"`
ReadOnly bool `json:"read_only"`
EnableImageResize bool `json:"enable_image_resize"`
EnableCrossOrigin bool `json:"enable_cross_origin"`
EnableGoogleAuth bool `json:"enable_google_auth"`
AuthUrl string `json:"auth_url"`
EnableDownloadAuth bool `json:"enable_download_auth"`
DefaultDownload bool `json:"default_download"`
EnableTus bool `json:"enable_tus"`
SyncTimeout int64 `json:"sync_timeout"`
EnableFsnotify bool `json:"enable_fsnotify"`
EnableDiskCache bool `json:"enable_disk_cache"`
ConnectTimeout bool `json:"connect_timeout"`
ReadTimeout int `json:"read_timeout"`
WriteTimeout int `json:"write_timeout"`
IdleTimeout int `json:"idle_timeout"`
ReadHeaderTimeout int `json:"read_header_timeout"`
SyncWorker int `json:"sync_worker"`
UploadWorker int `json:"upload_worker"`
UploadQueueSize int `json:"upload_queue_size"`
RetryCount int `json:"retry_count"`
SyncDelay int64 `json:"sync_delay"`
WatchChanSize int `json:"watch_chan_size"`
Redis Redis `json:"redis"`
EnableRedis bool `json:"enable_redis"`
}
type FileInfoResult struct {
Name string `json:"name"`
Md5 string `json:"md5"`
Path string `json:"path"`
Size int64 `json:"size"`
ModTime int64 `json:"mtime"`
IsDir bool `json:"is_dir"`
}
func NewServer() *Server {
var (
//server *Server
err error
)
if server != nil {
return server
}
server = &Server{
util: &goutil.Common{},
statMap: goutil.NewCommonMap(0),
lockMap: goutil.NewCommonMap(0),
rtMap: goutil.NewCommonMap(0),
sceneMap: goutil.NewCommonMap(0),
searchMap: goutil.NewCommonMap(0),
queueToPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFromPeers: make(chan FileInfo, CONST_QUEUE_SIZE),
queueFileLog: make(chan *FileLog, CONST_QUEUE_SIZE),
queueUpload: make(chan WrapReqResp, 100),
sumMap: goutil.NewCommonMap(365 * 3),
routers: make(map[*regexp.Regexp]func(http.ResponseWriter, *http.Request)),
}
defaultTransport := &http.Transport{
DisableKeepAlives: true,
Dial: httplib.TimeoutDialer(time.Second*15, time.Second*300),
MaxIdleConns: 100,
MaxIdleConnsPerHost: 100,
}
settins := httplib.BeegoHTTPSettings{
UserAgent: "Go-FastDFS",
ConnectTimeout: 15 * time.Second,
ReadWriteTimeout: 15 * time.Second,
Gzip: true,
DumpBody: true,
Transport: defaultTransport,
}
httplib.SetDefaultSetting(settins)
server.statMap.Put(CONST_STAT_FILE_COUNT_KEY, int64(0))
server.statMap.Put(CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_COUNT_KEY, int64(0))
server.statMap.Put(server.util.GetToDay()+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, int64(0))
server.curDate = server.util.GetToDay()
opts := &opt.Options{
CompactionTableSize: 1024 * 1024 * 20,
WriteBuffer: 1024 * 1024 * 20,
}
server.ldb, err = leveldb.OpenFile(CONST_LEVELDB_FILE_NAME, opts)
if err != nil {
fmt.Println(fmt.Sprintf("open db file %s fail,maybe has opening", CONST_LEVELDB_FILE_NAME))
log.Error(err)
panic(err)
}
server.logDB, err = leveldb.OpenFile(CONST_LOG_LEVELDB_FILE_NAME, opts)
if err != nil {
fmt.Println(fmt.Sprintf("open db file %s fail,maybe has opening", CONST_LOG_LEVELDB_FILE_NAME))
log.Error(err)
panic(err)
}
return server
}
func Config() *GloablConfig {
return (*GloablConfig)(atomic.LoadPointer(&ptr))
}
func ParseConfig(filePath string) {
var (
data []byte
)
if filePath == "" {
data = []byte(strings.TrimSpace(cfgJson))
} else {
file, err := os.Open(filePath)
if err != nil {
panic(fmt.Sprintln("open file path:", filePath, "error:", err))
}
defer file.Close()
FileName = filePath
data, err = ioutil.ReadAll(file)
if err != nil {
panic(fmt.Sprintln("file path:", filePath, " read all error:", err))
}
}
var c GloablConfig
if err := json.Unmarshal(data, &c); err != nil {
panic(fmt.Sprintln("file path:", filePath, "json unmarshal error:", err))
}
log.Info(c)
atomic.StorePointer(&ptr, unsafe.Pointer(&c))
log.Info("config parse success")
}
func (this *Server) BackUpMetaDataByDate(date string) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("BackUpMetaDataByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
msg string
name string
fileInfo FileInfo
logFileName string
fileLog *os.File
fileMeta *os.File
metaFileName string
fi os.FileInfo
)
logFileName = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
this.lockMap.LockKey(logFileName)
defer this.lockMap.UnLockKey(logFileName)
metaFileName = DATA_DIR + "/" + date + "/" + "meta.data"
os.MkdirAll(DATA_DIR+"/"+date, 0775)
if this.util.IsExist(logFileName) {
os.Remove(logFileName)
}
if this.util.IsExist(metaFileName) {
os.Remove(metaFileName)
}
fileLog, err = os.OpenFile(logFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
if err != nil {
log.Error(err)
return
}
defer fileLog.Close()
fileMeta, err = os.OpenFile(metaFileName, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0664)
if err != nil {
log.Error(err)
return
}
defer fileMeta.Close()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, CONST_FILE_Md5_FILE_NAME)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
defer iter.Release()
for iter.Next() {
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
name = fileInfo.Name
if fileInfo.ReName != "" {
name = fileInfo.ReName
}
msg = fmt.Sprintf("%s\t%s\n", fileInfo.Md5, string(iter.Value()))
if _, err = fileMeta.WriteString(msg); err != nil {
log.Error(err)
}
msg = fmt.Sprintf("%s\t%s\n", this.util.MD5(fileInfo.Path+"/"+name), string(iter.Value()))
if _, err = fileMeta.WriteString(msg); err != nil {
log.Error(err)
}
if Config().EnableRedis {
this.SaveFileInfoToRedis(fileInfo.Md5, &fileInfo)
this.SaveFileInfoToRedis(this.util.MD5(fileInfo.Path+"/"+name), &fileInfo)
}
msg = fmt.Sprintf("%s|%d|%d|%s\n", fileInfo.Md5, fileInfo.Size, fileInfo.TimeStamp, fileInfo.Path+"/"+name)
if _, err = fileLog.WriteString(msg); err != nil {
log.Error(err)
}
}
if fi, err = fileLog.Stat(); err != nil {
log.Error(err)
} else if fi.Size() == 0 {
fileLog.Close()
os.Remove(logFileName)
}
if fi, err = fileMeta.Stat(); err != nil {
log.Error(err)
} else if fi.Size() == 0 {
fileMeta.Close()
os.Remove(metaFileName)
}
}
func (this *Server) RepairFileInfoFromFile() {
var (
pathPrefix string
err error
fi os.FileInfo
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("RepairFileInfoFromFile")
log.Error(re)
log.Error(string(buffer))
}
}()
if this.lockMap.IsLock("RepairFileInfoFromFile") {
log.Warn("Lock RepairFileInfoFromFile")
return
}
this.lockMap.LockKey("RepairFileInfoFromFile")
defer this.lockMap.UnLockKey("RepairFileInfoFromFile")
handlefunc := func(file_path string, f os.FileInfo, err error) error {
var (
files []os.FileInfo
fi os.FileInfo
fileInfo FileInfo
sum string
pathMd5 string
)
if f.IsDir() {
files, err = ioutil.ReadDir(file_path)
if err != nil {
return err
}
for _, fi = range files {
if fi.IsDir() || fi.Size() == 0 {
continue
}
file_path = strings.Replace(file_path, "\\", "/", -1)
if DOCKER_DIR != "" {
file_path = strings.Replace(file_path, DOCKER_DIR, "", 1)
}
if pathPrefix != "" {
file_path = strings.Replace(file_path, pathPrefix, STORE_DIR_NAME, 1)
}
if strings.HasPrefix(file_path, STORE_DIR_NAME+"/"+LARGE_DIR_NAME) {
log.Info(fmt.Sprintf("ignore small file file %s", file_path+"/"+fi.Name()))
continue
}
pathMd5 = this.util.MD5(file_path + "/" + fi.Name())
//if finfo, _ := this.GetFileInfoFromLevelDB(pathMd5); finfo != nil && finfo.Md5 != "" {
// log.Info(fmt.Sprintf("exist ignore file %s", file_path+"/"+fi.Name()))
// continue
//}
//sum, err = this.util.GetFileSumByName(file_path+"/"+fi.Name(), Config().FileSumArithmetic)
sum = pathMd5
if err != nil {
log.Error(err)
continue
}
fileInfo = FileInfo{
Size: fi.Size(),
Name: fi.Name(),
Path: file_path,
Md5: sum,
TimeStamp: fi.ModTime().Unix(),
Peers: []string{this.host},
OffSet: -2,
}
//log.Info(fileInfo)
log.Info(file_path, "/", fi.Name())
this.AppendToQueue(&fileInfo)
//this.postFileToPeer(&fileInfo)
this.SaveFileInfoToLevelDB(fileInfo.Md5, &fileInfo, this.ldb)
//this.SaveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME)
}
}
return nil
}
pathname := STORE_DIR
pathPrefix, err = os.Readlink(pathname)
if err == nil {
//link
pathname = pathPrefix
if strings.HasSuffix(pathPrefix, "/") {
//bugfix fullpath
pathPrefix = pathPrefix[0 : len(pathPrefix)-1]
}
}
fi, err = os.Stat(pathname)
if err != nil {
log.Error(err)
}
if fi.IsDir() {
filepath.Walk(pathname, handlefunc)
}
log.Info("RepairFileInfoFromFile is finish.")
}
func (this *Server) WatchFilesChange() {
var (
w *watcher.Watcher
//fileInfo FileInfo
curDir string
err error
qchan chan *FileInfo
isLink bool
)
qchan = make(chan *FileInfo, Config().WatchChanSize)
w = watcher.New()
w.FilterOps(watcher.Create)
//w.FilterOps(watcher.Create, watcher.Remove)
curDir, err = filepath.Abs(filepath.Dir(STORE_DIR_NAME))
if err != nil {
log.Error(err)
}
go func() {
for {
select {
case event := <-w.Event:
if event.IsDir() {
continue
}
fpath := strings.Replace(event.Path, curDir+string(os.PathSeparator), "", 1)
if isLink {
fpath = strings.Replace(event.Path, curDir, STORE_DIR_NAME, 1)
}
fpath = strings.Replace(fpath, string(os.PathSeparator), "/", -1)
sum := this.util.MD5(fpath)
fileInfo := FileInfo{
Size: event.Size(),
Name: event.Name(),
Path: strings.TrimSuffix(fpath, "/"+event.Name()), // files/default/20190927/xxx
Md5: sum,
TimeStamp: event.ModTime().Unix(),
Peers: []string{this.host},
OffSet: -2,
op: event.Op.String(),
}
log.Info(fmt.Sprintf("WatchFilesChange op:%s path:%s", event.Op.String(), fpath))
qchan <- &fileInfo
//this.AppendToQueue(&fileInfo)
case err := <-w.Error:
log.Error(err)
case <-w.Closed:
return
}
}
}()
go func() {
for {
c := <-qchan
if time.Now().Unix()-c.TimeStamp < Config().SyncDelay {
qchan <- c
time.Sleep(time.Second * 1)
continue
} else {
//if c.op == watcher.Remove.String() {
// req := httplib.Post(fmt.Sprintf("%s%s?md5=%s", this.host, this.getRequestURI("delete"), c.Md5))
// req.Param("md5", c.Md5)
// req.SetTimeout(time.Second*5, time.Second*10)
// log.Infof(req.String())
//}
if c.op == watcher.Create.String() {
log.Info(fmt.Sprintf("Syncfile Add to Queue path:%s", c.Path+"/"+c.Name))
this.AppendToQueue(c)
this.SaveFileInfoToLevelDB(c.Md5, c, this.ldb)
}
}
}
}()
if dir, err := os.Readlink(STORE_DIR_NAME); err == nil {
if strings.HasSuffix(dir, string(os.PathSeparator)) {
dir = strings.TrimSuffix(dir, string(os.PathSeparator))
}
curDir = dir
isLink = true
if err := w.AddRecursive(dir); err != nil {
log.Error(err)
}
w.Ignore(dir + "/_tmp/")
w.Ignore(dir + "/" + LARGE_DIR_NAME + "/")
}
if err := w.AddRecursive("./" + STORE_DIR_NAME); err != nil {
log.Error(err)
}
w.Ignore("./" + STORE_DIR_NAME + "/_tmp/")
w.Ignore("./" + STORE_DIR_NAME + "/" + LARGE_DIR_NAME + "/")
if err := w.Start(time.Millisecond * 100); err != nil {
log.Error(err)
}
}
func (this *Server) RepairStatByDate(date string) StatDateFileInfo {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("RepairStatByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
fileInfo FileInfo
fileCount int64
fileSize int64
stat StatDateFileInfo
)
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, CONST_FILE_Md5_FILE_NAME)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
defer iter.Release()
for iter.Next() {
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
fileCount = fileCount + 1
fileSize = fileSize + fileInfo.Size
}
this.statMap.Put(date+"_"+CONST_STAT_FILE_COUNT_KEY, fileCount)
this.statMap.Put(date+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileSize)
this.SaveStat()
stat.Date = date
stat.FileCount = fileCount
stat.TotalSize = fileSize
return stat
}
func (this *Server) GetFilePathByInfo(fileInfo *FileInfo, withDocker bool) string {
var (
fn string
)
fn = fileInfo.Name
if fileInfo.ReName != "" {
fn = fileInfo.ReName
}
if withDocker {
return DOCKER_DIR + fileInfo.Path + "/" + fn
}
return fileInfo.Path + "/" + fn
}
func (this *Server) CheckFileExistByInfo(md5s string, fileInfo *FileInfo) bool {
var (
err error
fullpath string
fi os.FileInfo
info *FileInfo
)
if fileInfo == nil {
return false
}
if fileInfo.OffSet >= 0 {
//small file
if info, err = this.GetFileInfoFromLevelDB(fileInfo.Md5); err == nil && info.Md5 == fileInfo.Md5 {
return true
} else {
return false
}
}
fullpath = this.GetFilePathByInfo(fileInfo, true)
if fi, err = os.Stat(fullpath); err != nil {
return false
}
if fi.Size() == fileInfo.Size {
return true
} else {
return false
}
}
func (this *Server) ParseSmallFile(filename string) (string, int64, int, error) {
var (
err error
offset int64
length int
)
err = errors.New("unvalid small file")
if len(filename) < 3 {
return filename, -1, -1, err
}
if strings.Contains(filename, "/") {
filename = filename[strings.LastIndex(filename, "/")+1:]
}
pos := strings.Split(filename, ",")
if len(pos) < 3 {
return filename, -1, -1, err
}
offset, err = strconv.ParseInt(pos[1], 10, 64)
if err != nil {
return filename, -1, -1, err
}
if length, err = strconv.Atoi(pos[2]); err != nil {
return filename, offset, -1, err
}
if length > CONST_SMALL_FILE_SIZE || offset < 0 {
err = errors.New("invalid filesize or offset")
return filename, -1, -1, err
}
return pos[0], offset, length, nil
}
func (this *Server) DownloadFromPeer(peer string, fileInfo *FileInfo) {
var (
err error
filename string
fpath string
fpathTmp string
fi os.FileInfo
sum string
data []byte
downloadUrl string
)
if Config().ReadOnly {
log.Warn("ReadOnly", fileInfo)
return
}
if Config().RetryCount > 0 && fileInfo.retry >= Config().RetryCount {
log.Error("DownloadFromPeer Error ", fileInfo)
return
} else {
fileInfo.retry = fileInfo.retry + 1
}
filename = fileInfo.Name
if fileInfo.ReName != "" {
filename = fileInfo.ReName
}
if fileInfo.OffSet != -2 && Config().EnableDistinctFile && this.CheckFileExistByInfo(fileInfo.Md5, fileInfo) {
// ignore migrate file
log.Info(fmt.Sprintf("DownloadFromPeer file Exist, path:%s", fileInfo.Path+"/"+fileInfo.Name))
return
}
if (!Config().EnableDistinctFile || fileInfo.OffSet == -2) && this.util.FileExists(this.GetFilePathByInfo(fileInfo, true)) {
// ignore migrate file
if fi, err = os.Stat(this.GetFilePathByInfo(fileInfo, true)); err == nil {
if fi.ModTime().Unix() > fileInfo.TimeStamp {
log.Info(fmt.Sprintf("ignore file sync path:%s", this.GetFilePathByInfo(fileInfo, false)))
fileInfo.TimeStamp = fi.ModTime().Unix()
this.postFileToPeer(fileInfo) // keep newer
return
}
os.Remove(this.GetFilePathByInfo(fileInfo, true))
}
}
if _, err = os.Stat(fileInfo.Path); err != nil {
os.MkdirAll(DOCKER_DIR+fileInfo.Path, 0775)
}
//fmt.Println("downloadFromPeer",fileInfo)
p := strings.Replace(fileInfo.Path, STORE_DIR_NAME+"/", "", 1)
//filename=this.util.UrlEncode(filename)
if Config().SupportGroupManage {
downloadUrl = peer + "/" + Config().Group + "/" + p + "/" + filename
} else {
downloadUrl = peer + "/" + p + "/" + filename
}
log.Info("DownloadFromPeer: ", downloadUrl)
fpath = DOCKER_DIR + fileInfo.Path + "/" + filename
fpathTmp = DOCKER_DIR + fileInfo.Path + "/" + fmt.Sprintf("%s_%s", "tmp_", filename)
timeout := fileInfo.Size/1024/1024/1 + 30
if Config().SyncTimeout > 0 {
timeout = Config().SyncTimeout
}
this.lockMap.LockKey(fpath)
defer this.lockMap.UnLockKey(fpath)
download_key := fmt.Sprintf("downloading_%d_%s", time.Now().Unix(), fpath)
this.ldb.Put([]byte(download_key), []byte(""), nil)
defer func() {
this.ldb.Delete([]byte(download_key), nil)
}()
if fileInfo.OffSet == -2 {
//migrate file
if fi, err = os.Stat(fpath); err == nil && fi.Size() == fileInfo.Size {
//prevent double download
this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb)
//log.Info(fmt.Sprintf("file '%s' has download", fpath))
return
}
req := httplib.Get(downloadUrl)
req.SetTimeout(time.Second*30, time.Second*time.Duration(timeout))
if err = req.ToFile(fpathTmp); err != nil {
this.AppendToDownloadQueue(fileInfo) //retry
os.Remove(fpathTmp)
log.Error(err, fpathTmp)
return
}
if os.Rename(fpathTmp, fpath) == nil {
//this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb)
}
return
}
req := httplib.Get(downloadUrl)
req.SetTimeout(time.Second*30, time.Second*time.Duration(timeout))
if fileInfo.OffSet >= 0 {
//small file download
data, err = req.Bytes()
if err != nil {
this.AppendToDownloadQueue(fileInfo) //retry
log.Error(err)
return
}
data2 := make([]byte, len(data)+1)
data2[0] = '1'
for i, v := range data {
data2[i+1] = v
}
data = data2
if int64(len(data)) != fileInfo.Size {
log.Warn("file size is error")
return
}
fpath = strings.Split(fpath, ",")[0]
err = this.util.WriteFileByOffSet(fpath, fileInfo.OffSet, data)
if err != nil {
log.Warn(err)
return
}
this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
return
}
if err = req.ToFile(fpathTmp); err != nil {
this.AppendToDownloadQueue(fileInfo) //retry
os.Remove(fpathTmp)
log.Error(err)
return
}
if fi, err = os.Stat(fpathTmp); err != nil {
os.Remove(fpathTmp)
return
}
_ = sum
//if Config().EnableDistinctFile {
// //DistinctFile
// if sum, err = this.util.GetFileSumByName(fpathTmp, Config().FileSumArithmetic); err != nil {
// log.Error(err)
// return
// }
//} else {
// //DistinctFile By path
// sum = this.util.MD5(this.GetFilePathByInfo(fileInfo, false))
//}
if fi.Size() != fileInfo.Size { // maybe has bug remove || sum != fileInfo.Md5
log.Error("file sum check error")
os.Remove(fpathTmp)
return
}
if os.Rename(fpathTmp, fpath) == nil {
this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
}
}
func (this *Server) CrossOrigin(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Headers", "Authorization, Content-Type, Depth, User-Agent, X-File-Size, X-Requested-With, X-Requested-By, If-Modified-Since, X-File-Name, X-File-Type, Cache-Control, Origin")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, OPTIONS, PUT, DELETE")
w.Header().Set("Access-Control-Expose-Headers", "Authorization")
//https://blog.csdn.net/yanzisu_congcong/article/details/80552155
}
func (this *Server) SetDownloadHeader(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/octet-stream")
w.Header().Set("Content-Disposition", "attachment")
}
func (this *Server) CheckAuth(w http.ResponseWriter, r *http.Request) bool {
var (
err error
req *httplib.BeegoHTTPRequest
result string
jsonResult JsonResult
)
if err = r.ParseForm(); err != nil {
log.Error(err)
return false
}
req = httplib.Post(Config().AuthUrl)
req.SetTimeout(time.Second*10, time.Second*10)
req.Param("__path__", r.URL.Path)
req.Param("__query__", r.URL.RawQuery)
for k, _ := range r.Form {
req.Param(k, r.FormValue(k))
}
for k, v := range r.Header {
req.Header(k, v[0])
}
result, err = req.String()
result = strings.TrimSpace(result)
if strings.HasPrefix(result, "{") && strings.HasSuffix(result, "}") {
if err = json.Unmarshal([]byte(result), &jsonResult); err != nil {
log.Error(err)
return false
}
if jsonResult.Data != "ok" {
log.Warn(result)
return false
}
} else {
if result != "ok" {
log.Warn(result)
return false
}
}
return true
}
func (this *Server) NotPermit(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(401)
}
func (this *Server) GetFilePathFromRequest(w http.ResponseWriter, r *http.Request) (string, string) {
var (
err error
fullpath string
smallPath string
prefix string
)
fullpath = r.RequestURI[1:]
if strings.HasPrefix(r.RequestURI, "/"+Config().Group+"/") {
fullpath = r.RequestURI[len(Config().Group)+2 : len(r.RequestURI)]
}
fullpath = strings.Split(fullpath, "?")[0] // just path
fullpath = DOCKER_DIR + STORE_DIR_NAME + "/" + fullpath
prefix = "/" + LARGE_DIR_NAME + "/"
if Config().SupportGroupManage {
prefix = "/" + Config().Group + "/" + LARGE_DIR_NAME + "/"
}
if strings.HasPrefix(r.RequestURI, prefix) {
smallPath = fullpath //notice order
fullpath = strings.Split(fullpath, ",")[0]
}
if fullpath, err = url.PathUnescape(fullpath); err != nil {
log.Error(err)
}
return fullpath, smallPath
}
func (this *Server) CheckDownloadAuth(w http.ResponseWriter, r *http.Request) (bool, error) {
var (
err error
maxTimestamp int64
minTimestamp int64
ts int64
token string
timestamp string
fullpath string
smallPath string
pathMd5 string
fileInfo *FileInfo
scene string
secret interface{}
code string
ok bool
)
CheckToken := func(token string, md5sum string, timestamp string) bool {
if this.util.MD5(md5sum+timestamp) != token {
return false
}
return true
}
if Config().EnableDownloadAuth && Config().AuthUrl != "" && !this.IsPeer(r) && !this.CheckAuth(w, r) {
return false, errors.New("auth fail")
}
if Config().DownloadUseToken && !this.IsPeer(r) {
token = r.FormValue("token")
timestamp = r.FormValue("timestamp")
if token == "" || timestamp == "" {
return false, errors.New("unvalid request")
}
maxTimestamp = time.Now().Add(time.Second *
time.Duration(Config().DownloadTokenExpire)).Unix()
minTimestamp = time.Now().Add(-time.Second *
time.Duration(Config().DownloadTokenExpire)).Unix()
if ts, err = strconv.ParseInt(timestamp, 10, 64); err != nil {
return false, errors.New("unvalid timestamp")
}
if ts > maxTimestamp || ts < minTimestamp {
return false, errors.New("timestamp expire")
}
fullpath, smallPath = this.GetFilePathFromRequest(w, r)
if smallPath != "" {
pathMd5 = this.util.MD5(smallPath)
} else {
pathMd5 = this.util.MD5(fullpath)
}
if fileInfo, err = this.GetFileInfoFromLevelDB(pathMd5); err != nil {
// TODO
} else {
ok := CheckToken(token, fileInfo.Md5, timestamp)
if !ok {
return ok, errors.New("unvalid token")
}
return ok, nil
}
}
if Config().EnableGoogleAuth && !this.IsPeer(r) {
fullpath = r.RequestURI[len(Config().Group)+2 : len(r.RequestURI)]
fullpath = strings.Split(fullpath, "?")[0] // just path
scene = strings.Split(fullpath, "/")[0]
code = r.FormValue("code")
if secret, ok = this.sceneMap.GetValue(scene); ok {
if !this.VerifyGoogleCode(secret.(string), code, int64(Config().DownloadTokenExpire/30)) {
return false, errors.New("invalid google code")
}
}
}
return true, nil
}
func (this *Server) GetSmallFileByURI(w http.ResponseWriter, r *http.Request) ([]byte, bool, error) {
var (
err error
data []byte
offset int64
length int
fullpath string
info os.FileInfo
)
fullpath, _ = this.GetFilePathFromRequest(w, r)
if _, offset, length, err = this.ParseSmallFile(r.RequestURI); err != nil {
return nil, false, err
}
if info, err = os.Stat(fullpath); err != nil {
return nil, false, err
}
if info.Size() < offset+int64(length) {
return nil, true, errors.New("noFound")
} else {
data, err = this.util.ReadFileByOffSet(fullpath, offset, length)
if err != nil {
return nil, false, err
}
return data, false, err
}
}
func (this *Server) DownloadSmallFileByURI(w http.ResponseWriter, r *http.Request) (bool, error) {
var (
err error
data []byte
isDownload bool
imgWidth int
imgHeight int
width string
height string
notFound bool
)
r.ParseForm()
isDownload = true
if r.FormValue("download") == "" {
isDownload = Config().DefaultDownload
}
if r.FormValue("download") == "0" {
isDownload = false
}
width = r.FormValue("width")
height = r.FormValue("height")
if width != "" {
imgWidth, err = strconv.Atoi(width)
if err != nil {
log.Error(err)
}
}
if height != "" {
imgHeight, err = strconv.Atoi(height)
if err != nil {
log.Error(err)
}
}
data, notFound, err = this.GetSmallFileByURI(w, r)
_ = notFound
if data != nil && string(data[0]) == "1" {
if isDownload {
this.SetDownloadHeader(w, r)
}
if imgWidth != 0 || imgHeight != 0 {
this.ResizeImageByBytes(w, data[1:], uint(imgWidth), uint(imgHeight))
return true, nil
}
w.Write(data[1:])
return true, nil
}
return false, errors.New("not found")
}
func (this *Server) DownloadNormalFileByURI(w http.ResponseWriter, r *http.Request) (bool, error) {
var (
err error
isDownload bool
imgWidth int
imgHeight int
width string
height string
)
r.ParseForm()
isDownload = true
if r.FormValue("download") == "" {
isDownload = Config().DefaultDownload
}
if r.FormValue("download") == "0" {
isDownload = false
}
width = r.FormValue("width")
height = r.FormValue("height")
if width != "" {
imgWidth, err = strconv.Atoi(width)
if err != nil {
log.Error(err)
}
}
if height != "" {
imgHeight, err = strconv.Atoi(height)
if err != nil {
log.Error(err)
}
}
if isDownload {
this.SetDownloadHeader(w, r)
}
fullpath, _ := this.GetFilePathFromRequest(w, r)
if imgWidth != 0 || imgHeight != 0 {
if Config().EnableImageResize {
this.ResizeImage(w, fullpath, uint(imgWidth), uint(imgHeight))
}
return true, nil
}
staticHandler.ServeHTTP(w, r)
return true, nil
}
func (this *Server) DownloadNotFound(w http.ResponseWriter, r *http.Request) {
var (
err error
fullpath string
smallPath string
isDownload bool
pathMd5 string
peer string
fileInfo *FileInfo
)
fullpath, smallPath = this.GetFilePathFromRequest(w, r)
isDownload = true
if r.FormValue("download") == "" {
isDownload = Config().DefaultDownload
}
if r.FormValue("download") == "0" {
isDownload = false
}
if smallPath != "" {
pathMd5 = this.util.MD5(smallPath)
} else {
pathMd5 = this.util.MD5(fullpath)
}
for _, peer = range Config().Peers {
if fileInfo, err = this.checkPeerFileExist(peer, pathMd5, fullpath); err != nil {
log.Error(err)
continue
}
if fileInfo.Md5 != "" {
go this.DownloadFromPeer(peer, fileInfo)
//http.Redirect(w, r, peer+r.RequestURI, 302)
if isDownload {
this.SetDownloadHeader(w, r)
}
this.DownloadFileToResponse(peer+r.RequestURI, w, r)
return
}
}
w.WriteHeader(404)
return
}
func (this *Server) Download(w http.ResponseWriter, r *http.Request) {
var (
err error
ok bool
fullpath string
smallPath string
fi os.FileInfo
)
// redirect to upload
if r.RequestURI == "/" || r.RequestURI == "" ||
r.RequestURI == "/"+Config().Group ||
r.RequestURI == "/"+Config().Group+"/" {
this.Index(w, r)
return
}
if ok, err = this.CheckDownloadAuth(w, r); !ok {
log.Error(err)
this.NotPermit(w, r)
return
}
if Config().EnableCrossOrigin {
this.CrossOrigin(w, r)
}
fullpath, smallPath = this.GetFilePathFromRequest(w, r)
if smallPath == "" {
if fi, err = os.Stat(fullpath); err != nil {
this.DownloadNotFound(w, r)
return
}
if !Config().ShowDir && fi.IsDir() {
w.Write([]byte("list dir deny"))
return
}
//staticHandler.ServeHTTP(w, r)
this.DownloadNormalFileByURI(w, r)
return
}
if smallPath != "" {
if ok, err = this.DownloadSmallFileByURI(w, r); !ok {
this.DownloadNotFound(w, r)
return
}
return
}
}
func (this *Server) DownloadFileToResponse(url string, w http.ResponseWriter, r *http.Request) {
var (
err error
req *httplib.BeegoHTTPRequest
resp *http.Response
)
req = httplib.Get(url)
req.SetTimeout(time.Second*20, time.Second*600)
resp, err = req.DoRequest()
if err != nil {
log.Error(err)
}
defer resp.Body.Close()
_, err = io.Copy(w, resp.Body)
if err != nil {
log.Error(err)
}
}
func (this *Server) ResizeImageByBytes(w http.ResponseWriter, data []byte, width, height uint) {
var (
img image.Image
err error
imgType string
)
reader := bytes.NewReader(data)
img, imgType, err = image.Decode(reader)
if err != nil {
log.Error(err)
return
}
img = resize.Resize(width, height, img, resize.Lanczos3)
if imgType == "jpg" || imgType == "jpeg" {
jpeg.Encode(w, img, nil)
} else if imgType == "png" {
png.Encode(w, img)
} else {
w.Write(data)
}
}
func (this *Server) ResizeImage(w http.ResponseWriter, fullpath string, width, height uint) {
var (
img image.Image
err error
imgType string
file *os.File
smallFile *os.File
resizeFileName string
resizePath string
)
resizePath = path.Dir(fullpath)
resizeFileName = "cache_" + this.util.MD5(fmt.Sprintf("%s?w=%d&h=%d", fullpath, width, height))
resizePath = fmt.Sprintf("%s/%s", resizePath, resizeFileName)
if this.util.IsExist(resizePath) {
if smallFile, err = os.Open(resizePath); err != nil {
log.Error(err)
}
defer smallFile.Close()
io.Copy(w, smallFile)
return
}
file, err = os.Open(fullpath)
if err != nil {
log.Error(err)
return
}
img, imgType, err = image.Decode(file)
if err != nil {
log.Error(err)
return
}
defer file.Close()
if smallFile, err = os.Create(resizePath); err != nil {
log.Error(err)
return
}
defer smallFile.Close()
img = resize.Resize(width, height, img, resize.Lanczos3)
if imgType == "jpg" || imgType == "jpeg" {
jpeg.Encode(smallFile, img, nil)
} else if imgType == "png" {
png.Encode(smallFile, img)
} else {
file.Seek(0, 0)
io.Copy(w, file)
return
}
smallFile.Seek(0, 0)
io.Copy(w, smallFile)
}
func (this *Server) GetServerURI(r *http.Request) string {
return fmt.Sprintf("http://%s/", r.Host)
}
func (this *Server) CheckFileAndSendToPeer(date string, filename string, isForceUpload bool) {
var (
md5set mapset.Set
err error
md5s []interface{}
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("CheckFileAndSendToPeer")
log.Error(re)
log.Error(string(buffer))
}
}()
if md5set, err = this.GetMd5sByDate(date, filename); err != nil {
log.Error(err)
return
}
md5s = md5set.ToSlice()
for _, md := range md5s {
if md == nil {
continue
}
if fileInfo, _ := this.GetFileInfoFromLevelDB(md.(string)); fileInfo != nil && fileInfo.Md5 != "" {
if isForceUpload {
fileInfo.Peers = []string{}
}
if len(fileInfo.Peers) > len(Config().Peers) {
continue
}
if !this.util.Contains(this.host, fileInfo.Peers) {
fileInfo.Peers = append(fileInfo.Peers, this.host) // peer is null
}
if filename == CONST_Md5_QUEUE_FILE_NAME {
this.AppendToDownloadQueue(fileInfo)
} else {
this.AppendToQueue(fileInfo)
}
}
}
}
func (this *Server) postFileToPeer(fileInfo *FileInfo) {
var (
err error
peer string
filename string
info *FileInfo
postURL string
result string
fi os.FileInfo
i int
data []byte
fpath string
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("postFileToPeer")
log.Error(re)
log.Error(string(buffer))
}
}()
//fmt.Println("postFile",fileInfo)
for i, peer = range Config().Peers {
_ = i
if fileInfo.Peers == nil {
fileInfo.Peers = []string{}
}
if this.util.Contains(peer, fileInfo.Peers) {
continue
}
filename = fileInfo.Name
if fileInfo.ReName != "" {
filename = fileInfo.ReName
if fileInfo.OffSet != -1 {
filename = strings.Split(fileInfo.ReName, ",")[0]
}
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + filename
if !this.util.FileExists(fpath) {
log.Warn(fmt.Sprintf("file '%s' not found", fpath))
continue
} else {
if fileInfo.Size == 0 {
if fi, err = os.Stat(fpath); err != nil {
log.Error(err)
} else {
fileInfo.Size = fi.Size()
}
}
}
if fileInfo.OffSet != -2 && Config().EnableDistinctFile {
//not migrate file should check or update file
// where not EnableDistinctFile should check
if info, err = this.checkPeerFileExist(peer, fileInfo.Md5, ""); info.Md5 != "" {
fileInfo.Peers = append(fileInfo.Peers, peer)
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error(err)
}
continue
}
}
postURL = fmt.Sprintf("%s%s", peer, this.getRequestURI("syncfile_info"))
b := httplib.Post(postURL)
b.SetTimeout(time.Second*30, time.Second*30)
if data, err = json.Marshal(fileInfo); err != nil {
log.Error(err)
return
}
b.Param("fileInfo", string(data))
result, err = b.String()
if err != nil {
if fileInfo.retry <= Config().RetryCount {
fileInfo.retry = fileInfo.retry + 1
this.AppendToQueue(fileInfo)
}
log.Error(err, fmt.Sprintf(" path:%s", fileInfo.Path+"/"+fileInfo.Name))
}
if !strings.HasPrefix(result, "http://") || err != nil {
this.SaveFileMd5Log(fileInfo, CONST_Md5_ERROR_FILE_NAME)
}
if strings.HasPrefix(result, "http://") {
log.Info(result)
if !this.util.Contains(peer, fileInfo.Peers) {
fileInfo.Peers = append(fileInfo.Peers, peer)
if _, err = this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error(err)
}
}
}
if err != nil {
log.Error(err)
}
}
}
func (this *Server) SaveFileMd5Log(fileInfo *FileInfo, filename string) {
var (
info FileInfo
)
for len(this.queueFileLog)+len(this.queueFileLog)/10 > CONST_QUEUE_SIZE {
time.Sleep(time.Second * 1)
}
info = *fileInfo
this.queueFileLog <- &FileLog{FileInfo: &info, FileName: filename}
}
func (this *Server) saveFileMd5Log(fileInfo *FileInfo, filename string) {
var (
err error
outname string
logDate string
ok bool
fullpath string
md5Path string
logKey string
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("saveFileMd5Log")
log.Error(re)
log.Error(string(buffer))
}
}()
if fileInfo == nil || fileInfo.Md5 == "" || filename == "" {
log.Warn("saveFileMd5Log", fileInfo, filename)
return
}
logDate = this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)
outname = fileInfo.Name
if fileInfo.ReName != "" {
outname = fileInfo.ReName
}
fullpath = fileInfo.Path + "/" + outname
logKey = fmt.Sprintf("%s_%s_%s", logDate, filename, fileInfo.Md5)
if filename == CONST_FILE_Md5_FILE_NAME {
//this.searchMap.Put(fileInfo.Md5, fileInfo.Name)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); !ok {
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, 1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, fileInfo.Size)
this.SaveStat()
}
if _, err = this.SaveFileInfoToLevelDB(logKey, fileInfo, this.logDB); err != nil {
log.Error(err)
}
if _, err := this.SaveFileInfoToLevelDB(fileInfo.Md5, fileInfo, this.ldb); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
if _, err = this.SaveFileInfoToLevelDB(this.util.MD5(fullpath), fileInfo, this.ldb); err != nil {
log.Error("saveToLevelDB", err, fileInfo)
}
return
}
if filename == CONST_REMOME_Md5_FILE_NAME {
//this.searchMap.Remove(fileInfo.Md5)
if ok, err = this.IsExistFromLevelDB(fileInfo.Md5, this.ldb); ok {
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_COUNT_KEY, -1)
this.statMap.AddCountInt64(logDate+"_"+CONST_STAT_FILE_TOTAL_SIZE_KEY, -fileInfo.Size)
this.SaveStat()
}
this.RemoveKeyFromLevelDB(logKey, this.logDB)
md5Path = this.util.MD5(fullpath)
if err := this.RemoveKeyFromLevelDB(fileInfo.Md5, this.ldb); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
if err = this.RemoveKeyFromLevelDB(md5Path, this.ldb); err != nil {
log.Error("RemoveKeyFromLevelDB", err, fileInfo)
}
// remove files.md5 for stat info(repair from logDB)
logKey = fmt.Sprintf("%s_%s_%s", logDate, CONST_FILE_Md5_FILE_NAME, fileInfo.Md5)
this.RemoveKeyFromLevelDB(logKey, this.logDB)
return
}
this.SaveFileInfoToLevelDB(logKey, fileInfo, this.logDB)
}
func (this *Server) checkPeerFileExist(peer string, md5sum string, fpath string) (*FileInfo, error) {
var (
err error
fileInfo FileInfo
)
req := httplib.Post(fmt.Sprintf("%s%s?md5=%s", peer, this.getRequestURI("check_file_exist"), md5sum))
req.Param("path", fpath)
req.Param("md5", md5sum)
req.SetTimeout(time.Second*5, time.Second*10)
if err = req.ToJSON(&fileInfo); err != nil {
return &FileInfo{}, err
}
if fileInfo.Md5 == "" {
return &fileInfo, errors.New("not found")
}
return &fileInfo, nil
}
func (this *Server) CheckFileExist(w http.ResponseWriter, r *http.Request) {
var (
data []byte
err error
fileInfo *FileInfo
fpath string
fi os.FileInfo
)
r.ParseForm()
md5sum := ""
md5sum = r.FormValue("md5")
fpath = r.FormValue("path")
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); fileInfo != nil {
if fileInfo.OffSet != -1 {
if data, err = json.Marshal(fileInfo); err != nil {
log.Error(err)
}
w.Write(data)
return
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.Name
if fileInfo.ReName != "" {
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.ReName
}
if this.util.IsExist(fpath) {
if data, err = json.Marshal(fileInfo); err == nil {
w.Write(data)
return
} else {
log.Error(err)
}
} else {
if fileInfo.OffSet == -1 {
this.RemoveKeyFromLevelDB(md5sum, this.ldb) // when file delete,delete from leveldb
}
}
} else {
if fpath != "" {
fi, err = os.Stat(fpath)
if err == nil {
sum := this.util.MD5(fpath)
//if Config().EnableDistinctFile {
// sum, err = this.util.GetFileSumByName(fpath, Config().FileSumArithmetic)
// if err != nil {
// log.Error(err)
// }
//}
fileInfo = &FileInfo{
Path: path.Dir(fpath),
Name: path.Base(fpath),
Size: fi.Size(),
Md5: sum,
Peers: []string{Config().Host},
OffSet: -1, //very important
TimeStamp: fi.ModTime().Unix(),
}
data, err = json.Marshal(fileInfo)
w.Write(data)
return
}
}
}
data, _ = json.Marshal(FileInfo{})
w.Write(data)
return
}
func (this *Server) CheckFilesExist(w http.ResponseWriter, r *http.Request) {
var (
data []byte
err error
fileInfo *FileInfo
fileInfos []*FileInfo
fpath string
result JsonResult
)
r.ParseForm()
md5sum := ""
md5sum = r.FormValue("md5s")
md5s := strings.Split(md5sum, ",")
for _, m := range md5s {
if fileInfo, err = this.GetFileInfoFromLevelDB(m); fileInfo != nil {
if fileInfo.OffSet != -1 {
if data, err = json.Marshal(fileInfo); err != nil {
log.Error(err)
}
//w.Write(data)
//return
fileInfos = append(fileInfos, fileInfo)
continue
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.Name
if fileInfo.ReName != "" {
fpath = DOCKER_DIR + fileInfo.Path + "/" + fileInfo.ReName
}
if this.util.IsExist(fpath) {
if data, err = json.Marshal(fileInfo); err == nil {
fileInfos = append(fileInfos, fileInfo)
//w.Write(data)
//return
continue
} else {
log.Error(err)
}
} else {
if fileInfo.OffSet == -1 {
this.RemoveKeyFromLevelDB(md5sum, this.ldb) // when file delete,delete from leveldb
}
}
}
}
result.Data = fileInfos
data, _ = json.Marshal(result)
w.Write(data)
return
}
func (this *Server) Sync(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
)
r.ParseForm()
result.Status = "fail"
if !this.IsPeer(r) {
result.Message = "client must be in cluster"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
date := ""
force := ""
inner := ""
isForceUpload := false
force = r.FormValue("force")
date = r.FormValue("date")
inner = r.FormValue("inner")
if force == "1" {
isForceUpload = true
}
if inner != "1" {
for _, peer := range Config().Peers {
req := httplib.Post(peer + this.getRequestURI("sync"))
req.Param("force", force)
req.Param("inner", "1")
req.Param("date", date)
if _, err := req.String(); err != nil {
log.Error(err)
}
}
}
if date == "" {
result.Message = "require paramete date &force , ?date=20181230"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
date = strings.Replace(date, ".", "", -1)
if isForceUpload {
go this.CheckFileAndSendToPeer(date, CONST_FILE_Md5_FILE_NAME, isForceUpload)
} else {
go this.CheckFileAndSendToPeer(date, CONST_Md5_ERROR_FILE_NAME, isForceUpload)
}
result.Status = "ok"
result.Message = "job is running"
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) IsExistFromLevelDB(key string, db *leveldb.DB) (bool, error) {
return db.Has([]byte(key), nil)
}
func (this *Server) redisDo(action string, args ...interface{}) (reply interface{}, err error) {
c := this.rp.Get()
defer c.Close()
return c.Do(action, args...)
}
func (this *Server) GetFileInfoFromLevelDB(key string) (*FileInfo, error) {
var (
err error
data []byte
fileInfo FileInfo
fiJson string
)
if Config().EnableRedis {
if fiJson, err = redis.String(this.redisDo("GET", key)); err != nil {
return nil, err
} else {
if err = json.Unmarshal([]byte(fiJson), &fileInfo); err != nil {
return nil, err
}
return &fileInfo, nil
}
}
if data, err = this.ldb.Get([]byte(key), nil); err != nil {
return nil, err
}
if err = json.Unmarshal(data, &fileInfo); err != nil {
return nil, err
}
return &fileInfo, nil
}
func (this *Server) SaveStat() {
SaveStatFunc := func() {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("SaveStatFunc")
log.Error(re)
log.Error(string(buffer))
}
}()
stat := this.statMap.Get()
if v, ok := stat[CONST_STAT_FILE_COUNT_KEY]; ok {
switch v.(type) {
case int64, int32, int, float64, float32:
if v.(int64) >= 0 {
if data, err := json.Marshal(stat); err != nil {
log.Error(err)
} else {
this.util.WriteBinFile(CONST_STAT_FILE_NAME, data)
}
}
}
}
}
SaveStatFunc()
}
func (this *Server) RemoveKeyFromLevelDB(key string, db *leveldb.DB) error {
var (
err error
)
err = db.Delete([]byte(key), nil)
return err
}
func (this *Server) SaveFileInfoToRedis(key string, fileInfo *FileInfo) (*FileInfo, error) {
var (
err error
data []byte
)
if data, err = json.Marshal(fileInfo); err != nil {
return fileInfo, err
}
if _, err = this.redisDo("SET", key, string(data)); err != nil {
return nil, err
}
return fileInfo, nil
}
func (this *Server) SaveFileInfoToLevelDB(key string, fileInfo *FileInfo, db *leveldb.DB) (*FileInfo, error) {
var (
err error
data []byte
)
if fileInfo == nil || db == nil {
return nil, errors.New("fileInfo is null or db is null")
}
if data, err = json.Marshal(fileInfo); err != nil {
return fileInfo, err
}
if err = db.Put([]byte(key), data, nil); err != nil {
return fileInfo, err
}
if db == this.ldb { //search slow ,write fast, double write logDB
logDate := this.util.GetDayFromTimeStamp(fileInfo.TimeStamp)
logKey := fmt.Sprintf("%s_%s_%s", logDate, CONST_FILE_Md5_FILE_NAME, fileInfo.Md5)
this.logDB.Put([]byte(logKey), data, nil)
}
if Config().EnableRedis && db == this.ldb {
if _, err = this.redisDo("SET", key, string(data)); err != nil {
log.Error(err)
}
}
return fileInfo, nil
}
func (this *Server) IsPeer(r *http.Request) bool {
var (
ip string
peer string
bflag bool
cidr *net.IPNet
err error
)
IsPublicIP := func(IP net.IP) bool {
if IP.IsLoopback() || IP.IsLinkLocalMulticast() || IP.IsLinkLocalUnicast() {
return false
}
if ip4 := IP.To4(); ip4 != nil {
switch true {
case ip4[0] == 10:
return false
case ip4[0] == 172 && ip4[1] >= 16 && ip4[1] <= 31:
return false
case ip4[0] == 192 && ip4[1] == 168:
return false
default:
return true
}
}
return false
}
//return true
ip = this.util.GetClientIp(r)
if this.util.Contains("0.0.0.0", Config().AdminIps) {
if IsPublicIP(net.ParseIP(ip)) {
return false
}
return true
}
if this.util.Contains(ip, Config().AdminIps) {
return true
}
for _, v := range Config().AdminIps {
if strings.Contains(v, "/") {
if _, cidr, err = net.ParseCIDR(v); err != nil {
log.Error(err)
return false
}
if cidr.Contains(net.ParseIP(ip)) {
return true
}
}
}
realIp := os.Getenv("GO_FASTDFS_IP")
if realIp == "" {
realIp = this.util.GetPulicIP()
}
if ip == "127.0.0.1" || ip == realIp {
return true
}
ip = "http://" + ip
bflag = false
for _, peer = range Config().Peers {
if strings.HasPrefix(peer, ip) {
bflag = true
break
}
}
return bflag
}
func (this *Server) ReceiveMd5s(w http.ResponseWriter, r *http.Request) {
var (
err error
md5str string
fileInfo *FileInfo
md5s []string
)
if !this.IsPeer(r) {
log.Warn(fmt.Sprintf("ReceiveMd5s %s", this.util.GetClientIp(r)))
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
r.ParseForm()
md5str = r.FormValue("md5s")
md5s = strings.Split(md5str, ",")
AppendFunc := func(md5s []string) {
for _, m := range md5s {
if m != "" {
if fileInfo, err = this.GetFileInfoFromLevelDB(m); err != nil {
log.Error(err)
continue
}
this.AppendToQueue(fileInfo)
}
}
}
go AppendFunc(md5s)
}
func (this *Server) GetClusterNotPermitMessage(r *http.Request) string {
var (
message string
)
message = fmt.Sprintf(CONST_MESSAGE_CLUSTER_IP, this.util.GetClientIp(r))
return message
}
func (this *Server) GetMd5sForWeb(w http.ResponseWriter, r *http.Request) {
var (
date string
err error
result mapset.Set
lines []string
md5s []interface{}
)
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
date = r.FormValue("date")
if result, err = this.GetMd5sByDate(date, CONST_FILE_Md5_FILE_NAME); err != nil {
log.Error(err)
return
}
md5s = result.ToSlice()
for _, line := range md5s {
if line != nil && line != "" {
lines = append(lines, line.(string))
}
}
w.Write([]byte(strings.Join(lines, ",")))
}
func (this *Server) GetMd5File(w http.ResponseWriter, r *http.Request) {
var (
date string
fpath string
data []byte
err error
)
if !this.IsPeer(r) {
return
}
fpath = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
if !this.util.FileExists(fpath) {
w.WriteHeader(404)
return
}
if data, err = ioutil.ReadFile(fpath); err != nil {
w.WriteHeader(500)
return
}
w.Write(data)
}
func (this *Server) GetMd5sMapByDate(date string, filename string) (*goutil.CommonMap, error) {
var (
err error
result *goutil.CommonMap
fpath string
content string
lines []string
line string
cols []string
data []byte
)
result = goutil.NewCommonMap(0)
if filename == "" {
fpath = DATA_DIR + "/" + date + "/" + CONST_FILE_Md5_FILE_NAME
} else {
fpath = DATA_DIR + "/" + date + "/" + filename
}
if !this.util.FileExists(fpath) {
return result, errors.New(fmt.Sprintf("fpath %s not found", fpath))
}
if data, err = ioutil.ReadFile(fpath); err != nil {
return result, err
}
content = string(data)
lines = strings.Split(content, "\n")
for _, line = range lines {
cols = strings.Split(line, "|")
if len(cols) > 2 {
if _, err = strconv.ParseInt(cols[1], 10, 64); err != nil {
continue
}
result.Add(cols[0])
}
}
return result, nil
}
func (this *Server) GetMd5sByDate(date string, filename string) (mapset.Set, error) {
var (
keyPrefix string
md5set mapset.Set
keys []string
)
md5set = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
keys = strings.Split(string(iter.Key()), "_")
if len(keys) >= 3 {
md5set.Add(keys[2])
}
}
iter.Release()
return md5set, nil
}
func (this *Server) SyncFileInfo(w http.ResponseWriter, r *http.Request) {
var (
err error
fileInfo FileInfo
fileInfoStr string
filename string
)
r.ParseForm()
fileInfoStr = r.FormValue("fileInfo")
if !this.IsPeer(r) {
log.Info("isn't peer fileInfo:", fileInfo)
return
}
if err = json.Unmarshal([]byte(fileInfoStr), &fileInfo); err != nil {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
log.Error(err)
return
}
if fileInfo.OffSet == -2 {
// optimize migrate
this.SaveFileInfoToLevelDB(fileInfo.Md5, &fileInfo, this.ldb)
} else {
this.SaveFileMd5Log(&fileInfo, CONST_Md5_QUEUE_FILE_NAME)
}
this.AppendToDownloadQueue(&fileInfo)
filename = fileInfo.Name
if fileInfo.ReName != "" {
filename = fileInfo.ReName
}
p := strings.Replace(fileInfo.Path, STORE_DIR+"/", "", 1)
downloadUrl := fmt.Sprintf("http://%s/%s", r.Host, Config().Group+"/"+p+"/"+filename)
log.Info("SyncFileInfo: ", downloadUrl)
w.Write([]byte(downloadUrl))
}
func (this *Server) CheckScene(scene string) (bool, error) {
var (
scenes []string
)
if len(Config().Scenes) == 0 {
return true, nil
}
for _, s := range Config().Scenes {
scenes = append(scenes, strings.Split(s, ":")[0])
}
if !this.util.Contains(scene, scenes) {
return false, errors.New("not valid scene")
}
return true, nil
}
func (this *Server) GetFileInfo(w http.ResponseWriter, r *http.Request) {
var (
fpath string
md5sum string
fileInfo *FileInfo
err error
result JsonResult
)
md5sum = r.FormValue("md5")
fpath = r.FormValue("path")
result.Status = "fail"
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
md5sum = r.FormValue("md5")
if fpath != "" {
fpath = strings.Replace(fpath, "/"+Config().Group+"/", STORE_DIR_NAME+"/", 1)
md5sum = this.util.MD5(fpath)
}
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
result.Status = "ok"
result.Data = fileInfo
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
func (this *Server) RemoveFile(w http.ResponseWriter, r *http.Request) {
var (
err error
md5sum string
fileInfo *FileInfo
fpath string
delUrl string
result JsonResult
inner string
name string
)
_ = delUrl
_ = inner
r.ParseForm()
md5sum = r.FormValue("md5")
fpath = r.FormValue("path")
inner = r.FormValue("inner")
result.Status = "fail"
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
if Config().AuthUrl != "" && !this.CheckAuth(w, r) {
this.NotPermit(w, r)
return
}
if fpath != "" && md5sum == "" {
fpath = strings.Replace(fpath, "/"+Config().Group+"/", STORE_DIR_NAME+"/", 1)
md5sum = this.util.MD5(fpath)
}
if inner != "1" {
for _, peer := range Config().Peers {
delFile := func(peer string, md5sum string, fileInfo *FileInfo) {
delUrl = fmt.Sprintf("%s%s", peer, this.getRequestURI("delete"))
req := httplib.Post(delUrl)
req.Param("md5", md5sum)
req.Param("inner", "1")
req.SetTimeout(time.Second*5, time.Second*10)
if _, err = req.String(); err != nil {
log.Error(err)
}
}
go delFile(peer, md5sum, fileInfo)
}
}
if len(md5sum) < 32 {
result.Message = "md5 unvalid"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if fileInfo, err = this.GetFileInfoFromLevelDB(md5sum); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if fileInfo.OffSet >= 0 {
result.Message = "small file delete not support"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
name = fileInfo.Name
if fileInfo.ReName != "" {
name = fileInfo.ReName
}
fpath = fileInfo.Path + "/" + name
if fileInfo.Path != "" && this.util.FileExists(DOCKER_DIR+fpath) {
this.SaveFileMd5Log(fileInfo, CONST_REMOME_Md5_FILE_NAME)
if err = os.Remove(DOCKER_DIR + fpath); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
} else {
result.Message = "remove success"
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
result.Message = "fail remove"
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) getRequestURI(action string) string {
var (
uri string
)
if Config().SupportGroupManage {
uri = "/" + Config().Group + "/" + action
} else {
uri = "/" + action
}
return uri
}
func (this *Server) BuildFileResult(fileInfo *FileInfo, r *http.Request) FileResult {
var (
outname string
fileResult FileResult
p string
downloadUrl string
domain string
host string
protocol string
)
if Config().EnableHttps {
protocol = "https"
} else {
protocol = "http"
}
host = strings.Replace(Config().Host, "http://", "", -1)
if r != nil {
host = r.Host
}
if !strings.HasPrefix(Config().DownloadDomain, "http") {
if Config().DownloadDomain == "" {
Config().DownloadDomain = fmt.Sprintf("%s://%s", protocol, host)
} else {
Config().DownloadDomain = fmt.Sprintf("%s://%s", protocol, Config().DownloadDomain)
}
}
if Config().DownloadDomain != "" {
domain = Config().DownloadDomain
} else {
domain = fmt.Sprintf("%s://%s", protocol, host)
}
outname = fileInfo.Name
if fileInfo.ReName != "" {
outname = fileInfo.ReName
}
p = strings.Replace(fileInfo.Path, STORE_DIR_NAME+"/", "", 1)
if Config().SupportGroupManage {
p = Config().Group + "/" + p + "/" + outname
} else {
p = p + "/" + outname
}
downloadUrl = fmt.Sprintf("%s://%s/%s", protocol, host, p)
if Config().DownloadDomain != "" {
downloadUrl = fmt.Sprintf("%s/%s", Config().DownloadDomain, p)
}
fileResult.Url = downloadUrl
fileResult.Md5 = fileInfo.Md5
fileResult.Path = "/" + p
fileResult.Domain = domain
fileResult.Scene = fileInfo.Scene
fileResult.Size = fileInfo.Size
fileResult.ModTime = fileInfo.TimeStamp
// Just for Compatibility
fileResult.Src = fileResult.Path
fileResult.Scenes = fileInfo.Scene
return fileResult
}
func (this *Server) SaveUploadFile(file multipart.File, header *multipart.FileHeader, fileInfo *FileInfo, r *http.Request) (*FileInfo, error) {
var (
err error
outFile *os.File
folder string
fi os.FileInfo
)
defer file.Close()
_, fileInfo.Name = filepath.Split(header.Filename)
// bugfix for ie upload file contain fullpath
if len(Config().Extensions) > 0 && !this.util.Contains(path.Ext(fileInfo.Name), Config().Extensions) {
return fileInfo, errors.New("(error)file extension mismatch")
}
if Config().RenameFile {
fileInfo.ReName = this.util.MD5(this.util.GetUUID()) + path.Ext(fileInfo.Name)
}
folder = time.Now().Format("20060102/15/04")
if Config().PeerId != "" {
folder = fmt.Sprintf(folder+"/%s", Config().PeerId)
}
if fileInfo.Scene != "" {
folder = fmt.Sprintf(STORE_DIR+"/%s/%s", fileInfo.Scene, folder)
} else {
folder = fmt.Sprintf(STORE_DIR+"/%s", folder)
}
if fileInfo.Path != "" {
if strings.HasPrefix(fileInfo.Path, STORE_DIR) {
folder = fileInfo.Path
} else {
folder = STORE_DIR + "/" + fileInfo.Path
}
}
if !this.util.FileExists(folder) {
if err = os.MkdirAll(folder, 0775); err != nil {
log.Error(err)
}
}
outPath := fmt.Sprintf(folder+"/%s", fileInfo.Name)
if fileInfo.ReName != "" {
outPath = fmt.Sprintf(folder+"/%s", fileInfo.ReName)
}
if this.util.FileExists(outPath) && Config().EnableDistinctFile {
for i := 0; i < 10000; i++ {
outPath = fmt.Sprintf(folder+"/%d_%s", i, filepath.Base(header.Filename))
fileInfo.Name = fmt.Sprintf("%d_%s", i, header.Filename)
if !this.util.FileExists(outPath) {
break
}
}
}
log.Info(fmt.Sprintf("upload: %s", outPath))
if outFile, err = os.Create(outPath); err != nil {
return fileInfo, err
}
defer outFile.Close()
if err != nil {
log.Error(err)
return fileInfo, errors.New("(error)fail," + err.Error())
}
if _, err = io.Copy(outFile, file); err != nil {
log.Error(err)
return fileInfo, errors.New("(error)fail," + err.Error())
}
if fi, err = outFile.Stat(); err != nil {
log.Error(err)
return fileInfo, errors.New("(error)fail," + err.Error())
} else {
fileInfo.Size = fi.Size()
}
if fi.Size() != header.Size {
return fileInfo, errors.New("(error)file uncomplete")
}
v := "" // this.util.GetFileSum(outFile, Config().FileSumArithmetic)
if Config().EnableDistinctFile {
v = this.util.GetFileSum(outFile, Config().FileSumArithmetic)
} else {
v = this.util.MD5(this.GetFilePathByInfo(fileInfo, false))
}
fileInfo.Md5 = v
//fileInfo.Path = folder //strings.Replace( folder,DOCKER_DIR,"",1)
fileInfo.Path = strings.Replace(folder, DOCKER_DIR, "", 1)
fileInfo.Peers = append(fileInfo.Peers, this.host)
//fmt.Println("upload",fileInfo)
return fileInfo, nil
}
func (this *Server) Upload(w http.ResponseWriter, r *http.Request) {
var (
err error
fn string
folder string
fpTmp *os.File
fpBody *os.File
)
if r.Method == http.MethodGet {
this.upload(w, r)
return
}
folder = STORE_DIR + "/_tmp/" + time.Now().Format("20060102")
if !this.util.FileExists(folder) {
if err = os.MkdirAll(folder, 0777); err != nil {
log.Error(err)
}
}
fn = folder + "/" + this.util.GetUUID()
fpTmp, err = os.OpenFile(fn, os.O_RDWR|os.O_CREATE, 0777)
if err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
if _, err = io.Copy(fpTmp, r.Body); err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
fpTmp.Close()
if fpBody, err = os.Open(fn); err != nil {
log.Error(err)
w.Write([]byte(err.Error()))
return
}
r.Body = fpBody
defer func() {
fpBody.Close()
os.Remove(fn)
}()
done := make(chan bool, 1)
this.queueUpload <- WrapReqResp{&w, r, done}
<-done
}
func (this *Server) upload(w http.ResponseWriter, r *http.Request) {
var (
err error
ok bool
// pathname string
md5sum string
fileName string
fileInfo FileInfo
uploadFile multipart.File
uploadHeader *multipart.FileHeader
scene string
output string
fileResult FileResult
result JsonResult
data []byte
code string
secret interface{}
msg string
)
output = r.FormValue("output")
if Config().EnableCrossOrigin {
this.CrossOrigin(w, r)
if r.Method == http.MethodOptions {
return
}
}
result.Status = "fail"
if Config().AuthUrl != "" {
if !this.CheckAuth(w, r) {
msg = "auth fail"
log.Warn(msg, r.Form)
this.NotPermit(w, r)
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
if r.Method == http.MethodPost {
md5sum = r.FormValue("md5")
fileName = r.FormValue("filename")
output = r.FormValue("output")
if Config().ReadOnly {
msg = "(error) readonly"
result.Message = msg
log.Warn(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if Config().EnableCustomPath {
fileInfo.Path = r.FormValue("path")
fileInfo.Path = strings.Trim(fileInfo.Path, "/")
}
scene = r.FormValue("scene")
code = r.FormValue("code")
if scene == "" {
//Just for Compatibility
scene = r.FormValue("scenes")
}
if Config().EnableGoogleAuth && scene != "" {
if secret, ok = this.sceneMap.GetValue(scene); ok {
if !this.VerifyGoogleCode(secret.(string), code, int64(Config().DownloadTokenExpire/30)) {
this.NotPermit(w, r)
msg = "invalid request,error google code"
result.Message = msg
log.Error(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
}
fileInfo.Md5 = md5sum
fileInfo.ReName = fileName
fileInfo.OffSet = -1
if uploadFile, uploadHeader, err = r.FormFile("file"); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
fileInfo.Peers = []string{}
fileInfo.TimeStamp = time.Now().Unix()
if scene == "" {
scene = Config().DefaultScene
}
if output == "" {
output = "text"
}
if !this.util.Contains(output, []string{"json", "text", "json2"}) {
msg = "output just support json or text or json2"
result.Message = msg
log.Warn(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
fileInfo.Scene = scene
if _, err = this.CheckScene(scene); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
log.Error(err)
return
}
if err != nil {
log.Error(err)
http.Redirect(w, r, "/", http.StatusMovedPermanently)
return
}
if _, err = this.SaveUploadFile(uploadFile, uploadHeader, &fileInfo, r); err != nil {
result.Message = err.Error()
log.Error(err)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if Config().EnableDistinctFile {
if v, _ := this.GetFileInfoFromLevelDB(fileInfo.Md5); v != nil && v.Md5 != "" {
fileResult = this.BuildFileResult(v, r)
if Config().RenameFile {
os.Remove(DOCKER_DIR + fileInfo.Path + "/" + fileInfo.ReName)
} else {
os.Remove(DOCKER_DIR + fileInfo.Path + "/" + fileInfo.Name)
}
if output == "json" || output == "json2" {
if output == "json2" {
result.Data = fileResult
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
w.Write([]byte(this.util.JsonEncodePretty(fileResult)))
} else {
w.Write([]byte(fileResult.Url))
}
return
}
}
if fileInfo.Md5 == "" {
msg = " fileInfo.Md5 is null"
log.Warn(msg)
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if md5sum != "" && fileInfo.Md5 != md5sum {
msg = " fileInfo.Md5 and md5sum !="
log.Warn(msg)
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if !Config().EnableDistinctFile {
// bugfix filecount stat
fileInfo.Md5 = this.util.MD5(this.GetFilePathByInfo(&fileInfo, false))
}
if Config().EnableMergeSmallFile && fileInfo.Size < CONST_SMALL_FILE_SIZE {
if err = this.SaveSmallFile(&fileInfo); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
}
this.saveFileMd5Log(&fileInfo, CONST_FILE_Md5_FILE_NAME) //maybe slow
go this.postFileToPeer(&fileInfo)
if fileInfo.Size <= 0 {
msg = "file size is zero"
result.Message = msg
w.Write([]byte(this.util.JsonEncodePretty(result)))
log.Error(msg)
return
}
fileResult = this.BuildFileResult(&fileInfo, r)
if output == "json" || output == "json2" {
if output == "json2" {
result.Data = fileResult
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
w.Write([]byte(this.util.JsonEncodePretty(fileResult)))
} else {
w.Write([]byte(fileResult.Url))
}
return
} else {
md5sum = r.FormValue("md5")
output = r.FormValue("output")
if md5sum == "" {
msg = "(error) if you want to upload fast md5 is require" +
",and if you want to upload file,you must use post method "
result.Message = msg
log.Error(msg)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if v, _ := this.GetFileInfoFromLevelDB(md5sum); v != nil && v.Md5 != "" {
fileResult = this.BuildFileResult(v, r)
result.Data = fileResult
result.Status = "ok"
}
if output == "json" || output == "json2" {
if data, err = json.Marshal(fileResult); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if output == "json2" {
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
w.Write(data)
} else {
w.Write([]byte(fileResult.Url))
}
}
}
func (this *Server) SaveSmallFile(fileInfo *FileInfo) error {
var (
err error
filename string
fpath string
srcFile *os.File
desFile *os.File
largeDir string
destPath string
reName string
fileExt string
)
filename = fileInfo.Name
fileExt = path.Ext(filename)
if fileInfo.ReName != "" {
filename = fileInfo.ReName
}
fpath = DOCKER_DIR + fileInfo.Path + "/" + filename
largeDir = LARGE_DIR + "/" + Config().PeerId
if !this.util.FileExists(largeDir) {
os.MkdirAll(largeDir, 0775)
}
reName = fmt.Sprintf("%d", this.util.RandInt(100, 300))
destPath = largeDir + "/" + reName
this.lockMap.LockKey(destPath)
defer this.lockMap.UnLockKey(destPath)
if this.util.FileExists(fpath) {
srcFile, err = os.OpenFile(fpath, os.O_CREATE|os.O_RDONLY, 06666)
if err != nil {
return err
}
defer srcFile.Close()
desFile, err = os.OpenFile(destPath, os.O_CREATE|os.O_RDWR, 06666)
if err != nil {
return err
}
defer desFile.Close()
fileInfo.OffSet, err = desFile.Seek(0, 2)
if _, err = desFile.Write([]byte("1")); err != nil {
//first byte set 1
return err
}
fileInfo.OffSet, err = desFile.Seek(0, 2)
if err != nil {
return err
}
fileInfo.OffSet = fileInfo.OffSet - 1 //minus 1 byte
fileInfo.Size = fileInfo.Size + 1
fileInfo.ReName = fmt.Sprintf("%s,%d,%d,%s", reName, fileInfo.OffSet, fileInfo.Size, fileExt)
if _, err = io.Copy(desFile, srcFile); err != nil {
return err
}
srcFile.Close()
os.Remove(fpath)
fileInfo.Path = strings.Replace(largeDir, DOCKER_DIR, "", 1)
}
return nil
}
func (this *Server) SendToMail(to, subject, body, mailtype string) error {
host := Config().Mail.Host
user := Config().Mail.User
password := Config().Mail.Password
hp := strings.Split(host, ":")
auth := smtp.PlainAuth("", user, password, hp[0])
var contentType string
if mailtype == "html" {
contentType = "Content-Type: text/" + mailtype + "; charset=UTF-8"
} else {
contentType = "Content-Type: text/plain" + "; charset=UTF-8"
}
msg := []byte("To: " + to + "\r\nFrom: " + user + ">\r\nSubject: " + "\r\n" + contentType + "\r\n\r\n" + body)
sendTo := strings.Split(to, ";")
err := smtp.SendMail(host, auth, user, sendTo, msg)
return err
}
func (this *Server) BenchMark(w http.ResponseWriter, r *http.Request) {
t := time.Now()
batch := new(leveldb.Batch)
for i := 0; i < 100000000; i++ {
f := FileInfo{}
f.Peers = []string{"http://192.168.0.1", "http://192.168.2.5"}
f.Path = "20190201/19/02"
s := strconv.Itoa(i)
s = this.util.MD5(s)
f.Name = s
f.Md5 = s
if data, err := json.Marshal(&f); err == nil {
batch.Put([]byte(s), data)
}
if i%10000 == 0 {
if batch.Len() > 0 {
server.ldb.Write(batch, nil)
// batch = new(leveldb.Batch)
batch.Reset()
}
fmt.Println(i, time.Since(t).Seconds())
}
//fmt.Println(server.GetFileInfoFromLevelDB(s))
}
this.util.WriteFile("time.txt", time.Since(t).String())
fmt.Println(time.Since(t).String())
}
func (this *Server) RepairStatWeb(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
date string
inner string
)
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
date = r.FormValue("date")
inner = r.FormValue("inner")
if ok, err := regexp.MatchString("\\d{8}", date); err != nil || !ok {
result.Message = "invalid date"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if date == "" || len(date) != 8 {
date = this.util.GetToDay()
}
if inner != "1" {
for _, peer := range Config().Peers {
req := httplib.Post(peer + this.getRequestURI("repair_stat"))
req.Param("inner", "1")
req.Param("date", date)
if _, err := req.String(); err != nil {
log.Error(err)
}
}
}
result.Data = this.RepairStatByDate(date)
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) Stat(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
inner string
echart string
category []string
barCount []int64
barSize []int64
dataMap map[string]interface{}
)
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
r.ParseForm()
inner = r.FormValue("inner")
echart = r.FormValue("echart")
data := this.GetStat()
result.Status = "ok"
result.Data = data
if echart == "1" {
dataMap = make(map[string]interface{}, 3)
for _, v := range data {
barCount = append(barCount, v.FileCount)
barSize = append(barSize, v.TotalSize)
category = append(category, v.Date)
}
dataMap["category"] = category
dataMap["barCount"] = barCount
dataMap["barSize"] = barSize
result.Data = dataMap
}
if inner == "1" {
w.Write([]byte(this.util.JsonEncodePretty(data)))
} else {
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
}
func (this *Server) GetStat() []StatDateFileInfo {
var (
min int64
max int64
err error
i int64
rows []StatDateFileInfo
total StatDateFileInfo
)
min = 20190101
max = 20190101
for k := range this.statMap.Get() {
ks := strings.Split(k, "_")
if len(ks) == 2 {
if i, err = strconv.ParseInt(ks[0], 10, 64); err != nil {
continue
}
if i >= max {
max = i
}
if i < min {
min = i
}
}
}
for i := min; i <= max; i++ {
s := fmt.Sprintf("%d", i)
if v, ok := this.statMap.GetValue(s + "_" + CONST_STAT_FILE_TOTAL_SIZE_KEY); ok {
var info StatDateFileInfo
info.Date = s
switch v.(type) {
case int64:
info.TotalSize = v.(int64)
total.TotalSize = total.TotalSize + v.(int64)
}
if v, ok := this.statMap.GetValue(s + "_" + CONST_STAT_FILE_COUNT_KEY); ok {
switch v.(type) {
case int64:
info.FileCount = v.(int64)
total.FileCount = total.FileCount + v.(int64)
}
}
rows = append(rows, info)
}
}
total.Date = "all"
rows = append(rows, total)
return rows
}
func (this *Server) RegisterExit() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
go func() {
for s := range c {
switch s {
case syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT:
this.ldb.Close()
log.Info("Exit", s)
os.Exit(1)
}
}
}()
}
func (this *Server) AppendToQueue(fileInfo *FileInfo) {
for (len(this.queueToPeers) + CONST_QUEUE_SIZE/10) > CONST_QUEUE_SIZE {
time.Sleep(time.Millisecond * 50)
}
this.queueToPeers <- *fileInfo
}
func (this *Server) AppendToDownloadQueue(fileInfo *FileInfo) {
for (len(this.queueFromPeers) + CONST_QUEUE_SIZE/10) > CONST_QUEUE_SIZE {
time.Sleep(time.Millisecond * 50)
}
this.queueFromPeers <- *fileInfo
}
func (this *Server) ConsumerDownLoad() {
ConsumerFunc := func() {
for {
fileInfo := <-this.queueFromPeers
if len(fileInfo.Peers) <= 0 {
log.Warn("Peer is null", fileInfo)
continue
}
for _, peer := range fileInfo.Peers {
if strings.Contains(peer, "127.0.0.1") {
log.Warn("sync error with 127.0.0.1", fileInfo)
continue
}
if peer != this.host {
this.DownloadFromPeer(peer, &fileInfo)
break
}
}
}
}
for i := 0; i < Config().SyncWorker; i++ {
go ConsumerFunc()
}
}
func (this *Server) RemoveDownloading() {
RemoveDownloadFunc := func() {
for {
iter := this.ldb.NewIterator(util.BytesPrefix([]byte("downloading_")), nil)
for iter.Next() {
key := iter.Key()
keys := strings.Split(string(key), "_")
if len(keys) == 3 {
if t, err := strconv.ParseInt(keys[1], 10, 64); err == nil && time.Now().Unix()-t > 60*10 {
os.Remove(DOCKER_DIR + keys[2])
}
}
}
iter.Release()
time.Sleep(time.Minute * 3)
}
}
go RemoveDownloadFunc()
}
func (this *Server) ConsumerLog() {
go func() {
var (
fileLog *FileLog
)
for {
fileLog = <-this.queueFileLog
this.saveFileMd5Log(fileLog.FileInfo, fileLog.FileName)
}
}()
}
func (this *Server) LoadSearchDict() {
go func() {
log.Info("Load search dict ....")
f, err := os.Open(CONST_SEARCH_FILE_NAME)
if err != nil {
log.Error(err)
return
}
defer f.Close()
r := bufio.NewReader(f)
for {
line, isprefix, err := r.ReadLine()
for isprefix && err == nil {
kvs := strings.Split(string(line), "\t")
if len(kvs) == 2 {
this.searchMap.Put(kvs[0], kvs[1])
}
}
}
log.Info("finish load search dict")
}()
}
func (this *Server) SaveSearchDict() {
var (
err error
fp *os.File
searchDict map[string]interface{}
k string
v interface{}
)
this.lockMap.LockKey(CONST_SEARCH_FILE_NAME)
defer this.lockMap.UnLockKey(CONST_SEARCH_FILE_NAME)
searchDict = this.searchMap.Get()
fp, err = os.OpenFile(CONST_SEARCH_FILE_NAME, os.O_RDWR, 0755)
if err != nil {
log.Error(err)
return
}
defer fp.Close()
for k, v = range searchDict {
fp.WriteString(fmt.Sprintf("%s\t%s", k, v.(string)))
}
}
func (this *Server) ConsumerPostToPeer() {
ConsumerFunc := func() {
for {
fileInfo := <-this.queueToPeers
this.postFileToPeer(&fileInfo)
}
}
for i := 0; i < Config().SyncWorker; i++ {
go ConsumerFunc()
}
}
func (this *Server) ConsumerUpload() {
ConsumerFunc := func() {
for {
wr := <-this.queueUpload
this.upload(*wr.w, wr.r)
this.rtMap.AddCountInt64(CONST_UPLOAD_COUNTER_KEY, wr.r.ContentLength)
if v, ok := this.rtMap.GetValue(CONST_UPLOAD_COUNTER_KEY); ok {
if v.(int64) > 1*1024*1024*1024 {
var _v int64
this.rtMap.Put(CONST_UPLOAD_COUNTER_KEY, _v)
debug.FreeOSMemory()
}
}
wr.done <- true
}
}
for i := 0; i < Config().UploadWorker; i++ {
go ConsumerFunc()
}
}
func (this *Server) AutoRepair(forceRepair bool) {
if this.lockMap.IsLock("AutoRepair") {
log.Warn("Lock AutoRepair")
return
}
this.lockMap.LockKey("AutoRepair")
defer this.lockMap.UnLockKey("AutoRepair")
AutoRepairFunc := func(forceRepair bool) {
var (
dateStats []StatDateFileInfo
err error
countKey string
md5s string
localSet mapset.Set
remoteSet mapset.Set
allSet mapset.Set
tmpSet mapset.Set
fileInfo *FileInfo
)
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("AutoRepair")
log.Error(re)
log.Error(string(buffer))
}
}()
Update := func(peer string, dateStat StatDateFileInfo) {
//从远端拉数据过来
req := httplib.Get(fmt.Sprintf("%s%s?date=%s&force=%s", peer, this.getRequestURI("sync"), dateStat.Date, "1"))
req.SetTimeout(time.Second*5, time.Second*5)
if _, err = req.String(); err != nil {
log.Error(err)
}
log.Info(fmt.Sprintf("syn file from %s date %s", peer, dateStat.Date))
}
for _, peer := range Config().Peers {
req := httplib.Post(fmt.Sprintf("%s%s", peer, this.getRequestURI("stat")))
req.Param("inner", "1")
req.SetTimeout(time.Second*5, time.Second*15)
if err = req.ToJSON(&dateStats); err != nil {
log.Error(err)
continue
}
for _, dateStat := range dateStats {
if dateStat.Date == "all" {
continue
}
countKey = dateStat.Date + "_" + CONST_STAT_FILE_COUNT_KEY
if v, ok := this.statMap.GetValue(countKey); ok {
switch v.(type) {
case int64:
if v.(int64) != dateStat.FileCount || forceRepair {
//不相等,找差异
//TODO
req := httplib.Post(fmt.Sprintf("%s%s", peer, this.getRequestURI("get_md5s_by_date")))
req.SetTimeout(time.Second*15, time.Second*60)
req.Param("date", dateStat.Date)
if md5s, err = req.String(); err != nil {
continue
}
if localSet, err = this.GetMd5sByDate(dateStat.Date, CONST_FILE_Md5_FILE_NAME); err != nil {
log.Error(err)
continue
}
remoteSet = this.util.StrToMapSet(md5s, ",")
allSet = localSet.Union(remoteSet)
md5s = this.util.MapSetToStr(allSet.Difference(localSet), ",")
req = httplib.Post(fmt.Sprintf("%s%s", peer, this.getRequestURI("receive_md5s")))
req.SetTimeout(time.Second*15, time.Second*60)
req.Param("md5s", md5s)
req.String()
tmpSet = allSet.Difference(remoteSet)
for v := range tmpSet.Iter() {
if v != nil {
if fileInfo, err = this.GetFileInfoFromLevelDB(v.(string)); err != nil {
log.Error(err)
continue
}
this.AppendToQueue(fileInfo)
}
}
//Update(peer,dateStat)
}
}
} else {
Update(peer, dateStat)
}
}
}
}
AutoRepairFunc(forceRepair)
}
func (this *Server) CleanLogLevelDBByDate(date string, filename string) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("CleanLogLevelDBByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
keys mapset.Set
)
keys = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
keys.Add(string(iter.Value()))
}
iter.Release()
for key := range keys.Iter() {
err = this.RemoveKeyFromLevelDB(key.(string), this.logDB)
if err != nil {
log.Error(err)
}
}
}
func (this *Server) CleanAndBackUp() {
Clean := func() {
var (
filenames []string
yesterday string
)
if this.curDate != this.util.GetToDay() {
filenames = []string{CONST_Md5_QUEUE_FILE_NAME, CONST_Md5_ERROR_FILE_NAME, CONST_REMOME_Md5_FILE_NAME}
yesterday = this.util.GetDayFromTimeStamp(time.Now().AddDate(0, 0, -1).Unix())
for _, filename := range filenames {
this.CleanLogLevelDBByDate(yesterday, filename)
}
this.BackUpMetaDataByDate(yesterday)
this.curDate = this.util.GetToDay()
}
}
go func() {
for {
time.Sleep(time.Hour * 6)
Clean()
}
}()
}
func (this *Server) LoadFileInfoByDate(date string, filename string) (mapset.Set, error) {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("LoadFileInfoByDate")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
err error
keyPrefix string
fileInfos mapset.Set
)
fileInfos = mapset.NewSet()
keyPrefix = "%s_%s_"
keyPrefix = fmt.Sprintf(keyPrefix, date, filename)
iter := server.logDB.NewIterator(util.BytesPrefix([]byte(keyPrefix)), nil)
for iter.Next() {
var fileInfo FileInfo
if err = json.Unmarshal(iter.Value(), &fileInfo); err != nil {
continue
}
fileInfos.Add(&fileInfo)
}
iter.Release()
return fileInfos, nil
}
func (this *Server) LoadQueueSendToPeer() {
if queue, err := this.LoadFileInfoByDate(this.util.GetToDay(), CONST_Md5_QUEUE_FILE_NAME); err != nil {
log.Error(err)
} else {
for fileInfo := range queue.Iter() {
//this.queueFromPeers <- *fileInfo.(*FileInfo)
this.AppendToDownloadQueue(fileInfo.(*FileInfo))
}
}
}
func (this *Server) CheckClusterStatus() {
check := func() {
defer func() {
if re := recover(); re != nil {
buffer := debug.Stack()
log.Error("CheckClusterStatus")
log.Error(re)
log.Error(string(buffer))
}
}()
var (
status JsonResult
err error
subject string
body string
req *httplib.BeegoHTTPRequest
)
for _, peer := range Config().Peers {
req = httplib.Get(fmt.Sprintf("%s%s", peer, this.getRequestURI("status")))
req.SetTimeout(time.Second*5, time.Second*5)
err = req.ToJSON(&status)
if err != nil || status.Status != "ok" {
for _, to := range Config().AlarmReceivers {
subject = "fastdfs server error"
if err != nil {
body = fmt.Sprintf("%s\nserver:%s\nerror:\n%s", subject, peer, err.Error())
} else {
body = fmt.Sprintf("%s\nserver:%s\n", subject, peer)
}
if err = this.SendToMail(to, subject, body, "text"); err != nil {
log.Error(err)
}
}
if Config().AlarmUrl != "" {
req = httplib.Post(Config().AlarmUrl)
req.SetTimeout(time.Second*10, time.Second*10)
req.Param("message", body)
req.Param("subject", subject)
if _, err = req.String(); err != nil {
log.Error(err)
}
}
}
}
}
go func() {
for {
time.Sleep(time.Minute * 10)
check()
}
}()
}
func (this *Server) RepairFileInfo(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
)
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
if !Config().EnableMigrate {
w.Write([]byte("please set enable_migrate=true"))
return
}
result.Status = "ok"
result.Message = "repair job start,don't try again,very danger "
go this.RepairFileInfoFromFile()
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) Reload(w http.ResponseWriter, r *http.Request) {
var (
err error
data []byte
cfg GloablConfig
action string
cfgjson string
result JsonResult
)
result.Status = "fail"
r.ParseForm()
if !this.IsPeer(r) {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
return
}
cfgjson = r.FormValue("cfg")
action = r.FormValue("action")
_ = cfgjson
if action == "get" {
result.Data = Config()
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if action == "set" {
if cfgjson == "" {
result.Message = "(error)parameter cfg(json) require"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if err = json.Unmarshal([]byte(cfgjson), &cfg); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
result.Status = "ok"
cfgjson = this.util.JsonEncodePretty(cfg)
this.util.WriteFile(CONST_CONF_FILE_NAME, cfgjson)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if action == "reload" {
if data, err = ioutil.ReadFile(CONST_CONF_FILE_NAME); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if err = json.Unmarshal(data, &cfg); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
ParseConfig(CONST_CONF_FILE_NAME)
this.initComponent(true)
result.Status = "ok"
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if action == "" {
w.Write([]byte("(error)action support set(json) get reload"))
}
}
func (this *Server) RemoveEmptyDir(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
)
result.Status = "ok"
if this.IsPeer(r) {
go this.util.RemoveEmptyDir(DATA_DIR)
go this.util.RemoveEmptyDir(STORE_DIR)
result.Message = "clean job start ..,don't try again!!!"
w.Write([]byte(this.util.JsonEncodePretty(result)))
} else {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
}
func (this *Server) BackUp(w http.ResponseWriter, r *http.Request) {
var (
err error
date string
result JsonResult
inner string
url string
)
result.Status = "ok"
r.ParseForm()
date = r.FormValue("date")
inner = r.FormValue("inner")
if date == "" {
date = this.util.GetToDay()
}
if this.IsPeer(r) {
if inner != "1" {
for _, peer := range Config().Peers {
backUp := func(peer string, date string) {
url = fmt.Sprintf("%s%s", peer, this.getRequestURI("backup"))
req := httplib.Post(url)
req.Param("date", date)
req.Param("inner", "1")
req.SetTimeout(time.Second*5, time.Second*600)
if _, err = req.String(); err != nil {
log.Error(err)
}
}
go backUp(peer, date)
}
}
go this.BackUpMetaDataByDate(date)
result.Message = "back job start..."
w.Write([]byte(this.util.JsonEncodePretty(result)))
} else {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
}
// Notice: performance is poor,just for low capacity,but low memory , if you want to high performance,use searchMap for search,but memory ....
func (this *Server) Search(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
err error
kw string
count int
fileInfos []FileInfo
md5s []string
)
kw = r.FormValue("kw")
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
iter := this.ldb.NewIterator(nil, nil)
for iter.Next() {
var fileInfo FileInfo
value := iter.Value()
if err = json.Unmarshal(value, &fileInfo); err != nil {
log.Error(err)
continue
}
if strings.Contains(fileInfo.Name, kw) && !this.util.Contains(fileInfo.Md5, md5s) {
count = count + 1
fileInfos = append(fileInfos, fileInfo)
md5s = append(md5s, fileInfo.Md5)
}
if count >= 100 {
break
}
}
iter.Release()
err = iter.Error()
if err != nil {
log.Error()
}
//fileInfos=this.SearchDict(kw) // serch file from map for huge capacity
result.Status = "ok"
result.Data = fileInfos
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) SearchDict(kw string) []FileInfo {
var (
fileInfos []FileInfo
fileInfo *FileInfo
)
for dict := range this.searchMap.Iter() {
if strings.Contains(dict.Val.(string), kw) {
if fileInfo, _ = this.GetFileInfoFromLevelDB(dict.Key); fileInfo != nil {
fileInfos = append(fileInfos, *fileInfo)
}
}
}
return fileInfos
}
func (this *Server) ListDir(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
dir string
filesInfo []os.FileInfo
err error
filesResult []FileInfoResult
tmpDir string
)
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
dir = r.FormValue("dir")
//if dir == "" {
// result.Message = "dir can't null"
// w.Write([]byte(this.util.JsonEncodePretty(result)))
// return
//}
dir = strings.Replace(dir, ".", "", -1)
if tmpDir, err = os.Readlink(dir); err == nil {
dir = tmpDir
}
filesInfo, err = ioutil.ReadDir(DOCKER_DIR + STORE_DIR_NAME + "/" + dir)
if err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
for _, f := range filesInfo {
fi := FileInfoResult{
Name: f.Name(),
Size: f.Size(),
IsDir: f.IsDir(),
ModTime: f.ModTime().Unix(),
Path: dir,
Md5: this.util.MD5(strings.Replace(STORE_DIR_NAME+"/"+dir+"/"+f.Name(), "//", "/", -1)),
}
filesResult = append(filesResult, fi)
}
result.Status = "ok"
result.Data = filesResult
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
func (this *Server) VerifyGoogleCode(secret string, code string, discrepancy int64) bool {
var (
goauth *googleAuthenticator.GAuth
)
goauth = googleAuthenticator.NewGAuth()
if ok, err := goauth.VerifyCode(secret, code, discrepancy); ok {
return ok
} else {
log.Error(err)
return ok
}
}
func (this *Server) GenGoogleCode(w http.ResponseWriter, r *http.Request) {
var (
err error
result JsonResult
secret string
goauth *googleAuthenticator.GAuth
)
r.ParseForm()
goauth = googleAuthenticator.NewGAuth()
secret = r.FormValue("secret")
result.Status = "ok"
result.Message = "ok"
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
if result.Data, err = goauth.GetCode(secret); err != nil {
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) GenGoogleSecret(w http.ResponseWriter, r *http.Request) {
var (
result JsonResult
)
result.Status = "ok"
result.Message = "ok"
if !this.IsPeer(r) {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
}
GetSeed := func(length int) string {
seeds := "ABCDEFGHIJKLMNOPQRSTUVWXYZ234567"
s := ""
random.Seed(time.Now().UnixNano())
for i := 0; i < length; i++ {
s += string(seeds[random.Intn(32)])
}
return s
}
result.Data = GetSeed(16)
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
func (this *Server) Report(w http.ResponseWriter, r *http.Request) {
var (
reportFileName string
result JsonResult
html string
)
result.Status = "ok"
r.ParseForm()
if this.IsPeer(r) {
reportFileName = STATIC_DIR + "/report.html"
if this.util.IsExist(reportFileName) {
if data, err := this.util.ReadBinFile(reportFileName); err != nil {
log.Error(err)
result.Message = err.Error()
w.Write([]byte(this.util.JsonEncodePretty(result)))
return
} else {
html = string(data)
if Config().SupportGroupManage {
html = strings.Replace(html, "{group}", "/"+Config().Group, 1)
} else {
html = strings.Replace(html, "{group}", "", 1)
}
w.Write([]byte(html))
return
}
} else {
w.Write([]byte(fmt.Sprintf("%s is not found", reportFileName)))
}
} else {
w.Write([]byte(this.GetClusterNotPermitMessage(r)))
}
}
func (this *Server) Repair(w http.ResponseWriter, r *http.Request) {
var (
force string
forceRepair bool
result JsonResult
)
result.Status = "ok"
r.ParseForm()
force = r.FormValue("force")
if force == "1" {
forceRepair = true
}
if this.IsPeer(r) {
go this.AutoRepair(forceRepair)
result.Message = "repair job start..."
w.Write([]byte(this.util.JsonEncodePretty(result)))
} else {
result.Message = this.GetClusterNotPermitMessage(r)
w.Write([]byte(this.util.JsonEncodePretty(result)))
}
}
func (this *Server) Status(w http.ResponseWriter, r *http.Request) {
var (
status JsonResult
sts map[string]interface{}
today string
sumset mapset.Set
ok bool
v interface{}
err error
appDir string
diskInfo *disk.UsageStat
memInfo *mem.VirtualMemoryStat
)
memStat := new(runtime.MemStats)
runtime.ReadMemStats(memStat)
today = this.util.GetToDay()
sts = make(map[string]interface{})
sts["Fs.QueueFromPeers"] = len(this.queueFromPeers)
sts["Fs.QueueToPeers"] = len(this.queueToPeers)
sts["Fs.QueueFileLog"] = len(this.queueFileLog)
for _, k := range []string{CONST_FILE_Md5_FILE_NAME, CONST_Md5_ERROR_FILE_NAME, CONST_Md5_QUEUE_FILE_NAME} {
k2 := fmt.Sprintf("%s_%s", today, k)
if v, ok = this.sumMap.GetValue(k2); ok {
sumset = v.(mapset.Set)
if k == CONST_Md5_QUEUE_FILE_NAME {
sts["Fs.QueueSetSize"] = sumset.Cardinality()
}
if k == CONST_Md5_ERROR_FILE_NAME {
sts["Fs.ErrorSetSize"] = sumset.Cardinality()
}
if k == CONST_FILE_Md5_FILE_NAME {
sts["Fs.FileSetSize"] = sumset.Cardinality()
}
}
}
sts["Fs.AutoRepair"] = Config().AutoRepair
sts["Fs.QueueUpload"] = len(this.queueUpload)
sts["Fs.RefreshInterval"] = Config().RefreshInterval
sts["Fs.Peers"] = Config().Peers
sts["Fs.Local"] = this.host
sts["Fs.FileStats"] = this.GetStat()
sts["Fs.ShowDir"] = Config().ShowDir
sts["Sys.NumGoroutine"] = runtime.NumGoroutine()
sts["Sys.NumCpu"] = runtime.NumCPU()
sts["Sys.Alloc"] = memStat.Alloc
sts["Sys.TotalAlloc"] = memStat.TotalAlloc
sts["Sys.HeapAlloc"] = memStat.HeapAlloc
sts["Sys.Frees"] = memStat.Frees
sts["Sys.HeapObjects"] = memStat.HeapObjects
sts["Sys.NumGC"] = memStat.NumGC
sts["Sys.GCCPUFraction"] = memStat.GCCPUFraction
sts["Sys.GCSys"] = memStat.GCSys
//sts["Sys.MemInfo"] = memStat
appDir, err = filepath.Abs(".")
if err != nil {
log.Error(err)
}
diskInfo, err = disk.Usage(appDir)
if err != nil {
log.Error(err)
}
sts["Sys.DiskInfo"] = diskInfo
memInfo, err = mem.VirtualMemory()
if err != nil {
log.Error(err)
}
sts["Sys.MemInfo"] = memInfo
status.Status = "ok"
status.Data = sts
w.Write([]byte(this.util.JsonEncodePretty(status)))
}
func (this *Server) HeartBeat(w http.ResponseWriter, r *http.Request) {
}
func (this *Server) Index(w http.ResponseWriter, r *http.Request) {
var (
uploadUrl string
uploadBigUrl string
uppy string
)
uploadUrl = "/upload"
uploadBigUrl = CONST_BIG_UPLOAD_PATH_SUFFIX
if Config().EnableWebUpload {
if Config().SupportGroupManage {
uploadUrl = fmt.Sprintf("/%s/upload", Config().Group)
uploadBigUrl = fmt.Sprintf("/%s%s", Config().Group, CONST_BIG_UPLOAD_PATH_SUFFIX)
}
uppy = `<html>
<head>
<meta charset="utf-8" />
<title>go-fastdfs</title>
<style>form { bargin } .form-line { display:block;height: 30px;margin:8px; } #stdUpload {background: #fafafa;border-radius: 10px;width: 745px; }</style>
<link href="https://transloadit.edgly.net/releases/uppy/v0.30.0/dist/uppy.min.css" rel="stylesheet"></head>
<body>
<div>标准上传(强列建议使用这种方式)</div>
<div id="stdUpload">
<form action="%s" method="post" enctype="multipart/form-data">
<span class="form-line">文件(file):
<input type="file" id="file" name="file" /></span>
<span class="form-line">场景(scene):
<input type="text" id="scene" name="scene" value="%s" /></span>
<span class="form-line">文件名(filename):
<input type="text" id="filename" name="filename" value="" /></span>
<span class="form-line">输出(output):
<input type="text" id="output" name="output" value="json2" title="json|text|json2" /></span>
<span class="form-line">自定义路径(path):
<input type="text" id="path" name="path" value="" /></span>
<span class="form-line">google认证码(code):
<input type="text" id="code" name="code" value="" /></span>
<span class="form-line">自定义认证(auth_token):
<input type="text" id="auth_token" name="auth_token" value="" /></span>
<input type="submit" name="submit" value="upload" />
</form>
</div>
<div>断点续传(如果文件很大时可以考虑)</div>
<div>
<div id="drag-drop-area"></div>
<script src="https://transloadit.edgly.net/releases/uppy/v0.30.0/dist/uppy.min.js"></script>
<script>var uppy = Uppy.Core().use(Uppy.Dashboard, {
inline: true,
target: '#drag-drop-area'
}).use(Uppy.Tus, {
endpoint: '%s'
})
uppy.on('complete', (result) => {
// console.log(result) console.log('Upload complete! Weve uploaded these files:', result.successful)
})
//uppy.setMeta({ auth_token: '9ee60e59-cb0f-4578-aaba-29b9fc2919ca',callback_url:'http://127.0.0.1/callback' ,filename:'自定义文件名','path':'自定义path',scene:'自定义场景' })//这里是传递上传的认证参数,callback_url参数中 id为文件的ID,info 文转的基本信息json
uppy.setMeta({ auth_token: '9ee60e59-cb0f-4578-aaba-29b9fc2919ca',callback_url:'http://127.0.0.1/callback'})//自定义参数与普通上传类似(虽然支持自定义,建议不要自定义,海量文件情况下,自定义很可能给自已给埋坑)
</script>
</div>
</body>
</html>`
uppyFileName := STATIC_DIR + "/uppy.html"
if this.util.IsExist(uppyFileName) {
if data, err := this.util.ReadBinFile(uppyFileName); err != nil {
log.Error(err)
} else {
uppy = string(data)
}
} else {
this.util.WriteFile(uppyFileName, uppy)
}
fmt.Fprintf(w,
fmt.Sprintf(uppy, uploadUrl, Config().DefaultScene, uploadBigUrl))
} else {
w.Write([]byte("web upload deny"))
}
}
func init() {
flag.Parse()
if *v {
fmt.Printf("%s\n%s\n%s\n%s\n", VERSION, BUILD_TIME, GO_VERSION, GIT_VERSION)
os.Exit(0)
}
appDir, e1 := filepath.Abs(filepath.Dir(os.Args[0]))
curDir, e2 := filepath.Abs(".")
if e1 == nil && e2 == nil && appDir != curDir && !strings.Contains(appDir, "go-build") {
msg := fmt.Sprintf("please change directory to '%s' start fileserver\n", appDir)
msg = msg + fmt.Sprintf("请切换到 '%s' 目录启动 fileserver ", appDir)
log.Warn(msg)
fmt.Println(msg)
os.Exit(1)
}
DOCKER_DIR = os.Getenv("GO_FASTDFS_DIR")
if DOCKER_DIR != "" {
if !strings.HasSuffix(DOCKER_DIR, "/") {
DOCKER_DIR = DOCKER_DIR + "/"
}
}
STORE_DIR = DOCKER_DIR + STORE_DIR_NAME
CONF_DIR = DOCKER_DIR + CONF_DIR_NAME
DATA_DIR = DOCKER_DIR + DATA_DIR_NAME
LOG_DIR = DOCKER_DIR + LOG_DIR_NAME
STATIC_DIR = DOCKER_DIR + STATIC_DIR_NAME
LARGE_DIR_NAME = "haystack"
LARGE_DIR = STORE_DIR + "/haystack"
CONST_LEVELDB_FILE_NAME = DATA_DIR + "/fileserver.db"
CONST_LOG_LEVELDB_FILE_NAME = DATA_DIR + "/log.db"
CONST_STAT_FILE_NAME = DATA_DIR + "/stat.json"
CONST_CONF_FILE_NAME = CONF_DIR + "/cfg.json"
CONST_SERVER_CRT_FILE_NAME = CONF_DIR + "/server.crt"
CONST_SERVER_KEY_FILE_NAME = CONF_DIR + "/server.key"
CONST_SEARCH_FILE_NAME = DATA_DIR + "/search.txt"
FOLDERS = []string{DATA_DIR, STORE_DIR, CONF_DIR, STATIC_DIR}
logAccessConfigStr = strings.Replace(logAccessConfigStr, "{DOCKER_DIR}", DOCKER_DIR, -1)
logConfigStr = strings.Replace(logConfigStr, "{DOCKER_DIR}", DOCKER_DIR, -1)
for _, folder := range FOLDERS {
os.MkdirAll(folder, 0775)
}
server = NewServer()
peerId := fmt.Sprintf("%d", server.util.RandInt(0, 9))
if !server.util.FileExists(CONST_CONF_FILE_NAME) {
var ip string
if ip = os.Getenv("GO_FASTDFS_IP"); ip == "" {
ip = server.util.GetPulicIP()
}
peer := "http://" + ip + ":8080"
cfg := fmt.Sprintf(cfgJson, peerId, peer, peer)
server.util.WriteFile(CONST_CONF_FILE_NAME, cfg)
}
if logger, err := log.LoggerFromConfigAsBytes([]byte(logConfigStr)); err != nil {
panic(err)
} else {
log.ReplaceLogger(logger)
}
if _logacc, err := log.LoggerFromConfigAsBytes([]byte(logAccessConfigStr)); err == nil {
logacc = _logacc
log.Info("succes init log access")
} else {
log.Error(err.Error())
}
ParseConfig(CONST_CONF_FILE_NAME)
if Config().QueueSize == 0 {
Config().QueueSize = CONST_QUEUE_SIZE
}
if Config().PeerId == "" {
Config().PeerId = peerId
}
if Config().SupportGroupManage {
staticHandler = http.StripPrefix("/"+Config().Group+"/", http.FileServer(http.Dir(STORE_DIR)))
} else {
staticHandler = http.StripPrefix("/", http.FileServer(http.Dir(STORE_DIR)))
}
server.initComponent(false)
}
func (this *Server) test() {
testLock := func() {
wg := sync.WaitGroup{}
tt := func(i int, wg *sync.WaitGroup) {
//if server.lockMap.IsLock("xx") {
// return
//}
//fmt.Println("timeer len",len(server.lockMap.Get()))
//time.Sleep(time.Nanosecond*10)
server.lockMap.LockKey("xx")
defer server.lockMap.UnLockKey("xx")
//time.Sleep(time.Nanosecond*1)
//fmt.Println("xx", i)
wg.Done()
}
go func() {
for {
time.Sleep(time.Second * 1)
fmt.Println("timeer len", len(server.lockMap.Get()), server.lockMap.Get())
}
}()
fmt.Println(len(server.lockMap.Get()))
for i := 0; i < 10000; i++ {
wg.Add(1)
go tt(i, &wg)
}
fmt.Println(len(server.lockMap.Get()))
fmt.Println(len(server.lockMap.Get()))
server.lockMap.LockKey("abc")
fmt.Println("lock")
time.Sleep(time.Second * 5)
server.lockMap.UnLockKey("abc")
server.lockMap.LockKey("abc")
server.lockMap.UnLockKey("abc")
}
_ = testLock
testFile := func() {
var (
err error
f *os.File
)
f, err = os.OpenFile("tt", os.O_CREATE|os.O_RDWR, 0777)
if err != nil {
fmt.Println(err)
}
f.WriteAt([]byte("1"), 100)
f.Seek(0, 2)
f.Write([]byte("2"))
//fmt.Println(f.Seek(0, 2))
//fmt.Println(f.Seek(3, 2))
//fmt.Println(f.Seek(3, 0))
//fmt.Println(f.Seek(3, 1))
//fmt.Println(f.Seek(3, 0))
//f.Write([]byte("1"))
}
_ = testFile
//testFile()
//testLock()
}
type hookDataStore struct {
tusd.DataStore
}
type httpError struct {
error
statusCode int
}
func (err httpError) StatusCode() int {
return err.statusCode
}
func (err httpError) Body() []byte {
return []byte(err.Error())
}
func (store hookDataStore) NewUpload(info tusd.FileInfo) (id string, err error) {
var (
jsonResult JsonResult
)
if Config().AuthUrl != "" {
if auth_token, ok := info.MetaData["auth_token"]; !ok {
msg := "token auth fail,auth_token is not in http header Upload-Metadata," +
"in uppy uppy.setMeta({ auth_token: '9ee60e59-cb0f-4578-aaba-29b9fc2919ca' })"
log.Error(msg, fmt.Sprintf("current header:%v", info.MetaData))
return "", httpError{error: errors.New(msg), statusCode: 401}
} else {
req := httplib.Post(Config().AuthUrl)
req.Param("auth_token", auth_token)
req.SetTimeout(time.Second*5, time.Second*10)
content, err := req.String()
content = strings.TrimSpace(content)
if strings.HasPrefix(content, "{") && strings.HasSuffix(content, "}") {
if err = json.Unmarshal([]byte(content), &jsonResult); err != nil {
log.Error(err)
return "", httpError{error: errors.New(err.Error() + content), statusCode: 401}
}
if jsonResult.Data != "ok" {
return "", httpError{error: errors.New(content), statusCode: 401}
}
} else {
if err != nil {
log.Error(err)
return "", err
}
if strings.TrimSpace(content) != "ok" {
return "", httpError{error: errors.New(content), statusCode: 401}
}
}
}
}
return store.DataStore.NewUpload(info)
}
func (this *Server) initTus() {
var (
err error
fileLog *os.File
bigDir string
)
BIG_DIR := STORE_DIR + "/_big/" + Config().PeerId
os.MkdirAll(BIG_DIR, 0775)
os.MkdirAll(LOG_DIR, 0775)
store := filestore.FileStore{
Path: BIG_DIR,
}
if fileLog, err = os.OpenFile(LOG_DIR+"/tusd.log", os.O_CREATE|os.O_RDWR, 0666); err != nil {
log.Error(err)
panic("initTus")
}
go func() {
for {
if fi, err := fileLog.Stat(); err != nil {
log.Error(err)
} else {
if fi.Size() > 1024*1024*500 {
//500M
this.util.CopyFile(LOG_DIR+"/tusd.log", LOG_DIR+"/tusd.log.2")
fileLog.Seek(0, 0)
fileLog.Truncate(0)
fileLog.Seek(0, 2)
}
}
time.Sleep(time.Second * 30)
}
}()
l := slog.New(fileLog, "[tusd] ", slog.LstdFlags)
bigDir = CONST_BIG_UPLOAD_PATH_SUFFIX
if Config().SupportGroupManage {
bigDir = fmt.Sprintf("/%s%s", Config().Group, CONST_BIG_UPLOAD_PATH_SUFFIX)
}
composer := tusd.NewStoreComposer()
// support raw tus upload and download
store.GetReaderExt = func(id string) (io.Reader, error) {
var (
offset int64
err error
length int
buffer []byte
fi *FileInfo
fn string
)
if fi, err = this.GetFileInfoFromLevelDB(id); err != nil {
log.Error(err)
return nil, err
} else {
if Config().AuthUrl != "" {
fileResult := this.util.JsonEncodePretty(this.BuildFileResult(fi, nil))
bufferReader := bytes.NewBuffer([]byte(fileResult))
return bufferReader, nil
}
fn = fi.Name
if fi.ReName != "" {
fn = fi.ReName
}
fp := DOCKER_DIR + fi.Path + "/" + fn
if this.util.FileExists(fp) {
log.Info(fmt.Sprintf("download:%s", fp))
return os.Open(fp)
}
ps := strings.Split(fp, ",")
if len(ps) > 2 && this.util.FileExists(ps[0]) {
if length, err = strconv.Atoi(ps[2]); err != nil {
return nil, err
}
if offset, err = strconv.ParseInt(ps[1], 10, 64); err != nil {
return nil, err
}
if buffer, err = this.util.ReadFileByOffSet(ps[0], offset, length); err != nil {
return nil, err
}
if buffer[0] == '1' {
bufferReader := bytes.NewBuffer(buffer[1:])
return bufferReader, nil
} else {
msg := "data no sync"
log.Error(msg)
return nil, errors.New(msg)
}
}
return nil, errors.New(fmt.Sprintf("%s not found", fp))
}
}
store.UseIn(composer)
SetupPreHooks := func(composer *tusd.StoreComposer) {
composer.UseCore(hookDataStore{
DataStore: composer.Core,
})
}
SetupPreHooks(composer)
handler, err := tusd.NewHandler(tusd.Config{
Logger: l,
BasePath: bigDir,
StoreComposer: composer,
NotifyCompleteUploads: true,
RespectForwardedHeaders: true,
})
notify := func(handler *tusd.Handler) {
for {
select {
case info := <-handler.CompleteUploads:
log.Info("CompleteUploads", info)
name := ""
pathCustom := ""
scene := Config().DefaultScene
if v, ok := info.MetaData["filename"]; ok {
name = v
}
if v, ok := info.MetaData["scene"]; ok {
scene = v
}
if v, ok := info.MetaData["path"]; ok {
pathCustom = v
}
var err error
md5sum := ""
oldFullPath := BIG_DIR + "/" + info.ID + ".bin"
infoFullPath := BIG_DIR + "/" + info.ID + ".info"
if md5sum, err = this.util.GetFileSumByName(oldFullPath, Config().FileSumArithmetic); err != nil {
log.Error(err)
continue
}
ext := path.Ext(name)
filename := md5sum + ext
if name != "" {
filename = name
}
if Config().RenameFile {
filename = md5sum + ext
}
timeStamp := time.Now().Unix()
fpath := time.Now().Format("/20060102/15/04/")
if pathCustom != "" {
fpath = "/" + strings.Replace(pathCustom, ".", "", -1) + "/"
}
newFullPath := STORE_DIR + "/" + scene + fpath + Config().PeerId + "/" + filename
if pathCustom != "" {
newFullPath = STORE_DIR + "/" + scene + fpath + filename
}
if fi, err := this.GetFileInfoFromLevelDB(md5sum); err != nil {
log.Error(err)
} else {
tpath := this.GetFilePathByInfo(fi, true)
if fi.Md5 != "" && this.util.FileExists(tpath) {
if _, err := this.SaveFileInfoToLevelDB(info.ID, fi, this.ldb); err != nil {
log.Error(err)
}
log.Info(fmt.Sprintf("file is found md5:%s", fi.Md5))
log.Info("remove file:", oldFullPath)
log.Info("remove file:", infoFullPath)
os.Remove(oldFullPath)
os.Remove(infoFullPath)
continue
}
}
fpath2 := ""
fpath2 = STORE_DIR_NAME + "/" + Config().DefaultScene + fpath + Config().PeerId
if pathCustom != "" {
fpath2 = STORE_DIR_NAME + "/" + Config().DefaultScene + fpath
fpath2 = strings.TrimRight(fpath2, "/")
}
os.MkdirAll(DOCKER_DIR+fpath2, 0775)
fileInfo := &FileInfo{
Name: name,
Path: fpath2,
ReName: filename,
Size: info.Size,
TimeStamp: timeStamp,
Md5: md5sum,
Peers: []string{this.host},
OffSet: -1,
}
if err = os.Rename(oldFullPath, newFullPath); err != nil {
log.Error(err)
continue
}
log.Info(fileInfo)
os.Remove(infoFullPath)
if _, err = this.SaveFileInfoToLevelDB(info.ID, fileInfo, this.ldb); err != nil {
//assosiate file id
log.Error(err)
}
this.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
go this.postFileToPeer(fileInfo)
callBack := func(info tusd.FileInfo, fileInfo *FileInfo) {
if callback_url, ok := info.MetaData["callback_url"]; ok {
req := httplib.Post(callback_url)
req.SetTimeout(time.Second*10, time.Second*10)
req.Param("info", server.util.JsonEncodePretty(fileInfo))
req.Param("id", info.ID)
if _, err := req.String(); err != nil {
log.Error(err)
}
}
}
go callBack(info, fileInfo)
}
}
}
go notify(handler)
if err != nil {
log.Error(err)
}
http.Handle(bigDir, http.StripPrefix(bigDir, handler))
}
func (this *Server) FormatStatInfo() {
var (
data []byte
err error
count int64
stat map[string]interface{}
)
if this.util.FileExists(CONST_STAT_FILE_NAME) {
if data, err = this.util.ReadBinFile(CONST_STAT_FILE_NAME); err != nil {
log.Error(err)
} else {
if err = json.Unmarshal(data, &stat); err != nil {
log.Error(err)
} else {
for k, v := range stat {
switch v.(type) {
case float64:
vv := strings.Split(fmt.Sprintf("%f", v), ".")[0]
if count, err = strconv.ParseInt(vv, 10, 64); err != nil {
log.Error(err)
} else {
this.statMap.Put(k, count)
}
default:
this.statMap.Put(k, v)
}
}
}
}
} else {
this.RepairStatByDate(this.util.GetToDay())
}
}
func (this *Server) initRedis() *redis.Pool {
pool := &redis.Pool{
MaxIdle: Config().Redis.MaxIdle,
MaxActive: Config().Redis.MaxActive,
IdleTimeout: time.Duration(Config().Redis.IdleTimeout) * time.Second,
Wait: true,
Dial: func() (redis.Conn, error) {
conn, err := redis.Dial("tcp", Config().Redis.Address,
redis.DialConnectTimeout(time.Duration(Config().Redis.ConnectTimeout)*time.Second),
redis.DialPassword(Config().Redis.Pwd),
redis.DialDatabase(Config().Redis.DB),
)
if err != nil {
fmt.Println(err)
log.Error(err)
}
return conn, err
},
TestOnBorrow: func(c redis.Conn, t time.Time) error {
_, err := c.Do("ping")
if err != nil {
log.Error(err)
return err
}
return err
},
}
return pool
}
func (this *Server) Routing(w http.ResponseWriter, r *http.Request) {
realPath := r.URL.Path
for re, service := range this.routers {
if m := re.FindStringSubmatch(realPath); m != nil {
//p := re.String()
//if strings.Index(p, STATIC_DIR_NAME) != -1 {
// p = p[0 : strings.LastIndex(p, "/")+1]
// p = strings.Replace(realPath, p, "", 1) //realPath /group1/static/uppy.html
// if b, e := Asset(p); e == nil { // p uppy.html
// w.Write(b)
// return
// }
//}
service(w, r)
return
}
}
this.Download(w, r)
}
func (this *Server) RegisterRouting(pattern *regexp.Regexp, handler func(http.ResponseWriter, *http.Request)) {
this.routers[pattern] = handler
}
func (this *Server) initComponent(isReload bool) {
var (
ip string
)
if ip = os.Getenv("GO_FASTDFS_IP"); ip == "" {
ip = this.util.GetPulicIP()
}
if Config().Host == "" {
if len(strings.Split(Config().Addr, ":")) == 2 {
server.host = fmt.Sprintf("http://%s:%s", ip, strings.Split(Config().Addr, ":")[1])
Config().Host = server.host
}
} else {
if strings.HasPrefix(Config().Host, "http") {
server.host = Config().Host
} else {
server.host = "http://" + Config().Host
}
}
ex, _ := regexp.Compile("\\d+\\.\\d+\\.\\d+\\.\\d+")
var peers []string
for _, peer := range Config().Peers {
if this.util.Contains(ip, ex.FindAllString(peer, -1)) ||
this.util.Contains("127.0.0.1", ex.FindAllString(peer, -1)) {
continue
}
if strings.HasPrefix(peer, "http") {
peers = append(peers, peer)
} else {
peers = append(peers, "http://"+peer)
}
}
Config().Peers = peers
if !isReload {
this.FormatStatInfo()
if Config().EnableTus {
this.initTus()
}
}
for _, s := range Config().Scenes {
kv := strings.Split(s, ":")
if len(kv) == 2 {
this.sceneMap.Put(kv[0], kv[1])
}
}
if Config().EnableRedis {
this.rp = this.initRedis()
}
if Config().ReadTimeout == 0 {
Config().ReadTimeout = 60 * 10
}
if Config().WriteTimeout == 0 {
Config().WriteTimeout = 60 * 10
}
if Config().SyncWorker == 0 {
Config().SyncWorker = 200
}
if Config().UploadWorker == 0 {
Config().UploadWorker = runtime.NumCPU() + 4
if runtime.NumCPU() < 4 {
Config().UploadWorker = 8
}
}
if Config().UploadQueueSize == 0 {
Config().UploadQueueSize = 200
}
if Config().RetryCount == 0 {
Config().RetryCount = 3
}
if Config().SyncDelay == 0 {
Config().SyncDelay = 60
}
if Config().WatchChanSize == 0 {
Config().WatchChanSize = 100000
}
}
type HttpHandler struct {
}
func (HttpHandler) ServeHTTP(res http.ResponseWriter, req *http.Request) {
status_code := "200"
defer func(t time.Time) {
logStr := fmt.Sprintf("[Access] %s | %s | %s | %s | %s |%s",
time.Now().Format("2006/01/02 - 15:04:05"),
//res.Header(),
time.Since(t).String(),
server.util.GetClientIp(req),
req.Method,
status_code,
req.RequestURI,
)
logacc.Info(logStr)
}(time.Now())
defer func() {
if err := recover(); err != nil {
status_code = "500"
res.WriteHeader(500)
print(err)
buff := debug.Stack()
log.Error(err)
log.Error(string(buff))
}
}()
if Config().EnableCrossOrigin {
server.CrossOrigin(res, req)
}
http.DefaultServeMux.ServeHTTP(res, req)
}
func (this *Server) Start() {
go func() {
for {
this.CheckFileAndSendToPeer(this.util.GetToDay(), CONST_Md5_ERROR_FILE_NAME, false)
//fmt.Println("CheckFileAndSendToPeer")
time.Sleep(time.Second * time.Duration(Config().RefreshInterval))
//this.util.RemoveEmptyDir(STORE_DIR)
}
}()
go this.CleanAndBackUp()
go this.CheckClusterStatus()
go this.LoadQueueSendToPeer()
go this.ConsumerPostToPeer()
go this.ConsumerLog()
go this.ConsumerDownLoad()
go this.ConsumerUpload()
go this.RemoveDownloading()
if Config().EnableFsnotify {
go this.WatchFilesChange()
}
//go this.LoadSearchDict()
if Config().EnableMigrate {
go this.RepairFileInfoFromFile()
}
if Config().AutoRepair {
go func() {
for {
time.Sleep(time.Minute * 3)
this.AutoRepair(false)
time.Sleep(time.Minute * 60)
}
}()
}
groupRoute := ""
if Config().SupportGroupManage {
groupRoute = "/" + Config().Group
}
go func() { // force free memory
for {
time.Sleep(time.Minute * 1)
debug.FreeOSMemory()
}
}()
uploadPage := "upload.html"
if groupRoute == "" {
http.HandleFunc(fmt.Sprintf("/%s", uploadPage), this.Index)
} else {
http.HandleFunc(fmt.Sprintf("%s", groupRoute), this.Download)
http.HandleFunc(fmt.Sprintf("%s/%s", groupRoute, uploadPage), this.Index)
}
http.HandleFunc(fmt.Sprintf("%s/check_files_exist", groupRoute), this.CheckFilesExist)
http.HandleFunc(fmt.Sprintf("%s/check_file_exist", groupRoute), this.CheckFileExist)
http.HandleFunc(fmt.Sprintf("%s/upload", groupRoute), this.Upload)
http.HandleFunc(fmt.Sprintf("%s/delete", groupRoute), this.RemoveFile)
http.HandleFunc(fmt.Sprintf("%s/get_file_info", groupRoute), this.GetFileInfo)
http.HandleFunc(fmt.Sprintf("%s/sync", groupRoute), this.Sync)
http.HandleFunc(fmt.Sprintf("%s/stat", groupRoute), this.Stat)
http.HandleFunc(fmt.Sprintf("%s/repair_stat", groupRoute), this.RepairStatWeb)
http.HandleFunc(fmt.Sprintf("%s/status", groupRoute), this.Status)
http.HandleFunc(fmt.Sprintf("%s/repair", groupRoute), this.Repair)
http.HandleFunc(fmt.Sprintf("%s/report", groupRoute), this.Report)
http.HandleFunc(fmt.Sprintf("%s/backup", groupRoute), this.BackUp)
http.HandleFunc(fmt.Sprintf("%s/search", groupRoute), this.Search)
http.HandleFunc(fmt.Sprintf("%s/list_dir", groupRoute), this.ListDir)
http.HandleFunc(fmt.Sprintf("%s/remove_empty_dir", groupRoute), this.RemoveEmptyDir)
http.HandleFunc(fmt.Sprintf("%s/repair_fileinfo", groupRoute), this.RepairFileInfo)
http.HandleFunc(fmt.Sprintf("%s/reload", groupRoute), this.Reload)
http.HandleFunc(fmt.Sprintf("%s/syncfile_info", groupRoute), this.SyncFileInfo)
http.HandleFunc(fmt.Sprintf("%s/get_md5s_by_date", groupRoute), this.GetMd5sForWeb)
http.HandleFunc(fmt.Sprintf("%s/receive_md5s", groupRoute), this.ReceiveMd5s)
http.HandleFunc(fmt.Sprintf("%s/gen_google_secret", groupRoute), this.GenGoogleSecret)
http.HandleFunc(fmt.Sprintf("%s/gen_google_code", groupRoute), this.GenGoogleCode)
http.HandleFunc(fmt.Sprintf("%s", "/"), this.Routing) //default handler
http.Handle(fmt.Sprintf("%s/%s/", groupRoute, STATIC_DIR_NAME), http.StripPrefix(fmt.Sprintf("%s/%s/", groupRoute, STATIC_DIR_NAME), http.FileServer(http.Dir(STATIC_DIR_NAME))))
//server.RegisterRouting(regexp.MustCompile(fmt.Sprintf("%s/%s/.*", groupRoute, STATIC_DIR_NAME)), http.FileServer(http.Dir(STATIC_DIR_NAME)).ServeHTTP)
http.HandleFunc("/"+Config().Group+"/", this.Download)
fmt.Println("Listen on " + Config().Addr)
if Config().EnableHttps {
err := http.ListenAndServeTLS(Config().Addr, CONST_SERVER_CRT_FILE_NAME, CONST_SERVER_KEY_FILE_NAME, new(HttpHandler))
log.Error(err)
fmt.Println(err)
} else {
srv := &http.Server{
Addr: Config().Addr,
Handler: new(HttpHandler),
ReadTimeout: time.Duration(Config().ReadTimeout) * time.Second,
ReadHeaderTimeout: time.Duration(Config().ReadHeaderTimeout) * time.Second,
WriteTimeout: time.Duration(Config().WriteTimeout) * time.Second,
IdleTimeout: time.Duration(Config().IdleTimeout) * time.Second,
}
err := srv.ListenAndServe()
log.Error(err)
fmt.Println(err)
}
}
func main() {
server.Start()
}