// go-fastdfs/server/init.go

package server

import (
    "bytes"
    "errors"
    "fmt"
    "io"
    slog "log"
    "net/http"
    "os"
    "path"
    "regexp"
    "runtime"
    "strconv"
    "strings"
    "time"

    "github.com/astaxie/beego/httplib"
    log "github.com/sjqzhang/seelog"
    "github.com/sjqzhang/tusd"
    "github.com/sjqzhang/tusd/filestore"
)

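// initTus wires up the tus resumable-upload endpoint: it prepares the
// per-peer _big upload directory, keeps the tusd log from growing without
// bound, lets finished uploads be read back through GetReaderExt, and moves
// each completed upload into the regular file store.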
func (c *Server) initTus() {
    var (
        err     error
        fileLog *os.File
        bigDir  string
    )
    BIG_DIR := STORE_DIR + "/_big/" + Config().PeerId
    os.MkdirAll(BIG_DIR, 0775)
    os.MkdirAll(LOG_DIR, 0775)
    store := filestore.FileStore{
        Path: BIG_DIR,
    }
    if fileLog, err = os.OpenFile(LOG_DIR+"/tusd.log", os.O_CREATE|os.O_RDWR, 0666); err != nil {
        log.Error(err)
        panic("initTus")
    }
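    // Rotate tusd.log in the background: once it passes 500 MB, copy it to
    // tusd.log.2 and truncate the live file in place.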
    go func() {
        for {
            if fi, err := fileLog.Stat(); err != nil {
                log.Error(err)
            } else {
                if fi.Size() > 1024*1024*500 {
                    // 500M
                    c.util.CopyFile(LOG_DIR+"/tusd.log", LOG_DIR+"/tusd.log.2")
                    fileLog.Seek(0, 0)
                    fileLog.Truncate(0)
                    fileLog.Seek(0, 2)
                }
            }
            time.Sleep(time.Second * 30)
        }
    }()
    l := slog.New(fileLog, "[tusd] ", slog.LstdFlags)
    bigDir = CONST_BIG_UPLOAD_PATH_SUFFIX
    if Config().SupportGroupManage {
        bigDir = fmt.Sprintf("/%s%s", Config().Group, CONST_BIG_UPLOAD_PATH_SUFFIX)
    }
    composer := tusd.NewStoreComposer()
    composer.UsesTerminater = true
    // support raw tus upload and download
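    // GetReaderExt maps a tus upload id back to the stored file: it looks the
    // id up in LevelDB, then either streams the file from disk or, for files
    // packed into a block file, reads the slice at the recorded offset.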
    store.GetReaderExt = func(id string) (io.Reader, error) {
        var (
            offset int64
            err    error
            length int
            buffer []byte
            fi     *FileInfo
            fn     string
        )
        if fi, err = c.GetFileInfoFromLevelDB(id); err != nil {
            log.Error(err)
            return nil, err
        } else {
            if Config().AuthUrl != "" {
                // with an auth url configured, return the file meta as JSON
                // instead of the raw bytes
                fileResult := c.util.JsonEncodePretty(c.BuildFileResult(fi, nil))
                bufferReader := bytes.NewBuffer([]byte(fileResult))
                return bufferReader, nil
            }
            fn = fi.Name
            if fi.ReName != "" {
                fn = fi.ReName
            }
            fp := DOCKER_DIR + fi.Path + "/" + fn
            if c.util.FileExists(fp) {
                log.Info(fmt.Sprintf("download:%s", fp))
                return os.Open(fp)
            }
            // a path of the form "blockfile,offset,length" points into a
            // merged block file; read the record at that offset
            ps := strings.Split(fp, ",")
            if len(ps) > 2 && c.util.FileExists(ps[0]) {
                if length, err = strconv.Atoi(ps[2]); err != nil {
                    return nil, err
                }
                if offset, err = strconv.ParseInt(ps[1], 10, 64); err != nil {
                    return nil, err
                }
                if buffer, err = c.util.ReadFileByOffSet(ps[0], offset, length); err != nil {
                    return nil, err
                }
                // a leading '1' marks a record that has been fully synced
                if buffer[0] == '1' {
                    bufferReader := bytes.NewBuffer(buffer[1:])
                    return bufferReader, nil
                } else {
                    msg := "data no sync"
                    log.Error(msg)
                    return nil, errors.New(msg)
                }
            }
            return nil, fmt.Errorf("%s not found", fp)
        }
    }
    store.UseIn(composer)
    SetupPreHooks := func(composer *tusd.StoreComposer) {
        composer.UseCore(hookDataStore{
            DataStore: composer.Core,
        })
    }
    SetupPreHooks(composer)
    h, err := tusd.NewHandler(tusd.Config{
        Logger:                  l,
        BasePath:                bigDir,
        StoreComposer:           composer,
        NotifyCompleteUploads:   true,
        RespectForwardedHeaders: true,
    })
    if err != nil {
        log.Error(err)
        panic("initTus")
    }
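    // notify drains h.CompleteUploads: for every finished upload it computes
    // the file checksum, deduplicates against LevelDB, moves the chunk into
    // the store, records the FileInfo, and fans the result out to peers and
    // to an optional callback_url.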
    notify := func(h *tusd.Handler) {
        for {
            select {
            case info := <-h.CompleteUploads:
                // callBack posts the stored FileInfo to the callback_url
                // given in the upload metadata, if any
                callBack := func(info tusd.FileInfo, fileInfo *FileInfo) {
                    if callback_url, ok := info.MetaData["callback_url"]; ok {
                        req := httplib.Post(callback_url)
                        req.SetTimeout(time.Second*10, time.Second*10)
                        req.Param("info", server.util.JsonEncodePretty(fileInfo))
                        req.Param("id", info.ID)
                        if _, err := req.String(); err != nil {
                            log.Error(err)
                        }
                    }
                }
                log.Info("CompleteUploads", info)
                name := ""
                pathCustom := ""
                scene := Config().DefaultScene
                if v, ok := info.MetaData["filename"]; ok {
                    name = v
                }
                if v, ok := info.MetaData["scene"]; ok {
                    scene = v
                }
                if v, ok := info.MetaData["path"]; ok {
                    pathCustom = v
                }
                var err error
                md5sum := ""
                oldFullPath := BIG_DIR + "/" + info.ID + ".bin"
                infoFullPath := BIG_DIR + "/" + info.ID + ".info"
                if md5sum, err = c.util.GetFileSumByName(oldFullPath, Config().FileSumArithmetic); err != nil {
                    log.Error(err)
                    continue
                }
                ext := path.Ext(name)
                filename := md5sum + ext
                if name != "" {
                    filename = name
                }
                if Config().RenameFile {
                    filename = md5sum + ext
                }
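                // files land under scene/date-path (or the caller-supplied
                // custom path) and, for the default layout, the peer id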
                timeStamp := time.Now().Unix()
                fpath := time.Now().Format("/20060102/15/04/")
                if pathCustom != "" {
                    fpath = "/" + strings.Replace(pathCustom, ".", "", -1) + "/"
                }
                newFullPath := STORE_DIR + "/" + scene + fpath + Config().PeerId + "/" + filename
                if pathCustom != "" {
                    newFullPath = STORE_DIR + "/" + scene + fpath + filename
                }
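                // deduplicate: if a file with this md5 is already on disk,
                // just map the upload id to the existing FileInfo and drop
                // the temporary upload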
                if fi, err := c.GetFileInfoFromLevelDB(md5sum); err != nil {
                    log.Error(err)
                } else {
                    tpath := c.GetFilePathByInfo(fi, true)
                    if fi.Md5 != "" && c.util.FileExists(tpath) {
                        var err error
                        var fileInfo *FileInfo
                        if fileInfo, err = c.SaveFileInfoToLevelDB(info.ID, fi, c.ldb); err != nil {
                            log.Error(err)
                        }
                        log.Info(fmt.Sprintf("file is found md5:%s", fi.Md5))
                        log.Info("remove file:", oldFullPath)
                        log.Info("remove file:", infoFullPath)
                        os.Remove(oldFullPath)
                        os.Remove(infoFullPath)
                        go callBack(info, fileInfo)
                        continue
                    }
                }
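                // otherwise move the uploaded chunk into the store, record
                // its FileInfo and replicate it to the other peers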
                fpath2 := ""
                fpath2 = STORE_DIR_NAME + "/" + scene + fpath + Config().PeerId
                if pathCustom != "" {
                    fpath2 = STORE_DIR_NAME + "/" + scene + fpath
                    fpath2 = strings.TrimRight(fpath2, "/")
                }
                os.MkdirAll(DOCKER_DIR+fpath2, 0775)
                fileInfo := &FileInfo{
                    Name:      name,
                    Path:      fpath2,
                    ReName:    filename,
                    Size:      info.Size,
                    TimeStamp: timeStamp,
                    Md5:       md5sum,
                    Peers:     []string{c.host},
                    OffSet:    -1,
                }
                if err = os.Rename(oldFullPath, newFullPath); err != nil {
                    log.Error(err)
                    continue
                }
                log.Info(fileInfo)
                os.Remove(infoFullPath)
                if _, err = c.SaveFileInfoToLevelDB(info.ID, fileInfo, c.ldb); err != nil {
                    // associate the upload id with the file info
                    log.Error(err)
                }
                c.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
                go c.postFileToPeer(fileInfo)
                go callBack(info, fileInfo)
            }
        }
    }
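    // Pre-filter tus creation (POST) requests: when the filename extension is
    // not whitelisted in Config().Extensions, mark the request with a Suspend
    // header.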
    h.AddMiddleWare(func() http.HandlerFunc {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            if r.Method == http.MethodPost {
                meta := tusd.ParseMetadataHeader(r.Header.Get("Upload-Metadata"))
                if name, ok := meta["filename"]; ok {
                    if len(Config().Extensions) > 0 && !c.util.Contains(path.Ext(strings.ToLower(name)), Config().Extensions) {
                        r.Header.Set("Suspend", "True")
                    }
                }
            }
        })
    }())
    go notify(h)
    //http.Handle(bigDir, http.StripPrefix(bigDir, h))
    mux.Handle(bigDir, http.StripPrefix(bigDir, h))
}

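// initComponent resolves the address this peer advertises, normalizes the
// configured peer list, chooses the HTTP mux (and starts the tus handler on
// first run), loads the scene map, and fills in defaults for zero-valued
// tuning options.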
func (c *Server) initComponent(isReload bool) {
    var (
        ip string
    )
    if ip = os.Getenv("GO_FASTDFS_IP"); ip == "" {
        ip = c.util.GetPulicIP()
    }
    if Config().Host == "" {
        if len(strings.Split(Config().Addr, ":")) == 2 {
            server.host = fmt.Sprintf("http://%s:%s", ip, strings.Split(Config().Addr, ":")[1])
            Config().Host = server.host
        }
    } else {
        if strings.HasPrefix(Config().Host, "http") {
            server.host = Config().Host
        } else {
            server.host = "http://" + Config().Host
        }
    }
    ex, _ := regexp.Compile(`\d+\.\d+\.\d+\.\d+`)
    var peers []string
    for _, peer := range Config().Peers {
        if c.util.Contains(ip, ex.FindAllString(peer, -1)) ||
            c.util.Contains("127.0.0.1", ex.FindAllString(peer, -1)) {
            continue
        }
        if strings.HasPrefix(peer, "http") {
            peers = append(peers, peer)
        } else {
            peers = append(peers, "http://"+peer)
        }
    }
    Config().Peers = peers
    if !isReload {
        if Config().EnablePprofDebug {
            mux = http.DefaultServeMux
        } else {
            mux = http.NewServeMux()
        }
        c.FormatStatInfo()
        if Config().EnableTus {
            c.initTus()
        }
    }
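    // parse scene entries of the form "name:value" into the in-memory scene map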
    for _, s := range Config().Scenes {
        kv := strings.Split(s, ":")
        if len(kv) == 2 {
            c.sceneMap.Put(kv[0], kv[1])
        }
    }
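    // fall back to built-in defaults for any tuning option left at zero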
    if Config().ReadTimeout == 0 {
        Config().ReadTimeout = 60 * 10
    }
    if Config().WriteTimeout == 0 {
        Config().WriteTimeout = 60 * 10
    }
    if Config().SyncWorker == 0 {
        Config().SyncWorker = 200
    }
    if Config().UploadWorker == 0 {
        Config().UploadWorker = runtime.NumCPU() + 4
        if runtime.NumCPU() < 4 {
            Config().UploadWorker = 8
        }
    }
    if Config().UploadQueueSize == 0 {
        Config().UploadQueueSize = 200
    }
    if Config().RetryCount == 0 {
        Config().RetryCount = 3
    }
    if Config().SyncDelay == 0 {
        Config().SyncDelay = 60
    }
    if Config().WatchChanSize == 0 {
        Config().WatchChanSize = 100000
    }
    if Config().ImageMaxHeight == 0 {
        Config().ImageMaxHeight = 2000
    }
    if Config().ImageMaxWidth == 0 {
        Config().ImageMaxWidth = 2000
    }
}