[ADD] support app circuit breaker

This commit is contained in:
barnett 2018-04-23 15:07:37 +08:00
parent 98a811b8a7
commit 386e0f926d
6 changed files with 259 additions and 216 deletions

View File

@ -132,7 +132,8 @@ func GitClone(csi CodeSourceInfo, sourceDir string, logger event.Logger, timeout
URL: csi.RepositoryURL,
Progress: progress,
SingleBranch: true,
RecurseSubmodules: git.DefaultSubmoduleRecursionDepth,
Tags: git.NoTags,
RecurseSubmodules: git.NoRecurseSubmodules,
Depth: 1,
}
if csi.Branch != "" {
@ -258,7 +259,7 @@ func GitPull(csi CodeSourceInfo, sourceDir string, logger event.Logger, timeout
opts := &git.PullOptions{
Progress: progress,
SingleBranch: true,
Depth: 3,
Depth: 1,
}
if csi.Branch != "" {
opts.ReferenceName = plumbing.ReferenceName(fmt.Sprintf("refs/heads/%s", csi.Branch))

View File

@ -21,6 +21,7 @@ package sources
import (
"io"
"testing"
"time"
"github.com/goodrain/rainbond/pkg/event"
)
@ -31,16 +32,19 @@ func init() {
})
}
func TestGitClone(t *testing.T) {
start := time.Now()
csi := CodeSourceInfo{
RepositoryURL: "https://github.com/goodrain/rainbond-docs.git",
Branch: "master",
}
//logger := event.GetManager().GetLogger("system")
res, err := GitClone(csi, "/tmp/rainbonddoc2", nil, 1)
res, err := GitClone(csi, "/tmp/rainbonddoc3", nil, 1)
if err != nil {
t.Fatal(err)
}
t.Logf("%+v", res)
t.Logf("Take %d ms", time.Now().Unix()-start.Unix())
commit, err := GetLastCommit(res)
t.Logf("%+v %+v", commit, err)
}
func TestGitPull(t *testing.T) {
@ -66,7 +70,7 @@ func TestGitPullOrClone(t *testing.T) {
Branch: "publiccloud",
}
//logger := event.GetManager().GetLogger("system")
res, err := GitCloneOrPull(csi, "/tmp/goodrainweb", nil, 1)
res, err := GitCloneOrPull(csi, "/tmp/goodrainweb2", nil, 1)
if err != nil {
t.Fatal(err)
}

View File

@ -1,46 +0,0 @@
// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package model
const (
//PREFIX PREFIX
PREFIX string = "PREFIX"
//HEADERS HEADERS
HEADERS string = "HEADERS"
//DOMAINS DOMAINS
DOMAINS string = "DOMAINS"
//MaxConnections The maximum number of connections that Envoy will make to the upstream cluster. If not specified, the default is 1024.
MaxConnections string = "MaxConnections"
//MaxPendingRequests The maximum number of pending requests that Envoy will allow to the upstream cluster. If not specified, the default is 1024
MaxPendingRequests string = "MaxPendingRequests"
//MaxRequests The maximum number of parallel requests that Envoy will make to the upstream cluster. If not specified, the default is 1024.
MaxRequests string = "MaxRequests"
//MaxActiveRetries The maximum number of parallel retries that Envoy will allow to the upstream cluster. If not specified, the default is 3.
MaxActiveRetries string = "MaxActiveRetries"
//UPSTREAM upStream
UPSTREAM string = "upStream"
//DOWNSTREAM downStream
DOWNSTREAM string = "downStream"
//WEIGHT WEIGHT
WEIGHT string = "WEIGHT"
//MODELWEIGHT MODEL_WEIGHT
MODELWEIGHT string = "weight_model"
//MODELPREFIX MODEL_PREFIX
MODELPREFIX string = "prefix_model"
)

View File

@ -0,0 +1,47 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package v1
//CreatOutlierDetection create outlierDetection
//https://www.envoyproxy.io/docs/envoy/latest/api-v1/cluster_manager/cluster_outlier_detection#config-cluster-manager-cluster-outlier-detection
func CreatOutlierDetection(options map[string]interface{}) *OutlierDetection {
if _, ok := options[KeyMaxConnections]; !ok {
return nil
}
var od OutlierDetection
od.ConsecutiveErrors = GetOptionValues(KeyConsecutiveErrors, options).(int)
od.IntervalMS = GetOptionValues(KeyIntervalMS, options).(int64)
od.BaseEjectionTimeMS = GetOptionValues(KeyBaseEjectionTimeMS, options).(int64)
od.MaxEjectionPercent = GetOptionValues(KeyMaxEjectionPercent, options).(int)
return &od
}
//CreateCircuitBreaker create circuitBreaker
//https://www.envoyproxy.io/docs/envoy/latest/api-v1/cluster_manager/cluster_circuit_breakers#config-cluster-manager-cluster-circuit-breakers-v1
func CreateCircuitBreaker(options map[string]interface{}) *CircuitBreaker {
if _, ok := options[KeyMaxConnections]; !ok {
return nil
}
var cb CircuitBreaker
cb.Default.MaxConnections = GetOptionValues(KeyMaxConnections, options).(int)
cb.Default.MaxRequests = GetOptionValues(KeyMaxRequests, options).(int)
cb.Default.MaxRetries = GetOptionValues(KeyMaxActiveRetries, options).(int)
cb.Default.MaxPendingRequests = GetOptionValues(KeyMaxPendingRequests, options).(int)
return &cb
}

View File

@ -0,0 +1,189 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package v1
import (
"strconv"
"strings"
"github.com/Sirupsen/logrus"
)
const (
//KeyPrefix request path prefix
KeyPrefix string = "Prefix"
//KeyHeaders request http headers
KeyHeaders string = "Headers"
//KeyDomains request domains
KeyDomains string = "Domains"
//KeyMaxConnections The maximum number of connections that Envoy will make to the upstream cluster. If not specified, the default is 1024.
KeyMaxConnections string = "MaxConnections"
//KeyMaxPendingRequests The maximum number of pending requests that Envoy will allow to the upstream cluster. If not specified, the default is 1024
KeyMaxPendingRequests string = "MaxPendingRequests"
//KeyMaxRequests The maximum number of parallel requests that Envoy will make to the upstream cluster. If not specified, the default is 1024.
KeyMaxRequests string = "MaxRequests"
//KeyMaxActiveRetries The maximum number of parallel retries that Envoy will allow to the upstream cluster. If not specified, the default is 3.
KeyMaxActiveRetries string = "MaxActiveRetries"
//KeyUpStream upStream
KeyUpStream string = "upStream"
//KeyDownStream downStream
KeyDownStream string = "downStream"
//KeyWeight WEIGHT
KeyWeight string = "Weight"
//KeyWeightModel MODEL_WEIGHT
KeyWeightModel string = "weight_model"
//KeyPrefixModel MODEL_PREFIX
KeyPrefixModel string = "prefix_model"
//KeyIntervalMS IntervalMS key
KeyIntervalMS string = "IntervalMS"
//KeyConsecutiveErrors ConsecutiveErrors key
KeyConsecutiveErrors string = "ConsecutiveErrors"
//KeyBaseEjectionTimeMS BaseEjectionTimeMS key
KeyBaseEjectionTimeMS string = "BaseEjectionTimeMS"
//KeyMaxEjectionPercent MaxEjectionPercent key
KeyMaxEjectionPercent string = "MaxEjectionPercent"
)
//GetOptionValues get value from options
//if not exist,return default value
func GetOptionValues(kind string, sr map[string]interface{}) interface{} {
switch kind {
case KeyPrefix:
if prefix, ok := sr[KeyPrefix]; ok {
return prefix
}
return "/"
case KeyMaxConnections:
if circuit, ok := sr[KeyMaxConnections]; ok {
cc, err := strconv.Atoi(circuit.(string))
if err != nil {
logrus.Errorf("strcon circuit error")
return 1024
}
return cc
}
return 1024
case KeyMaxRequests:
if maxRequest, ok := sr[KeyMaxRequests]; ok {
mrt, err := strconv.Atoi(maxRequest.(string))
if err != nil {
logrus.Errorf("strcon max request error")
return 1024
}
return mrt
}
return 1024
case KeyMaxPendingRequests:
if maxPendingRequests, ok := sr[KeyMaxPendingRequests]; ok {
mpr, err := strconv.Atoi(maxPendingRequests.(string))
if err != nil {
logrus.Errorf("strcon max pending request error")
return 1024
}
return mpr
}
return 1024
case KeyMaxActiveRetries:
if maxRetries, ok := sr[KeyMaxActiveRetries]; ok {
mxr, err := strconv.Atoi(maxRetries.(string))
if err != nil {
logrus.Errorf("strcon max retry error")
return 3
}
return mxr
}
return 3
case KeyHeaders:
var np []Header
if headers, ok := sr[KeyHeaders]; ok {
parents := strings.Split(headers.(string), ";")
for _, h := range parents {
headers := strings.Split(h, ":")
//has_header:no 默认
if len(headers) == 2 {
if headers[0] == "has_header" && headers[1] == "no" {
continue
}
ph := Header{
Name: headers[0],
Value: headers[1],
}
np = append(np, ph)
}
}
}
return np
case KeyDomains:
if domain, ok := sr[KeyDomains]; ok {
if strings.Contains(domain.(string), ",") {
mm := strings.Split(domain.(string), ",")
return mm
}
return []string{domain.(string)}
}
return []string{"*"}
case KeyWeight:
if weight, ok := sr[KeyWeight]; ok {
w, err := strconv.Atoi(weight.(string))
if err != nil {
return 100
}
return w
}
return 100
case KeyIntervalMS:
if in, ok := sr[KeyIntervalMS]; ok {
w, err := strconv.Atoi(in.(string))
if err != nil {
return 10000
}
return w
}
return 10000
case KeyConsecutiveErrors:
if in, ok := sr[KeyConsecutiveErrors]; ok {
w, err := strconv.Atoi(in.(string))
if err != nil {
return 5
}
return w
}
return 5
case KeyBaseEjectionTimeMS:
if in, ok := sr[KeyBaseEjectionTimeMS]; ok {
w, err := strconv.Atoi(in.(string))
if err != nil {
return 30000
}
return w
}
return 30000
case KeyMaxEjectionPercent:
if in, ok := sr[KeyMaxEjectionPercent]; ok {
w, err := strconv.Atoi(in.(string))
if err != nil || w > 100 {
return 10
}
return w
}
return 10
default:
return nil
}
}

View File

@ -28,7 +28,6 @@ import (
"github.com/goodrain/rainbond/cmd/node/option"
api_model "github.com/goodrain/rainbond/pkg/api/model"
"github.com/goodrain/rainbond/pkg/api/util"
node_model "github.com/goodrain/rainbond/pkg/node/api/model"
envoyv1 "github.com/goodrain/rainbond/pkg/node/core/envoy/v1"
"github.com/goodrain/rainbond/pkg/node/core/k8s"
"github.com/goodrain/rainbond/pkg/node/core/store"
@ -185,11 +184,13 @@ func (d *DiscoverAction) upstreamClusters(serviceAlias, namespace string, depend
continue
}
pcds := &envoyv1.Cluster{
Name: fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port),
Name: fmt.Sprintf("%s_%s_%v", namespace, serviceAlias, port.Port),
Type: "sds",
ConnectTimeoutMs: 250,
LbType: "round_robin",
ServiceName: fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port),
OutlierDetection: envoyv1.CreatOutlierDetection(destService.Options),
CircuitBreaker: envoyv1.CreateCircuitBreaker(destService.Options),
}
cdsClusters = append(cdsClusters, pcds)
continue
@ -202,18 +203,6 @@ func (d *DiscoverAction) upstreamClusters(serviceAlias, namespace string, depend
//only local port
func (d *DiscoverAction) downstreamClusters(serviceAlias, namespace string, ports []*api_model.BasePort) (cdsClusters envoyv1.Clusters, err *util.APIHandleError) {
for _, port := range ports {
maxConnection := d.ToolsGetRouterItem("", node_model.MaxConnections, port.Options).(int)
maxRequests := d.ToolsGetRouterItem("", node_model.MaxRequests, port.Options).(int)
maxRetries := d.ToolsGetRouterItem("", node_model.MaxActiveRetries, port.Options).(int)
maxPendingRequests := d.ToolsGetRouterItem("", node_model.MaxPendingRequests, port.Options).(int)
cb := &envoyv1.CircuitBreaker{
Default: envoyv1.DefaultCBPriority{
MaxConnections: maxConnection,
MaxPendingRequests: maxPendingRequests,
MaxRequests: maxRequests,
MaxRetries: maxRetries,
},
}
localhost := fmt.Sprintf("tcp://127.0.0.1:%d", port.Port)
pcds := &envoyv1.Cluster{
Name: fmt.Sprintf("%s_%s_%v", namespace, serviceAlias, port.Port),
@ -221,7 +210,7 @@ func (d *DiscoverAction) downstreamClusters(serviceAlias, namespace string, port
ConnectTimeoutMs: 250,
LbType: "round_robin",
Hosts: []envoyv1.Host{envoyv1.Host{URL: localhost}},
CircuitBreaker: cb,
CircuitBreaker: envoyv1.CreateCircuitBreaker(port.Options),
}
cdsClusters = append(cdsClusters, pcds)
continue
@ -287,14 +276,13 @@ func (d *DiscoverAction) upstreamListener(serviceAlias, namespace string, depend
continue
}
for _, service := range services.Items {
serviceType, ok := service.Labels["service_type"]
if !ok || serviceType != "inner" {
if destServiceAlias != serviceAlias {
continue
}
inner, ok := service.Labels["service_type"]
if !ok || inner != "inner" {
continue
}
port := service.Spec.Ports[0].Port
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port)
// Unique by listen port
clusterName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
if _, ok := portMap[port]; !ok {
if v, ok := destService.Options["LISTEN"]; !ok || v == "true" {
plds := envoyv1.CreateTCPCommonListener(clusterName, fmt.Sprintf("tcp://127.0.0.1:%d", port))
@ -313,18 +301,18 @@ func (d *DiscoverAction) upstreamListener(serviceAlias, namespace string, depend
options := destService.Options
var prs envoyv1.HTTPRoute
prs.TimeoutMS = 0
prs.Prefix = d.ToolsGetRouterItem(destServiceAlias, node_model.PREFIX, options).(string)
prs.Prefix = envoyv1.GetOptionValues(envoyv1.KeyPrefix, options).(string)
wcn := &envoyv1.WeightedClusterEntry{
Name: clusterName,
Weight: d.ToolsGetRouterItem(destServiceAlias, node_model.WEIGHT, options).(int),
Weight: envoyv1.GetOptionValues(envoyv1.KeyWeight, options).(int),
}
prs.WeightedClusters = &envoyv1.WeightedCluster{
Clusters: []*envoyv1.WeightedClusterEntry{wcn},
}
prs.Headers = d.ToolsGetRouterItem(destServiceAlias, node_model.HEADERS, options).([]envoyv1.Header)
prs.Headers = envoyv1.GetOptionValues(envoyv1.KeyHeaders, options).([]envoyv1.Header)
pvh := &envoyv1.VirtualHost{
Name: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
Domains: d.ToolsGetRouterItem(destServiceAlias, node_model.DOMAINS, options).([]string),
Domains: envoyv1.GetOptionValues(envoyv1.KeyDomains, options).([]string),
Routes: []*envoyv1.HTTPRoute{&prs},
}
vhL = append(vhL, pvh)
@ -372,51 +360,6 @@ func Duplicate(a interface{}) (ret []interface{}) {
return ret
}
//CheckSameDomainAndPrefix check there is same domain or prefix
func (d *DiscoverAction) CheckSameDomainAndPrefix(resources *api_model.ResourceSpec) map[string]string {
baseServices := resources.BaseServices
domainL := make(map[string]string)
if len(baseServices) == 0 {
logrus.Debugf("has no base services resources")
return domainL
}
filterL := make(map[string]int)
for _, bs := range baseServices {
l := len(filterL)
domainName, _ := bs.Options[node_model.DOMAINS].(string)
filterL[domainName] = 0
if len(filterL) == l {
domainL[domainName] = "use"
}
}
for d := range domainL {
prefixM := make(map[string]int)
for _, bs := range baseServices {
domainName, _ := bs.Options[node_model.DOMAINS].(string)
if domainName == d {
prefix, _ := bs.Options[node_model.PREFIX].(string)
prefixM[prefix] = 0
}
// if strings.Contains(domainName, ","){
// mm := strings.Split(domainName, ",")
// for _, n := range mm {
// if n == d {
// prefix, _ := bs.Options[node_model.PREFIX].(string)
// prefixM[prefix] = 0
// continue
// }
// }
// }
}
if len(prefixM) == 1 {
domainL[d] = node_model.MODELWEIGHT
} else {
domainL[d] = node_model.MODELPREFIX
}
}
return domainL
}
//ToolsGetSourcesEnv rds
func (d *DiscoverAction) ToolsGetSourcesEnv(
namespace, sourceAlias, envName string) ([]byte, *util.APIHandleError) {
@ -470,101 +413,6 @@ func (d *DiscoverAction) ToolsGetMainPodEnvs(namespace, serviceAlias string) (
return nil, util.CreateAPIHandleError(404, fmt.Errorf("have no envs for plugin"))
}
//ToolsBuildPieceLDS ToolsBuildPieceLDS
func (d *DiscoverAction) ToolsBuildPieceLDS() {}
//ToolsGetRouterItem ToolsGetRouterItem
func (d *DiscoverAction) ToolsGetRouterItem(
destAlias, kind string, sr map[string]interface{}) interface{} {
switch kind {
case node_model.PREFIX:
if prefix, ok := sr[node_model.PREFIX]; ok {
return prefix
}
return "/"
case node_model.MaxConnections:
if circuit, ok := sr[node_model.MaxConnections]; ok {
cc, err := strconv.Atoi(circuit.(string))
if err != nil {
logrus.Errorf("strcon circuit error")
return 1024
}
return cc
}
return 1024
case node_model.MaxRequests:
if maxRequest, ok := sr[node_model.MaxRequests]; ok {
mrt, err := strconv.Atoi(maxRequest.(string))
if err != nil {
logrus.Errorf("strcon max request error")
return 1024
}
return mrt
}
return 1024
case node_model.MaxPendingRequests:
if maxPendingRequests, ok := sr[node_model.MaxPendingRequests]; ok {
mpr, err := strconv.Atoi(maxPendingRequests.(string))
if err != nil {
logrus.Errorf("strcon max pending request error")
return 1024
}
return mpr
}
return 1024
case node_model.MaxActiveRetries:
if maxRetries, ok := sr[node_model.MaxActiveRetries]; ok {
mxr, err := strconv.Atoi(maxRetries.(string))
if err != nil {
logrus.Errorf("strcon max retry error")
return 3
}
return mxr
}
return 3
case node_model.HEADERS:
var np []envoyv1.Header
if headers, ok := sr[node_model.HEADERS]; ok {
parents := strings.Split(headers.(string), ";")
for _, h := range parents {
headers := strings.Split(h, ":")
//has_header:no 默认
if len(headers) == 2 {
if headers[0] == "has_header" && headers[1] == "no" {
continue
}
ph := envoyv1.Header{
Name: headers[0],
Value: headers[1],
}
np = append(np, ph)
}
}
}
return np
case node_model.DOMAINS:
if domain, ok := sr[node_model.DOMAINS]; ok {
if strings.Contains(domain.(string), ",") {
mm := strings.Split(domain.(string), ",")
return mm
}
return []string{domain.(string)}
}
return []string{destAlias}
case node_model.WEIGHT:
if weight, ok := sr[node_model.WEIGHT]; ok {
w, err := strconv.Atoi(weight.(string))
if err != nil {
return 100
}
return w
}
return 100
default:
return nil
}
}
//ToolsGetRainbondResources get plugin configs from etcd
//if not exist return error
func (d *DiscoverAction) ToolsGetRainbondResources(namespace, sourceAlias, pluginID string) (*api_model.ResourceSpec, error) {