[REV] merge master branch

This commit is contained in:
barnettZQG 2018-07-13 10:35:03 +08:00
commit dfee3abf01
14 changed files with 202 additions and 126 deletions

View File

@ -116,14 +116,18 @@ func (s *Manager) checkStatus() {
deployInfo, err := db.GetManager().K8sDeployReplicationDao().GetK8sDeployReplicationByService(serviceID)
if err != nil {
if err == gorm.ErrRecordNotFound {
s.SetStatus(serviceID, CLOSED)
if s.GetStatus(serviceID) != UNDEPLOY && s.GetStatus(serviceID) != DEPLOYING {
s.SetStatus(serviceID, CLOSED)
}
continue
}
logrus.Error("get deploy info error where check application status.", err.Error())
continue
}
if deployInfo == nil || len(deployInfo) == 0 {
s.SetStatus(serviceID, CLOSED)
if s.GetStatus(serviceID) != UNDEPLOY && s.GetStatus(serviceID) != DEPLOYING {
s.SetStatus(serviceID, CLOSED)
}
continue
}
switch deployInfo[0].ReplicationType {

View File

@ -107,6 +107,7 @@ func NewSouceCodeBuildItem(in []byte) *SourceCodeBuildItem {
Lang: gjson.GetBytes(in, "lang").String(),
Runtime: gjson.GetBytes(in, "runtime").String(),
BuildEnvs: be,
commit: &object.Commit{},
}
scb.CacheDir = fmt.Sprintf("/cache/build/%s/cache/%s", scb.TenantID, scb.ServiceID)
//scb.SourceDir = scb.CodeSouceInfo.GetCodeSourceDir()
@ -136,7 +137,7 @@ func (i *SourceCodeBuildItem) Run(timeout time.Duration) error {
rs, err := sources.GitCloneOrPull(i.CodeSouceInfo, rbi.GetCodeHome(), i.Logger, 5)
if err != nil {
logrus.Errorf("pull git code error: %s", err.Error())
i.Logger.Error(fmt.Sprintf("拉取代码失败,请重试"), map[string]string{"step": "builder-exector", "status": "failure"})
i.Logger.Error(fmt.Sprintf("拉取代码失败,请确保代码可以被正常下载"), map[string]string{"step": "builder-exector", "status": "failure"})
return err
}
//get last commit

View File

@ -190,7 +190,10 @@ func (b *BackupAPPRestore) restoreVersionAndData(backup *dbmodel.AppBackup, appS
logrus.Errorf("restore service(%s) volume(%s) data error.%s", app.ServiceID, volume.VolumeName, err.Error())
return err
}
os.MkdirAll(tmpDir, 0777)
//backup data is not exist because dir is empty.
//so create host path and continue
os.MkdirAll(volume.HostPath, 0777)
continue
}
//if app type is statefulset, change pod hostpath
if b.getServiceType(app.ServiceLabel) == util.StatefulServiceType {
@ -207,7 +210,13 @@ func (b *BackupAPPRestore) restoreVersionAndData(backup *dbmodel.AppBackup, appS
newpath := filepath.Join(util.GetParentDirectory(path), newName)
err := util.Rename(path, newpath)
if err != nil {
return err
if strings.Contains(err.Error(), "file exists") {
if err := util.MergeDir(path, newpath); err != nil {
return err
}
} else {
return err
}
}
if err := os.Chmod(newpath, 0777); err != nil {
return err
@ -216,7 +225,13 @@ func (b *BackupAPPRestore) restoreVersionAndData(backup *dbmodel.AppBackup, appS
}
err := util.Rename(tmpDir, util.GetParentDirectory(volume.HostPath))
if err != nil {
return err
if strings.Contains(err.Error(), "file exists") {
if err := util.MergeDir(tmpDir, util.GetParentDirectory(volume.HostPath)); err != nil {
return err
}
} else {
return err
}
}
if err := os.Chmod(volume.HostPath, 0777); err != nil {
return err
@ -227,20 +242,27 @@ func (b *BackupAPPRestore) restoreVersionAndData(backup *dbmodel.AppBackup, appS
dstDir := fmt.Sprintf("%s/data_%s/%s_common.zip", b.cacheDir, b.getOldServiceID(app.ServiceID), b.getOldServiceID(app.ServiceID))
tmpDir := fmt.Sprintf("/grdata/tmp/%s_%s", app.ServiceID, app.ServiceID)
if err := util.Unzip(dstDir, tmpDir); err != nil {
logrus.Errorf("restore service(%s) common data error.%s", app.ServiceID, err.Error())
return err
}
err := util.Rename(tmpDir, util.GetParentDirectory(app.Service.HostPath))
if err != nil {
if strings.Contains(err.Error(), "file exists") {
if err := util.MergeDir(tmpDir, util.GetParentDirectory(app.Service.HostPath)); err != nil {
if !strings.Contains(err.Error(), "no such file") {
logrus.Errorf("restore service(%s) default volume data error.%s", app.ServiceID, err.Error())
return err
}
//backup data is not exist because dir is empty.
//so create host path and continue
os.MkdirAll(app.Service.HostPath, 0777)
} else {
err := util.Rename(tmpDir, util.GetParentDirectory(app.Service.HostPath))
if err != nil {
if strings.Contains(err.Error(), "file exists") {
if err := util.MergeDir(tmpDir, util.GetParentDirectory(app.Service.HostPath)); err != nil {
return err
}
} else {
return err
}
}
return err
}
if err := os.Chmod(app.Service.HostPath, 0777); err != nil {
return err
if err := os.Chmod(app.Service.HostPath, 0777); err != nil {
return err
}
}
}
b.Logger.Info(fmt.Sprintf("完成恢复应用(%s)持久化数据", app.Service.ServiceAlias), map[string]string{"step": "restore_builder", "status": "success"})

View File

@ -117,6 +117,7 @@ type Conf struct {
Etcd client.Config
StatsdConfig StatsdConfig
UDPMonitorConfig UDPMonitorConfig
MinResyncPeriod time.Duration
}
//StatsdConfig StatsdConfig
@ -175,6 +176,7 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&a.StatsdConfig.StatsdListenTCP, "statsd.listen-tcp", ":9125", "The TCP address on which to receive statsd metric lines. \"\" disables it.")
fs.StringVar(&a.StatsdConfig.MappingConfig, "statsd.mapping-config", "", "Metric mapping configuration file name.")
fs.IntVar(&a.StatsdConfig.ReadBuffer, "statsd.read-buffer", 0, "Size (in bytes) of the operating system's transmit read buffer associated with the UDP connection. Please make sure the kernel parameters net.core.rmem_max is set to a value greater than the value specified.")
fs.DurationVar(&a.MinResyncPeriod, "min-resync-period", time.Hour*12, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod")
}
//SetLog 设置log

View File

@ -33,6 +33,7 @@ import (
"github.com/goodrain/rainbond/node/nodeserver"
"github.com/goodrain/rainbond/node/statsd"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/client-go/informers"
"k8s.io/client-go/pkg/api/v1"
"github.com/Sirupsen/logrus"
@ -69,15 +70,16 @@ func Run(c *option.Conf) error {
return fmt.Errorf("Connect to ETCD %s failed: %s",
c.Etcd.Endpoints, err)
}
if c.K8SConfPath != "" {
if err := k8s.NewK8sClient(c); err != nil {
return fmt.Errorf("Connect to K8S %s failed: %s",
c.K8SConfPath, err)
}
} else {
return fmt.Errorf("Connect to K8S %s failed: kubeconfig file not found",
c.K8SConfPath)
stop := make(chan struct{})
if err := k8s.NewK8sClient(c); err != nil {
return fmt.Errorf("Connect to K8S %s failed: %s",
c.K8SConfPath, err)
}
sharedInformers := informers.NewSharedInformerFactory(k8s.K8S, c.MinResyncPeriod)
sharedInformers.Core().V1().Services().Informer()
sharedInformers.Core().V1().Endpoints().Informer()
sharedInformers.Start(stop)
defer close(stop)
s, err := nodeserver.NewNodeServer(c) //todo 配置文件 done
if err != nil {
@ -118,7 +120,7 @@ func Run(c *option.Conf) error {
return err
}
//启动API服务
apiManager := api.NewManager(*s.Conf, s.HostNode, ms, exporter)
apiManager := api.NewManager(*s.Conf, s.HostNode, ms, exporter, sharedInformers)
if err := apiManager.Start(errChan); err != nil {
return err
}

View File

@ -25,6 +25,7 @@ import (
"github.com/goodrain/rainbond/node/core/config"
"github.com/goodrain/rainbond/node/core/service"
"github.com/goodrain/rainbond/node/masterserver"
"k8s.io/client-go/informers"
)
var datacenterConfig *config.DataCenterConfig
@ -37,9 +38,9 @@ var nodeService *service.NodeService
var discoverService *service.DiscoverAction
//Init 初始化
func Init(c *option.Conf, ms *masterserver.MasterServer) {
func Init(c *option.Conf, ms *masterserver.MasterServer, sharedInformers informers.SharedInformerFactory) {
if ms != nil {
prometheusService=service.CreatePrometheusService(c,ms)
prometheusService = service.CreatePrometheusService(c, ms)
taskService = service.CreateTaskService(c, ms)
taskTempService = service.CreateTaskTempService(c)
taskGroupService = service.CreateTaskGroupService(c, ms)
@ -47,7 +48,7 @@ func Init(c *option.Conf, ms *masterserver.MasterServer) {
nodeService = service.CreateNodeService(c, ms.Cluster)
}
appService = service.CreateAppService(c)
discoverService = service.CreateDiscoverActionManager(c)
discoverService = service.CreateDiscoverActionManager(c, sharedInformers)
}
//Exist 退出

View File

@ -28,6 +28,7 @@ import (
"github.com/goodrain/rainbond/node/statsd"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"k8s.io/client-go/informers"
"github.com/goodrain/rainbond/node/api/controller"
"github.com/goodrain/rainbond/node/api/model"
@ -59,10 +60,10 @@ type Manager struct {
}
//NewManager api manager
func NewManager(c option.Conf, node *model.HostNode, ms *masterserver.MasterServer, exporter *statsd.Exporter) *Manager {
func NewManager(c option.Conf, node *model.HostNode, ms *masterserver.MasterServer, exporter *statsd.Exporter, sharedInformers informers.SharedInformerFactory) *Manager {
r := router.Routers(c.RunMode)
ctx, cancel := context.WithCancel(context.Background())
controller.Init(&c, ms)
controller.Init(&c, ms, sharedInformers)
m := &Manager{
ctx: ctx,
cancel: cancel,

View File

@ -823,7 +823,7 @@ func CreateHTTPCommonListener(name string, vh ...*VirtualHost) *Listener {
//CreateTCPCommonListener create tcp simple common listener
//listen the specified port
//associate the specified cluster.
func CreateTCPCommonListener(clusterName string, address string) *Listener {
func CreateTCPCommonListener(listenerName, clusterName string, address string) *Listener {
ptr := &TCPRoute{
Cluster: clusterName,
}
@ -831,7 +831,7 @@ func CreateTCPCommonListener(clusterName string, address string) *Listener {
Routes: []*TCPRoute{ptr},
}
lcg := &TCPProxyFilterConfig{
StatPrefix: clusterName,
StatPrefix: listenerName,
RouteConfig: lrs,
}
lfs := &NetworkFilter{
@ -839,7 +839,7 @@ func CreateTCPCommonListener(clusterName string, address string) *Listener {
Config: lcg,
}
plds := &Listener{
Name: clusterName,
Name: listenerName,
Address: address,
Filters: []*NetworkFilter{lfs},
BindToPort: true,

View File

@ -19,8 +19,6 @@
package k8s
import (
"time"
"github.com/goodrain/rainbond/cmd/node/option"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
@ -41,7 +39,6 @@ func NewK8sClient(cfg *option.Conf) error {
if err != nil {
return err
}
config.Timeout = time.Second * 5
config.QPS = 50
config.Burst = 100
cli, err := kubernetes.NewForConfig(config)

View File

@ -20,8 +20,11 @@ package k8s
import (
"testing"
"time"
"github.com/goodrain/rainbond/cmd/node/option"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
)
func init() {
@ -36,3 +39,33 @@ func TestGetPodsByNodeName(t *testing.T) {
}
t.Log(pods)
}
func TestSharedInformerFactory(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(K8S, time.Hour*10)
sharedInformers.Core().V1().Nodes().Informer()
sharedInformers.Core().V1().Services().Informer()
stop := make(chan struct{})
sharedInformers.Start(stop)
time.Sleep(time.Second * 30)
selector, err := labels.Parse("")
if err != nil {
t.Fatal(err)
}
for i := 0; i < 2; i++ {
nodes, err := sharedInformers.Core().V1().Nodes().Lister().List(selector)
if err != nil {
t.Fatal(err)
}
t.Log(nodes)
}
for i := 0; i < 2; i++ {
selector, _ := labels.Parse("name=gr87b487Service")
nodes, err := sharedInformers.Core().V1().Services().Lister().Services("824b2e9dcc4d461a852ddea20369d377").List(selector)
if err != nil {
t.Fatal(err)
}
t.Log(nodes)
time.Sleep(time.Second * 5)
}
}

View File

@ -30,24 +30,25 @@ import (
"github.com/goodrain/rainbond/api/util"
"github.com/goodrain/rainbond/cmd/node/option"
envoyv1 "github.com/goodrain/rainbond/node/core/envoy/v1"
"github.com/goodrain/rainbond/node/core/k8s"
"github.com/goodrain/rainbond/node/core/store"
"github.com/pquerna/ffjson/ffjson"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/informers"
)
//DiscoverAction DiscoverAction
type DiscoverAction struct {
conf *option.Conf
etcdCli *store.Client
conf *option.Conf
etcdCli *store.Client
sharedInformers informers.SharedInformerFactory
}
//CreateDiscoverActionManager CreateDiscoverActionManager
func CreateDiscoverActionManager(conf *option.Conf) *DiscoverAction {
func CreateDiscoverActionManager(conf *option.Conf, sharedInformers informers.SharedInformerFactory) *DiscoverAction {
return &DiscoverAction{
conf: conf,
etcdCli: store.DefalutClient,
conf: conf,
etcdCli: store.DefalutClient,
sharedInformers: sharedInformers,
}
}
@ -63,28 +64,34 @@ func (d *DiscoverAction) DiscoverService(serviceInfo string) (*envoyv1.SDSHost,
dPort := mm[3]
labelname := fmt.Sprintf("name=%sService", destServiceAlias)
endpoints, err := k8s.K8S.Core().Endpoints(namespace).List(metav1.ListOptions{LabelSelector: labelname})
//logrus.Debugf("labelname is %s, endpoints is %v, items is %v", labelname, endpoints, endpoints.Items)
selector, err := labels.Parse(labelname)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
services, err := k8s.K8S.Core().Services(namespace).List(metav1.ListOptions{LabelSelector: labelname})
endpoints, err := d.sharedInformers.Core().V1().Endpoints().Lister().Endpoints(namespace).List(selector)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
if len(endpoints.Items) == 0 {
services, err := d.sharedInformers.Core().V1().Services().Lister().Services(namespace).List(selector)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
if len(endpoints) == 0 {
if destServiceAlias == serviceAlias {
labelname := fmt.Sprintf("name=%sServiceOUT", destServiceAlias)
var err error
endpoints, err = k8s.K8S.Core().Endpoints(namespace).List(metav1.ListOptions{LabelSelector: labelname})
selector, err := labels.Parse(labelname)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
if len(endpoints.Items) == 0 {
endpoints, err = d.sharedInformers.Core().V1().Endpoints().Lister().Endpoints(namespace).List(selector)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
if len(endpoints) == 0 {
logrus.Debugf("outer endpoints items length is 0, continue")
return nil, util.CreateAPIHandleError(400, fmt.Errorf("outer have no endpoints"))
}
services, err = k8s.K8S.Core().Services(namespace).List(metav1.ListOptions{LabelSelector: labelname})
services, err = d.sharedInformers.Core().V1().Services().Lister().Services(namespace).List(selector)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
@ -93,7 +100,7 @@ func (d *DiscoverAction) DiscoverService(serviceInfo string) (*envoyv1.SDSHost,
}
}
var sdsL []*envoyv1.DiscoverHost
for key, item := range endpoints.Items {
for key, item := range endpoints {
if len(item.Subsets) < 1 {
continue
}
@ -105,9 +112,9 @@ func (d *DiscoverAction) DiscoverService(serviceInfo string) (*envoyv1.SDSHost,
if dPort != fmt.Sprintf("%d", port) {
continue
}
toport := int(services.Items[key].Spec.Ports[0].Port)
toport := int(services[key].Spec.Ports[0].Port)
if serviceAlias == destServiceAlias {
if originPort, ok := services.Items[key].Labels["origin_port"]; ok {
if originPort, ok := services[key].Labels["origin_port"]; ok {
origin, err := strconv.Atoi(originPort)
if err != nil {
return nil, util.CreateAPIHandleError(500, fmt.Errorf("have no origin_port"))
@ -176,14 +183,18 @@ func (d *DiscoverAction) upstreamClusters(serviceAlias, namespace string, depend
destService := dependsServices[i]
destServiceAlias := destService.DependServiceAlias
labelname := fmt.Sprintf("name=%sService", destServiceAlias)
services, err := k8s.K8S.Core().Services(namespace).List(metav1.ListOptions{LabelSelector: labelname})
selector, err := labels.Parse(labelname)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
if len(services.Items) == 0 {
services, err := d.sharedInformers.Core().V1().Services().Lister().Services(namespace).List(selector)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
if len(services) == 0 {
continue
}
for _, service := range services.Items {
for _, service := range services {
inner, ok := service.Labels["service_type"]
port := service.Spec.Ports[0]
if !ok || inner != "inner" {
@ -290,42 +301,40 @@ func (d *DiscoverAction) upstreamListener(serviceAlias, namespace string, depend
for i := range dependsServices {
destService := dependsServices[i]
destServiceAlias := destService.DependServiceAlias
labelname := fmt.Sprintf("name=%sService", destService.DependServiceAlias)
start := time.Now()
services, err := k8s.K8S.Core().Services(namespace).List(metav1.ListOptions{LabelSelector: labelname})
labelname := fmt.Sprintf("name=%sService", destServiceAlias)
selector, err := labels.Parse(labelname)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
services, err := d.sharedInformers.Core().V1().Services().Lister().Services(namespace).List(selector)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
fmt.Printf("get %s service cost time %s \n", destService.DependServiceAlias, time.Now().Sub(start).String())
if len(services.Items) == 0 {
if len(services) == 0 {
logrus.Debugf("inner endpoints items length is 0, continue")
continue
}
for _, service := range services.Items {
for _, service := range services {
inner, ok := service.Labels["service_type"]
if !ok || inner != "inner" {
continue
}
port := service.Spec.Ports[0].Port
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port)
// Unique by listen port
if index, ok := portMap[port]; !ok {
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port)
plds := envoyv1.CreateTCPCommonListener(clusterName, fmt.Sprintf("tcp://127.0.0.1:%d", port))
if _, ok := portMap[port]; !ok {
listenerName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
plds := envoyv1.CreateTCPCommonListener(listenerName, clusterName, fmt.Sprintf("tcp://127.0.0.1:%d", port))
ldsL = append(ldsL, plds)
portMap[port] = len(ldsL) - 1
} else if index != -1 {
clusterName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
plds := envoyv1.CreateTCPCommonListener(clusterName, fmt.Sprintf("tcp://127.0.0.1:%d", port))
ldsL[index] = plds
//only create one cluster for same port
portMap[port] = -1
}
portProtocol, ok := service.Labels["port_protocol"]
if !ok {
portProtocol = destService.Protocol
}
if portProtocol != "" {
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port)
//TODO: support more protocol
switch portProtocol {
case "http", "https":
@ -378,7 +387,7 @@ func (d *DiscoverAction) downstreamListener(serviceAlias, namespace string, port
port := int32(p.Port)
clusterName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
if _, ok := portMap[port]; !ok {
plds := envoyv1.CreateTCPCommonListener(clusterName, fmt.Sprintf("tcp://0.0.0.0:%d", p.ListenPort))
plds := envoyv1.CreateTCPCommonListener(clusterName, clusterName, fmt.Sprintf("tcp://0.0.0.0:%d", p.ListenPort))
ldsL = append(ldsL, plds)
portMap[port] = 1
}
@ -415,43 +424,6 @@ func (d *DiscoverAction) ToolsGetSourcesEnv(
return []byte{}, nil
}
//ToolsGetK8SServiceList GetK8SServiceList
func (d *DiscoverAction) ToolsGetK8SServiceList(uuid string) (*v1.ServiceList, error) {
serviceList, err := k8s.K8S.Core().Services(uuid).List(metav1.ListOptions{})
if err != nil {
return nil, err
}
return serviceList, nil
}
//ToolsGetMainPodEnvs ToolsGetMainPodEnvs
func (d *DiscoverAction) ToolsGetMainPodEnvs(namespace, serviceAlias string) (
*[]v1.EnvVar,
*util.APIHandleError) {
labelname := fmt.Sprintf("name=%s", serviceAlias)
pods, err := k8s.K8S.Core().Pods(namespace).List(metav1.ListOptions{LabelSelector: labelname})
logrus.Debugf("service_alias %s pod is %v", serviceAlias, pods)
if err != nil {
return nil, util.CreateAPIHandleError(500, err)
}
if len(pods.Items) == 0 {
return nil,
util.CreateAPIHandleError(404, fmt.Errorf("have no pod for discover"))
}
if len(pods.Items[0].Spec.Containers) < 2 {
return nil,
util.CreateAPIHandleError(404, fmt.Errorf("have no net plugins for discover"))
}
for _, c := range pods.Items[0].Spec.Containers {
for _, e := range c.Env {
if e.Name == "PLUGIN_MOEL" && strings.Contains(e.Value, "net-plugin") {
return &c.Env, nil
}
}
}
return nil, util.CreateAPIHandleError(404, fmt.Errorf("have no envs for plugin"))
}
//ToolsGetRainbondResources get plugin configs from etcd
//if not exist return error
func (d *DiscoverAction) ToolsGetRainbondResources(namespace, sourceAlias, pluginID string) (*api_model.ResourceSpec, error) {

View File

@ -29,6 +29,7 @@ import (
"os/exec"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
"sync"
@ -383,7 +384,7 @@ func walkDir(dir string, wg *sync.WaitGroup, fileSizes chan<- int64, concurrent
//sema is a counting semaphore for limiting concurrency in listDir
var sema = make(chan struct{}, 20)
// 列出指定目录下的所有条目
//读取目录dir下的文件信息
func listDir(dir string) []os.FileInfo {
sema <- struct{}{}
defer func() { <-sema }()
@ -461,22 +462,23 @@ func Zip(source, target string) error {
if err != nil {
return err
}
header, err := zip.FileInfoHeader(info)
if err != nil {
return err
}
if baseDir != "" {
header.Name = filepath.Join(baseDir, strings.TrimPrefix(path, source))
}
if info.IsDir() {
header.Name += "/"
} else {
header.Method = zip.Deflate
}
//set file uid and
elem := reflect.ValueOf(info.Sys()).Elem()
uid := elem.FieldByName("Uid").Uint()
gid := elem.FieldByName("Gid").Uint()
header.Comment = fmt.Sprintf("%d/%d", uid, gid)
writer, err := archive.CreateHeader(header)
if err != nil {
return err
@ -514,6 +516,16 @@ func Unzip(archive, target string) error {
path := filepath.Join(target, file.Name)
if file.FileInfo().IsDir() {
os.MkdirAll(path, file.Mode())
if file.Comment != "" && strings.Contains(file.Comment, "/") {
guid := strings.Split(file.Comment, "/")
if len(guid) == 2 {
uid, _ := strconv.Atoi(guid[0])
gid, _ := strconv.Atoi(guid[1])
if err := os.Chown(path, uid, gid); err != nil {
return err
}
}
}
return nil
}
@ -532,6 +544,16 @@ func Unzip(archive, target string) error {
if _, err := io.Copy(targetFile, fileReader); err != nil {
return err
}
if file.Comment != "" && strings.Contains(file.Comment, "/") {
guid := strings.Split(file.Comment, "/")
if len(guid) == 2 {
uid, _ := strconv.Atoi(guid[0])
gid, _ := strconv.Atoi(guid[1])
if err := os.Chown(path, uid, gid); err != nil {
return err
}
}
}
return nil
}
if err := run(); err != nil {
@ -572,6 +594,7 @@ func Rename(old, new string) error {
}
//MergeDir MergeDir
//if Subdirectories already exist, Don't replace
func MergeDir(fromdir, todir string) error {
files, err := ioutil.ReadDir(fromdir)
if err != nil {
@ -579,7 +602,9 @@ func MergeDir(fromdir, todir string) error {
}
for _, f := range files {
if err := os.Rename(path.Join(fromdir, f.Name()), path.Join(todir, f.Name())); err != nil {
return err
if !strings.Contains(err.Error(), "file exists") {
return err
}
}
}
return nil

View File

@ -51,13 +51,13 @@ func TestGetDirSizeByCmd(t *testing.T) {
}
func TestZip(t *testing.T) {
if err := Zip("/tmp/test/", "/tmp/test.zip"); err != nil {
if err := Zip("/tmp/cache", "/tmp/cache.zip"); err != nil {
t.Fatal(err)
}
}
func TestUnzip(t *testing.T) {
if err := Unzip("/tmp/test.aaa.zip", "/tmp/rainbond"); err != nil {
if err := Unzip("/tmp/cache.zip", "/tmp/cache0"); err != nil {
t.Fatal(err)
}
}

View File

@ -37,6 +37,8 @@ type Demuxer struct {
max int
pending []byte
bufSize int
buf []byte
// Progress is where the progress messages are stored
Progress Progress
@ -49,11 +51,16 @@ func NewDemuxer(t Type, r io.Reader) *Demuxer {
max = MaxPackedSize
}
// buffer default size 8KB
size := 8 * 1024
return &Demuxer{
t: t,
r: r,
max: max,
s: pktline.NewScanner(r),
t: t,
r: r,
max: max,
bufSize: size,
buf: make([]byte, size),
s: pktline.NewScanner(r),
}
}
@ -110,10 +117,19 @@ func (d *Demuxer) nextPackData() ([]byte, error) {
return nil, io.EOF
}
content = d.s.Bytes()
payload := d.s.Bytes()
size := len(payload)
size := len(content)
if size <= 1 {
if d.bufSize < size {
content = make([]byte, size)
copy(content, payload)
} else {
length := copy(d.buf, payload)
content = d.buf[:length]
}
size = len(content)
if size < 1 {
return nil, nil
} else if size > d.max {
return nil, ErrMaxPackedExceeded