mirror of
https://gitee.com/sjqzhang/go-fastdfs.git
synced 2024-11-30 02:07:45 +08:00
320 lines
8.0 KiB
Go
320 lines
8.0 KiB
Go
package server
|
|
|
|
import (
|
|
"bytes"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
slog "log"
|
|
"net/http"
|
|
"os"
|
|
"path"
|
|
"regexp"
|
|
"runtime"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/astaxie/beego/httplib"
|
|
"github.com/busyfree/tusd/pkg/filestore"
|
|
"github.com/busyfree/tusd/pkg/handler"
|
|
log "github.com/sjqzhang/seelog"
|
|
)
|
|
|
|
func (c *Server) initTus() {
|
|
var (
|
|
err error
|
|
fileLog *os.File
|
|
bigDir string
|
|
)
|
|
BIG_DIR := STORE_DIR + "/_big/" + Config().PeerId
|
|
os.MkdirAll(BIG_DIR, 0775)
|
|
os.MkdirAll(LOG_DIR, 0775)
|
|
store := filestore.FileStore{
|
|
Path: BIG_DIR,
|
|
}
|
|
if fileLog, err = os.OpenFile(LOG_DIR+"/tusd.log", os.O_CREATE|os.O_RDWR, 0666); err != nil {
|
|
log.Error(err)
|
|
panic("initTus")
|
|
}
|
|
go func() {
|
|
for {
|
|
if fi, err := fileLog.Stat(); err != nil {
|
|
log.Error(err)
|
|
} else {
|
|
if fi.Size() > 1024*1024*500 {
|
|
//500M
|
|
c.util.CopyFile(LOG_DIR+"/tusd.log", LOG_DIR+"/tusd.log.2")
|
|
fileLog.Seek(0, 0)
|
|
fileLog.Truncate(0)
|
|
fileLog.Seek(0, 2)
|
|
}
|
|
}
|
|
time.Sleep(time.Second * 30)
|
|
}
|
|
}()
|
|
l := slog.New(fileLog, "[tusd] ", slog.LstdFlags)
|
|
bigDir = CONST_BIG_UPLOAD_PATH_SUFFIX
|
|
if Config().SupportGroupManage {
|
|
bigDir = fmt.Sprintf("/%s%s", Config().Group, CONST_BIG_UPLOAD_PATH_SUFFIX)
|
|
}
|
|
composer := handler.NewStoreComposer()
|
|
// support raw tus upload and download
|
|
store.GetReaderExt = func(id string) (io.Reader, error) {
|
|
var (
|
|
offset int64
|
|
err error
|
|
length int
|
|
buffer []byte
|
|
fi *FileInfo
|
|
fn string
|
|
)
|
|
if fi, err = c.GetFileInfoFromLevelDB(id); err != nil {
|
|
log.Error(err)
|
|
return nil, err
|
|
} else {
|
|
if Config().AuthUrl != "" {
|
|
fileResult := c.util.JsonEncodePretty(c.BuildFileResult(fi, nil))
|
|
bufferReader := bytes.NewBuffer([]byte(fileResult))
|
|
return bufferReader, nil
|
|
}
|
|
fn = fi.Name
|
|
if fi.ReName != "" {
|
|
fn = fi.ReName
|
|
}
|
|
fp := DOCKER_DIR + fi.Path + "/" + fn
|
|
if c.util.FileExists(fp) {
|
|
log.Info(fmt.Sprintf("download:%s", fp))
|
|
return os.Open(fp)
|
|
}
|
|
ps := strings.Split(fp, ",")
|
|
if len(ps) > 2 && c.util.FileExists(ps[0]) {
|
|
if length, err = strconv.Atoi(ps[2]); err != nil {
|
|
return nil, err
|
|
}
|
|
if offset, err = strconv.ParseInt(ps[1], 10, 64); err != nil {
|
|
return nil, err
|
|
}
|
|
if buffer, err = c.util.ReadFileByOffSet(ps[0], offset, length); err != nil {
|
|
return nil, err
|
|
}
|
|
if buffer[0] == '1' {
|
|
bufferReader := bytes.NewBuffer(buffer[1:])
|
|
return bufferReader, nil
|
|
} else {
|
|
msg := "data no sync"
|
|
log.Error(msg)
|
|
return nil, errors.New(msg)
|
|
}
|
|
}
|
|
return nil, errors.New(fmt.Sprintf("%s not found", fp))
|
|
}
|
|
}
|
|
store.UseIn(composer)
|
|
SetupPreHooks := func(composer *handler.StoreComposer) {
|
|
composer.UseCore(hookDataStore{
|
|
DataStore: composer.Core,
|
|
})
|
|
}
|
|
SetupPreHooks(composer)
|
|
h, err := handler.NewHandler(handler.Config{
|
|
Logger: l,
|
|
BasePath: bigDir,
|
|
StoreComposer: composer,
|
|
NotifyCompleteUploads: true,
|
|
RespectForwardedHeaders: true,
|
|
})
|
|
notify := func(h *handler.Handler) {
|
|
for {
|
|
select {
|
|
case info := <-h.CompleteUploads:
|
|
callBack := func(info handler.FileInfo, fileInfo *FileInfo) {
|
|
if callback_url, ok := info.MetaData["callback_url"]; ok {
|
|
req := httplib.Post(callback_url)
|
|
req.SetTimeout(time.Second*10, time.Second*10)
|
|
req.Param("info", server.util.JsonEncodePretty(fileInfo))
|
|
req.Param("id", info.ID)
|
|
if _, err := req.String(); err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
}
|
|
log.Info("CompleteUploads", info)
|
|
name := ""
|
|
pathCustom := ""
|
|
scene := Config().DefaultScene
|
|
if v, ok := info.Upload.MetaData["filename"]; ok {
|
|
name = v
|
|
}
|
|
if v, ok := info.Upload.MetaData["scene"]; ok {
|
|
scene = v
|
|
}
|
|
if v, ok := info.Upload.MetaData["path"]; ok {
|
|
pathCustom = v
|
|
}
|
|
var err error
|
|
md5sum := ""
|
|
oldFullPath := BIG_DIR + "/" + info.Upload.ID + ".bin"
|
|
infoFullPath := BIG_DIR + "/" + info.Upload.ID + ".info"
|
|
if md5sum, err = c.util.GetFileSumByName(oldFullPath, Config().FileSumArithmetic); err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
ext := path.Ext(name)
|
|
filename := md5sum + ext
|
|
if name != "" {
|
|
filename = name
|
|
}
|
|
if Config().RenameFile {
|
|
filename = md5sum + ext
|
|
}
|
|
timeStamp := time.Now().Unix()
|
|
fpath := time.Now().Format("/20060102/15/04/")
|
|
if pathCustom != "" {
|
|
fpath = "/" + strings.Replace(pathCustom, ".", "", -1) + "/"
|
|
}
|
|
newFullPath := STORE_DIR + "/" + scene + fpath + Config().PeerId + "/" + filename
|
|
if pathCustom != "" {
|
|
newFullPath = STORE_DIR + "/" + scene + fpath + filename
|
|
}
|
|
if fi, err := c.GetFileInfoFromLevelDB(md5sum); err != nil {
|
|
log.Error(err)
|
|
} else {
|
|
tpath := c.GetFilePathByInfo(fi, true)
|
|
if fi.Md5 != "" && c.util.FileExists(tpath) {
|
|
var err error
|
|
var fileInfo *FileInfo
|
|
if fileInfo, err = c.SaveFileInfoToLevelDB(info.Upload.ID, fi, c.ldb); err != nil {
|
|
log.Error(err)
|
|
}
|
|
log.Info(fmt.Sprintf("file is found md5:%s", fi.Md5))
|
|
log.Info("remove file:", oldFullPath)
|
|
log.Info("remove file:", infoFullPath)
|
|
os.Remove(oldFullPath)
|
|
os.Remove(infoFullPath)
|
|
go callBack(info.Upload, fileInfo)
|
|
continue
|
|
}
|
|
}
|
|
fpath2 := ""
|
|
fpath2 = STORE_DIR_NAME + "/" + Config().DefaultScene + fpath + Config().PeerId
|
|
if pathCustom != "" {
|
|
fpath2 = STORE_DIR_NAME + "/" + Config().DefaultScene + fpath
|
|
fpath2 = strings.TrimRight(fpath2, "/")
|
|
}
|
|
|
|
os.MkdirAll(DOCKER_DIR+fpath2, 0775)
|
|
fileInfo := &FileInfo{
|
|
Name: name,
|
|
Path: fpath2,
|
|
ReName: filename,
|
|
Size: info.Upload.Size,
|
|
TimeStamp: timeStamp,
|
|
Md5: md5sum,
|
|
Peers: []string{c.host},
|
|
OffSet: -1,
|
|
}
|
|
if err = os.Rename(oldFullPath, newFullPath); err != nil {
|
|
log.Error(err)
|
|
continue
|
|
}
|
|
log.Info(fileInfo)
|
|
os.Remove(infoFullPath)
|
|
if _, err = c.SaveFileInfoToLevelDB(info.Upload.ID, fileInfo, c.ldb); err != nil {
|
|
//assosiate file id
|
|
log.Error(err)
|
|
}
|
|
c.SaveFileMd5Log(fileInfo, CONST_FILE_Md5_FILE_NAME)
|
|
go c.postFileToPeer(fileInfo)
|
|
|
|
go callBack(info.Upload, fileInfo)
|
|
}
|
|
}
|
|
}
|
|
go notify(h)
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
http.Handle(bigDir, http.StripPrefix(bigDir, h))
|
|
}
|
|
|
|
func (c *Server) initComponent(isReload bool) {
|
|
var (
|
|
ip string
|
|
)
|
|
if ip = os.Getenv("GO_FASTDFS_IP"); ip == "" {
|
|
ip = c.util.GetPulicIP()
|
|
}
|
|
if Config().Host == "" {
|
|
if len(strings.Split(Config().Addr, ":")) == 2 {
|
|
server.host = fmt.Sprintf("http://%s:%s", ip, strings.Split(Config().Addr, ":")[1])
|
|
Config().Host = server.host
|
|
}
|
|
} else {
|
|
if strings.HasPrefix(Config().Host, "http") {
|
|
server.host = Config().Host
|
|
} else {
|
|
server.host = "http://" + Config().Host
|
|
}
|
|
}
|
|
ex, _ := regexp.Compile("\\d+\\.\\d+\\.\\d+\\.\\d+")
|
|
var peers []string
|
|
for _, peer := range Config().Peers {
|
|
if c.util.Contains(ip, ex.FindAllString(peer, -1)) ||
|
|
c.util.Contains("127.0.0.1", ex.FindAllString(peer, -1)) {
|
|
continue
|
|
}
|
|
if strings.HasPrefix(peer, "http") {
|
|
peers = append(peers, peer)
|
|
} else {
|
|
peers = append(peers, "http://"+peer)
|
|
}
|
|
}
|
|
Config().Peers = peers
|
|
if !isReload {
|
|
c.FormatStatInfo()
|
|
if Config().EnableTus {
|
|
c.initTus()
|
|
}
|
|
}
|
|
for _, s := range Config().Scenes {
|
|
kv := strings.Split(s, ":")
|
|
if len(kv) == 2 {
|
|
c.sceneMap.Put(kv[0], kv[1])
|
|
}
|
|
}
|
|
if Config().ReadTimeout == 0 {
|
|
Config().ReadTimeout = 60 * 10
|
|
}
|
|
if Config().WriteTimeout == 0 {
|
|
Config().WriteTimeout = 60 * 10
|
|
}
|
|
if Config().SyncWorker == 0 {
|
|
Config().SyncWorker = 200
|
|
}
|
|
if Config().UploadWorker == 0 {
|
|
Config().UploadWorker = runtime.NumCPU() + 4
|
|
if runtime.NumCPU() < 4 {
|
|
Config().UploadWorker = 8
|
|
}
|
|
}
|
|
if Config().UploadQueueSize == 0 {
|
|
Config().UploadQueueSize = 200
|
|
}
|
|
if Config().RetryCount == 0 {
|
|
Config().RetryCount = 3
|
|
}
|
|
if Config().SyncDelay == 0 {
|
|
Config().SyncDelay = 60
|
|
}
|
|
if Config().WatchChanSize == 0 {
|
|
Config().WatchChanSize = 100000
|
|
}
|
|
if Config().ImageMaxHeight == 0 {
|
|
Config().ImageMaxHeight = 2000
|
|
}
|
|
if Config().ImageMaxWidth == 0 {
|
|
Config().ImageMaxWidth = 2000
|
|
}
|
|
}
|