add service ratelimit support

This commit is contained in:
barnett 2019-06-14 15:19:53 +08:00
parent d877d4781c
commit c797894103
26 changed files with 501 additions and 161 deletions

View File

@ -917,7 +917,7 @@ func (s *ServiceAction) EnvAttr(action string, at *dbmodel.TenantServiceEnvVar)
func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_model.ServicePorts, oldPort int) error {
crt, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
)
if err != nil {
return err
@ -982,7 +982,7 @@ func (s *ServiceAction) PortVar(action, tenantID, serviceID string, vps *api_mod
if crt {
pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
oldPort,
)
goon := true
@ -1027,7 +1027,7 @@ func (s *ServiceAction) PortOuter(tenantName, serviceID string, containerPort in
}
hasUpStream, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
)
if err != nil {
return nil, "", fmt.Errorf("get plugin relations error: %s", err.Error())
@ -1047,7 +1047,7 @@ func (s *ServiceAction) PortOuter(tenantName, serviceID string, containerPort in
if hasUpStream {
pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
containerPort,
)
if err != nil {
@ -1065,7 +1065,7 @@ func (s *ServiceAction) PortOuter(tenantName, serviceID string, containerPort in
}
if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeletePluginMappingPortByContainerPort(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
containerPort,
); err != nil {
tx.Rollback()
@ -1108,7 +1108,7 @@ func (s *ServiceAction) PortOuter(tenantName, serviceID string, containerPort in
if hasUpStream {
pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
containerPort,
)
var pPort int
@ -1117,7 +1117,7 @@ func (s *ServiceAction) PortOuter(tenantName, serviceID string, containerPort in
ppPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort(
p.TenantID,
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
containerPort,
)
if err != nil {
@ -1163,7 +1163,7 @@ func (s *ServiceAction) PortInner(tenantName, serviceID, operation string, port
}
hasUpStream, err := db.GetManager().TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
)
if err != nil {
return fmt.Errorf("get plugin relations error: %s", err.Error())
@ -1180,7 +1180,7 @@ func (s *ServiceAction) PortInner(tenantName, serviceID, operation string, port
if hasUpStream {
pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
port,
)
if err != nil {
@ -1197,7 +1197,7 @@ func (s *ServiceAction) PortInner(tenantName, serviceID, operation string, port
}
if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeletePluginMappingPortByContainerPort(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
port,
); err != nil {
tx.Rollback()
@ -1223,7 +1223,7 @@ func (s *ServiceAction) PortInner(tenantName, serviceID, operation string, port
if hasUpStream {
pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDao().GetPluginMappingPortByServiceIDAndContainerPort(
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
port,
)
var pPort int
@ -1232,7 +1232,7 @@ func (s *ServiceAction) PortInner(tenantName, serviceID, operation string, port
ppPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort(
p.TenantID,
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
port,
)
if err != nil {

View File

@ -64,7 +64,7 @@ func (s *ServiceAction) TenantServiceDeletePluginRelation(tenantID, serviceID, p
return util.CreateAPIHandleErrorFromDBError("delete service plugin config failure", err)
}
plugin, _ := db.GetManager().TenantPluginDao().GetPluginByID(pluginID, tenantID)
if plugin != nil && plugin.PluginModel == dbmodel.UpNetPlugin {
if plugin != nil && checkPluginHaveInbound(plugin.PluginModel) {
if err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).DeleteAllPluginMappingPortByServiceID(serviceID); err != nil {
if err != gorm.ErrRecordNotFound {
tx.Rollback()
@ -100,7 +100,7 @@ func (s *ServiceAction) SetTenantServicePluginRelation(tenantID, serviceID strin
return nil, util.CreateAPIHandleErrorFromDBError("plugin version get error ", err)
}
var openPorts = make(map[int]bool)
if plugin.PluginModel == dbmodel.UpNetPlugin {
if checkPluginHaveInbound(plugin.PluginModel) {
ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(serviceID)
if err != nil {
return nil, util.CreateAPIHandleErrorFromDBError("get ports by service id", err)
@ -113,12 +113,12 @@ func (s *ServiceAction) SetTenantServicePluginRelation(tenantID, serviceID strin
}
tx := db.GetManager().Begin()
if configs := pss.Body.ConfigEnvs.ComplexEnvs; configs != nil {
if configs.BasePorts != nil && plugin.PluginModel == dbmodel.UpNetPlugin {
if configs.BasePorts != nil && checkPluginHaveInbound(plugin.PluginModel) {
for _, p := range configs.BasePorts {
pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort(
tenantID,
serviceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
p.Port,
)
if err != nil {
@ -197,6 +197,9 @@ func (s *ServiceAction) normalEnvs(tx *gorm.DB, serviceID, pluginID string, envs
}
return nil
}
func checkPluginHaveInbound(model string) bool {
return model == dbmodel.InBoundNetPlugin || model == dbmodel.InBoundAndOutBoundNetPlugin
}
//UpdateVersionEnv UpdateVersionEnv
func (s *ServiceAction) UpdateVersionEnv(uve *api_model.SetVersionEnv) *util.APIHandleError {
@ -212,12 +215,12 @@ func (s *ServiceAction) UpdateVersionEnv(uve *api_model.SetVersionEnv) *util.API
}
}
if uve.Body.ConfigEnvs.ComplexEnvs != nil {
if uve.Body.ConfigEnvs.ComplexEnvs.BasePorts != nil && plugin.PluginModel == dbmodel.UpNetPlugin {
if uve.Body.ConfigEnvs.ComplexEnvs.BasePorts != nil && checkPluginHaveInbound(plugin.PluginModel) {
for _, p := range uve.Body.ConfigEnvs.ComplexEnvs.BasePorts {
pluginPort, err := db.GetManager().TenantServicesStreamPluginPortDaoTransactions(tx).SetPluginMappingPort(
uve.Body.TenantID,
uve.Body.ServiceID,
dbmodel.UpNetPlugin,
dbmodel.InBoundNetPlugin,
p.Port,
)
if err != nil {

View File

@ -399,7 +399,7 @@ func getRecommendedMemory(lang code.Lang) int {
if lang == code.PHP {
return 512
}
return 128
return 512
}
func (d *SourceCodeParse) errappend(pe ParseError) {

View File

@ -33,9 +33,9 @@ import (
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
)
var testClusterID = "5dbea040f5cb437c82e3bf02943fb02e_c5618104b2aa4c508390e4f18f316500_gr439125"
var testClusterID = "2bf54c5a0b5a48a890e2dda8635cb507_aff929446a7e48bea94c75447ed40b09_grc9e8e3"
var testXDSHost = "39.96.17.249:6101"
var testXDSHost = "192.168.195.1:6101"
// var testClusterID = "6ab5725e1ca34cfba7762b7ac10c0dee_9d379258e0bc4fc581331780b0541ac6_grc69d9c"
//var testXDSHost = "127.0.0.1:6101"

View File

@ -95,8 +95,10 @@ type Conf struct {
ServiceManager string
EnableInitStart bool
AutoRegistNode bool
DockerCli *dockercli.Client
EtcdCli *client.Client
//enable collect docker container log
EnableCollectLog bool
DockerCli *dockercli.Client
EtcdCli *client.Client
}
//StatsdConfig StatsdConfig
@ -144,6 +146,7 @@ func (a *Conf) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&a.EnableInitStart, "enable-init-start", false, "Whether the node daemon launches docker and etcd service")
fs.BoolVar(&a.AutoRegistNode, "auto-registnode", true, "Whether auto regist node info to cluster where node is not found")
fs.BoolVar(&a.AutoScheduler, "auto-scheduler", true, "Whether auto set node unscheduler where current node is unhealth")
fs.BoolVar(&a.EnableCollectLog, "enabel-collect-log", true, "Whether to collect container logs")
fs.DurationVar(&a.AutoUnschedulerUnHealthDuration, "autounscheduler-unhealthy-dura", 5*time.Minute, "Node unhealthy duration, after the automatic offline,if set 0,disable auto handle unscheduler.default is 5 Minute")
}

View File

@ -20,8 +20,9 @@ package dao
import (
"errors"
"github.com/goodrain/rainbond/db/model"
"time"
"github.com/goodrain/rainbond/db/model"
)
var (
@ -188,7 +189,7 @@ type TenantServiceRelationDao interface {
//TenantServicesStreamPluginPortDao TenantServicesStreamPluginPortDao
type TenantServicesStreamPluginPortDao interface {
Dao
GetPluginMappingPorts(serviceID string, pluginModel string) ([]*model.TenantServicesStreamPluginPort, error)
GetPluginMappingPorts(serviceID string) ([]*model.TenantServicesStreamPluginPort, error)
SetPluginMappingPort(
tenantID string,
serviceID string,

View File

@ -1491,8 +1491,8 @@ func (_mr *_MockTenantServicesStreamPluginPortDaoRecorder) UpdateModel(arg0 inte
return _mr.mock.ctrl.RecordCall(_mr.mock, "UpdateModel", arg0)
}
func (_m *MockTenantServicesStreamPluginPortDao) GetPluginMappingPorts(serviceID string, pluginModel string) ([]*model.TenantServicesStreamPluginPort, error) {
ret := _m.ctrl.Call(_m, "GetPluginMappingPorts", serviceID, pluginModel)
func (_m *MockTenantServicesStreamPluginPortDao) GetPluginMappingPorts(serviceID string) ([]*model.TenantServicesStreamPluginPort, error) {
ret := _m.ctrl.Call(_m, "GetPluginMappingPorts", serviceID)
ret0, _ := ret[0].([]*model.TenantServicesStreamPluginPort)
ret1, _ := ret[1].(error)
return ret0, ret1

View File

@ -37,7 +37,7 @@ type TenantPlugin struct {
GitURL string `gorm:"column:git_url" json:"git_url"`
//build mode
BuildModel string `gorm:"column:build_model" json:"build_model"`
//plugin model InitPlugin,UpNetPlugin,DownNetPlugin
//plugin model InitPlugin,InBoundNetPlugin,OutBoundNetPlugin
PluginModel string `gorm:"column:plugin_model" json:"plugin_model"`
//tenant id
TenantID string `gorm:"column:tenant_id" json:"tenant_id"`
@ -190,11 +190,14 @@ func (t *TenantServicesStreamPluginPort) TableName() string {
//InitPlugin 初始化插件
var InitPlugin = "init-plugin"
//UpNetPlugin 上游网络插件
var UpNetPlugin = "net-plugin:up"
//InBoundNetPlugin 入站治理网络插件
var InBoundNetPlugin = "net-plugin:up"
//DownNetPlugin 下游网络插件
var DownNetPlugin = "net-plugin:down"
//OutBoundNetPlugin 出站治理网络插件
var OutBoundNetPlugin = "net-plugin:down"
//InBoundAndOutBoundNetPlugin 出站和入站治理
var InBoundAndOutBoundNetPlugin = "net-plugin:in-and-out"
//GeneralPlugin 一般插件,默认分类,优先级最低
var GeneralPlugin = "general-plugin"

View File

@ -559,10 +559,9 @@ func (t *TenantServicesStreamPluginPortDaoImpl) UpdateModel(mo model.Interface)
//GetPluginMappingPorts GetPluginMappingPorts 降序排列
func (t *TenantServicesStreamPluginPortDaoImpl) GetPluginMappingPorts(
serviceID string, pluginModel string) ([]*model.TenantServicesStreamPluginPort, error) {
serviceID string) ([]*model.TenantServicesStreamPluginPort, error) {
var ports []*model.TenantServicesStreamPluginPort
if err := t.DB.Where("service_id=? and plugin_model=?",
serviceID, pluginModel).Order("plugin_port asc").Find(&ports).Error; err != nil {
if err := t.DB.Where("service_id=?", serviceID).Order("plugin_port asc").Find(&ports).Error; err != nil {
return nil, err
}
return ports, nil
@ -592,7 +591,7 @@ func (t *TenantServicesStreamPluginPortDaoImpl) SetPluginMappingPort(
serviceID string,
pluginModel string,
containerPort int) (int, error) {
ports, err := t.GetPluginMappingPorts(serviceID, pluginModel)
ports, err := t.GetPluginMappingPorts(serviceID)
if err != nil {
return 0, err
}

View File

@ -0,0 +1,12 @@
FROM envoyproxy/envoy:v1.9.0
LABEL "author"="zengqg@goodrain.com"
RUN apt-get update && apt-get install -y bash curl net-tools wget vim && \
wget https://github.com/barnettZQG/env2file/releases/download/0.1.1/env2file-linux -O /usr/bin/env2file
ADD . /root/
RUN chmod 755 /root/start.sh && chmod 755 /usr/bin/env2file
ENV ENVOY_BINARY="/usr/local/bin/envoy"
WORKDIR /root
CMD ["./start.sh"]

View File

@ -0,0 +1,5 @@
VERSION=5.1
image:
docker build -t rainbond/plugin_mesh:${VERSION} .
test:
docker run -e XDS_HOST_IP=192.168.1.112 -e TENANT_ID=5dbea040f5cb437c82e3bf02943fb02e -e PLUGIN_ID=c5618104b2aa4c508390e4f18f316500 -e SERVICE_NAME=gr439125 --rm -it rainbond/plugin_mesh:5.1

View File

@ -0,0 +1,17 @@
# envoy_discover_service
`PREFIX`  URL前缀path配置例如/api
`DOMAINS` 内网请求域名配置,基于配置的域名转发至下游应用
`LIMITS`  TCP限速配置范围02048于框体内填入数字若配置0则触熔断
`MaxPendingRequests` HTTP挂起请求配置范围02048于框体内填入数字配置0则立即挂起请求
`WEIGHT` 转发权重设置范围1~100该参数会判断多个拥有相同域名的下游服务来进行权重分配权重之和必须是100否则会导致无法访问
`HEADERS` HTTP请求头设置为k:v格式多个由“;”隔开例如header1:mm;header2:nn
`MaxRequests` 最大请求数限制默认为1024, 设置0为0请求
`MaxRetries` 最大重试次数默认为3, 设置0为0重试

View File

@ -0,0 +1,54 @@
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address: { address: 0.0.0.0, port_value: ${MANAGE_PORT:65533} }
dynamic_resources:
lds_config:
api_config_source:
api_type: GRPC
grpc_services:
envoy_grpc:
cluster_name: rainbond_xds_cluster
cds_config:
api_config_source:
api_type: GRPC
grpc_services:
envoy_grpc:
cluster_name: rainbond_xds_cluster
static_resources:
clusters:
- name: rainbond_xds_cluster
connect_timeout: 0.25s
type: STATIC
lb_policy: ROUND_ROBIN
http2_protocol_options: {}
load_assignment:
cluster_name: rainbond_xds_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: ${XDS_HOST_IP:172.30.42.1}
port_value: ${XDS_HOST_PORT:6101}
- name: rate_limit_service_cluster
connect_timeout: 0.25s
type: STATIC
lb_policy: ROUND_ROBIN
http2_protocol_options: {}
load_assignment:
cluster_name: rate_limit_service_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: ${RATE_LIMIT_SERVER_HOST:127.0.0.1}
port_value: ${RATE_LIMIT_SERVER_PORT:8081}
# version 1.10.0 and newer do not support
rate_limit_service:
grpc_service:
envoy_grpc:
cluster_name: rate_limit_service_cluster

View File

@ -0,0 +1,5 @@
#!/bin/bash
set -e
env2file conversion -f /root/envoy_config.yaml
cluster_name=${TENANT_ID}_${PLUGIN_ID}_${SERVICE_NAME}
exec envoy -c /root/envoy_config.yaml --service-cluster ${cluster_name} --service-node ${cluster_name}

View File

@ -27,17 +27,22 @@ import (
core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core"
listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener"
route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route"
http_rate_limit "github.com/envoyproxy/go-control-plane/envoy/config/filter/http/rate_limit/v2"
http_connection_manager "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2"
tcp_proxy "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2"
configratelimit "github.com/envoyproxy/go-control-plane/envoy/config/ratelimit/v2"
_type "github.com/envoyproxy/go-control-plane/envoy/type"
"github.com/envoyproxy/go-control-plane/pkg/util"
v1 "github.com/goodrain/rainbond/node/core/envoy/v1"
)
var defaultListenerAddress = "127.0.0.1"
//DefaultLocalhostListenerAddress -
var DefaultLocalhostListenerAddress = "127.0.0.1"
//CreateTCPListener listener builder
func CreateTCPListener(name, clusterName, address, statPrefix string, port uint32) *apiv2.Listener {
if address == "" {
address = defaultListenerAddress
address = DefaultLocalhostListenerAddress
}
tcpProxy := &tcp_proxy.TcpProxy{
StatPrefix: statPrefix,
@ -53,7 +58,7 @@ func CreateTCPListener(name, clusterName, address, statPrefix string, port uint3
listener.FilterChain{
Filters: []listener.Filter{
listener.Filter{
Name: "envoy.tcp_proxy",
Name: util.TCPProxy,
ConfigType: &listener.Filter_Config{
Config: MessageToStruct(tcpProxy),
},
@ -69,8 +74,54 @@ func CreateTCPListener(name, clusterName, address, statPrefix string, port uint3
return listener
}
//CreateHTTPListener create http manager listener
func CreateHTTPListener(name, address, statPrefix string, port uint32, routes ...route.VirtualHost) *apiv2.Listener {
//RateLimitOptions rate limit options
type RateLimitOptions struct {
Enable bool
Domain string
RateServerClusterName string
Stage uint32
}
//DefaultRateLimitServerClusterName default rate limit server cluster name
var DefaultRateLimitServerClusterName = "rate_limit_service_cluster"
//CreateHTTPRateLimit create http rate limit
func CreateHTTPRateLimit(option RateLimitOptions) *http_rate_limit.RateLimit {
httpRateLimit := &http_rate_limit.RateLimit{
Domain: option.Domain,
Stage: option.Stage,
RateLimitService: &configratelimit.RateLimitServiceConfig{
GrpcService: &core.GrpcService{
TargetSpecifier: &core.GrpcService_EnvoyGrpc_{
EnvoyGrpc: &core.GrpcService_EnvoyGrpc{
ClusterName: option.RateServerClusterName,
},
},
},
},
}
if err := httpRateLimit.Validate(); err != nil {
logrus.Errorf("create http rate limit failure %s", err.Error())
return nil
}
logrus.Debugf("service http rate limit for domain %s", httpRateLimit.Domain)
return httpRateLimit
}
//CreateHTTPConnectionManager create http connection manager
func CreateHTTPConnectionManager(name, statPrefix string, rateOpt *RateLimitOptions, routes ...route.VirtualHost) *http_connection_manager.HttpConnectionManager {
var httpFilters []*http_connection_manager.HttpFilter
if rateOpt != nil && rateOpt.Enable {
httpFilters = append(httpFilters, &http_connection_manager.HttpFilter{
Name: util.HTTPRateLimit,
ConfigType: &http_connection_manager.HttpFilter_Config{
Config: MessageToStruct(CreateHTTPRateLimit(*rateOpt)),
},
})
}
httpFilters = append(httpFilters, &http_connection_manager.HttpFilter{
Name: util.Router,
})
hcm := &http_connection_manager.HttpConnectionManager{
StatPrefix: statPrefix,
RouteSpecifier: &http_connection_manager.HttpConnectionManager_RouteConfig{
@ -79,23 +130,29 @@ func CreateHTTPListener(name, address, statPrefix string, port uint32, routes ..
VirtualHosts: routes,
},
},
HttpFilters: []*http_connection_manager.HttpFilter{
&http_connection_manager.HttpFilter{
Name: "envoy.router",
},
},
HttpFilters: httpFilters,
}
if err := hcm.Validate(); err != nil {
logrus.Errorf("validate http connertion manager config failure %s", err.Error())
return nil
}
return hcm
}
//CreateHTTPListener create http manager listener
func CreateHTTPListener(name, address, statPrefix string, port uint32, rateOpt *RateLimitOptions, routes ...route.VirtualHost) *apiv2.Listener {
hcm := CreateHTTPConnectionManager(name, statPrefix, rateOpt, routes...)
if hcm == nil {
logrus.Warningf("create http connection manager failure %s", name)
return nil
}
listener := &apiv2.Listener{
Name: name,
Address: core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.TCP,
Address: defaultListenerAddress,
Address: address,
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: port,
},
@ -107,7 +164,7 @@ func CreateHTTPListener(name, address, statPrefix string, port uint32, routes ..
listener.FilterChain{
Filters: []listener.Filter{
listener.Filter{
Name: "envoy.http_connection_manager",
Name: util.HTTPConnectionManager,
ConfigType: &listener.Filter_Config{
Config: MessageToStruct(hcm),
},
@ -178,14 +235,15 @@ func CreatOutlierDetection(options RainbondPluginOptions) *cluster.OutlierDetect
}
//CreateRouteVirtualHost create route virtual host
func CreateRouteVirtualHost(name string, domains []string, routes ...route.Route) *route.VirtualHost {
func CreateRouteVirtualHost(name string, domains []string, rateLimits []*route.RateLimit, routes ...route.Route) *route.VirtualHost {
pvh := &route.VirtualHost{
Name: name,
Domains: domains,
Routes: routes,
Name: name,
Domains: domains,
Routes: routes,
RateLimits: rateLimits,
}
if err := pvh.Validate(); err != nil {
logrus.Errorf("route virtualhost config validate failure %s", err.Error())
logrus.Errorf("route virtualhost config validate failure %s domains %s", err.Error(), domains)
return nil
}
return pvh
@ -268,28 +326,43 @@ func CreateEDSClusterConfig(serviceName string) *apiv2.Cluster_EdsClusterConfig
return edsClusterConfig
}
//ClusterOptions cluster options
type ClusterOptions struct {
Name string
ServiceName string
ClusterType apiv2.Cluster_DiscoveryType
MaxRequestsPerConnection *uint32
OutlierDetection *cluster.OutlierDetection
CircuitBreakers *cluster.CircuitBreakers
Hosts []*core.Address
HealthyPanicThreshold int64
}
//CreateCluster create cluster config
func CreateCluster(name, serviceName string, clusterType apiv2.Cluster_DiscoveryType,
outlierDetection *cluster.OutlierDetection,
circuitBreakers *cluster.CircuitBreakers,
hosts []*core.Address) *apiv2.Cluster {
func CreateCluster(options ClusterOptions) *apiv2.Cluster {
var edsClusterConfig *apiv2.Cluster_EdsClusterConfig
if clusterType == apiv2.Cluster_EDS {
edsClusterConfig = CreateEDSClusterConfig(serviceName)
if options.ClusterType == apiv2.Cluster_EDS {
edsClusterConfig = CreateEDSClusterConfig(options.ServiceName)
if edsClusterConfig == nil {
logrus.Errorf("create eds cluster config failure")
return nil
}
}
cluster := &apiv2.Cluster{
Name: name,
Type: clusterType,
Name: options.Name,
Type: options.ClusterType,
ConnectTimeout: time.Second * 250,
LbPolicy: apiv2.Cluster_ROUND_ROBIN,
EdsClusterConfig: edsClusterConfig,
Hosts: hosts,
OutlierDetection: outlierDetection,
CircuitBreakers: circuitBreakers,
Hosts: options.Hosts,
OutlierDetection: options.OutlierDetection,
CircuitBreakers: options.CircuitBreakers,
CommonLbConfig: &apiv2.Cluster_CommonLbConfig{
HealthyPanicThreshold: &_type.Percent{Value: float64(options.HealthyPanicThreshold) / 100},
},
}
if options.MaxRequestsPerConnection != nil {
cluster.MaxRequestsPerConnection = ConversionUInt32(*options.MaxRequestsPerConnection)
}
if err := cluster.Validate(); err != nil {
logrus.Errorf("validate cluster config failure %s", err.Error())

View File

@ -95,23 +95,38 @@ const (
KeyBaseEjectionTimeMS string = "BaseEjectionTimeMS"
//KeyMaxEjectionPercent MaxEjectionPercent key
KeyMaxEjectionPercent string = "MaxEjectionPercent"
// KeyMaxRequestsPerConnection Optional maximum requests for a single upstream connection. This parameter
// is respected by both the HTTP/1.1 and HTTP/2 connection pool
// implementations. If not specified, there is no limit. Setting this
// parameter to 1 will effectively disable keep alive.
KeyMaxRequestsPerConnection string = "MaxRequestsPerConnection"
// KeyHealthyPanicThreshold default 50,More than 50% of hosts are ejected and go into panic mode
// Panic mode will send traffic back to the failed host
KeyHealthyPanicThreshold string = "HealthyPanicThreshold"
)
//RainbondPluginOptions rainbond plugin config struct
type RainbondPluginOptions struct {
Prefix string
MaxConnections int
MaxRequests int
MaxPendingRequests int
MaxActiveRetries int
Headers v1.Headers
Domains []string
Weight uint32
//second
Interval int64
ConsecutiveErrors int
BaseEjectionTimeMS int64
MaxEjectionPercent int
Prefix string
MaxConnections int
MaxRequests int
MaxPendingRequests int
MaxActiveRetries int
Headers v1.Headers
Domains []string
Weight uint32
Interval int64
ConsecutiveErrors int
BaseEjectionTimeMS int64
MaxEjectionPercent int
MaxRequestsPerConnection *uint32
HealthyPanicThreshold int64
}
//RainbondInboundPluginOptions rainbond inbound plugin options
type RainbondInboundPluginOptions struct {
OpenLimit bool
LimitDomain string
}
//RouteBasicHash get basic hash for weight
@ -130,17 +145,18 @@ func (r RainbondPluginOptions) RouteBasicHash() string {
//if not exist,return default value
func GetOptionValues(sr map[string]interface{}) RainbondPluginOptions {
rpo := RainbondPluginOptions{
Prefix: "/",
MaxConnections: 1024,
MaxRequests: 1024,
MaxPendingRequests: 1024,
MaxActiveRetries: 3,
Domains: []string{"*"},
Weight: 100,
Interval: 10,
ConsecutiveErrors: 5,
BaseEjectionTimeMS: 30000,
MaxEjectionPercent: 10,
Prefix: "/",
MaxConnections: 1024,
MaxRequests: 1024,
MaxPendingRequests: 1024,
MaxActiveRetries: 3,
Domains: []string{"*"},
Weight: 100,
Interval: 10,
ConsecutiveErrors: 5,
BaseEjectionTimeMS: 30000,
MaxEjectionPercent: 10,
HealthyPanicThreshold: 50,
}
if sr == nil {
return rpo
@ -204,13 +220,45 @@ func GetOptionValues(sr map[string]interface{}) RainbondPluginOptions {
}
case KeyMaxEjectionPercent:
if i, err := strconv.Atoi(v.(string)); err == nil && i != 0 {
rpo.MaxEjectionPercent = i
if i > 100 {
rpo.MaxEjectionPercent = 100
} else {
rpo.MaxEjectionPercent = i
}
}
case KeyMaxRequestsPerConnection:
if i, err := strconv.Atoi(v.(string)); err == nil && i != 0 {
value := uint32(i)
rpo.MaxRequestsPerConnection = &value
}
case KeyHealthyPanicThreshold:
if i, err := strconv.Atoi(v.(string)); err == nil && i != 0 {
if i > 100 {
rpo.HealthyPanicThreshold = 100
} else {
rpo.HealthyPanicThreshold = int64(i)
}
}
}
}
return rpo
}
//GetRainbondInboundPluginOptions get rainbond inbound plugin options
func GetRainbondInboundPluginOptions(sr map[string]interface{}) (r RainbondInboundPluginOptions) {
for k, v := range sr {
switch k {
case "OPEN_LIMIT":
if strings.ToLower(v.(string)) == "yes" || strings.ToLower(v.(string)) == "true" {
r.OpenLimit = true
}
case "LIMIT_DOMAIN":
r.LimitDomain = v.(string)
}
}
return
}
//ParseLocalityLbEndpointsResource parse envoy xds server response ParseLocalityLbEndpointsResource
func ParseLocalityLbEndpointsResource(resources []types.Any) []v2.ClusterLoadAssignment {
var endpoints []v2.ClusterLoadAssignment

View File

@ -106,11 +106,6 @@ func (d *DiscoverAction) DiscoverService(serviceInfo string) (*envoyv1.SDSHost,
if len(addressList) == 0 {
addressList = item.Subsets[0].NotReadyAddresses
}
// rainbond create service only one port,so do not verify the port
// port := item.Subsets[0].Ports[0].Port
// if dPort != fmt.Sprintf("%d", port) {
// continue
// }
toport := int(services[key].Spec.Ports[0].Port)
if serviceAlias == destServiceAlias {
if originPort, ok := services[key].Labels["origin_port"]; ok {

View File

@ -185,8 +185,7 @@ func (m *ManagerService) UpOneServiceEndpoint(s *service.Service) {
}
hostIP := m.cluster.GetOptions().HostIP
for _, end := range s.Endpoints {
if strings.Replace(end.Port, " ", "", -1) == "" {
logrus.Warningf("ignore wrong endpoint: %v", end)
if end.Name == "" || strings.Replace(end.Port, " ", "", -1) == "" {
continue
}
key := end.Name + "/" + hostIP

View File

@ -20,6 +20,7 @@ package conver
import (
"fmt"
"strconv"
"github.com/Sirupsen/logrus"
v2 "github.com/envoyproxy/go-control-plane/envoy/api/v2"
@ -56,7 +57,7 @@ func OneNodeCluster(serviceAlias, namespace string, configs *corev1.ConfigMap, s
}
}
if len(clusters) == 0 {
logrus.Warn("create clusters zero length")
logrus.Warnf("create clusters zero length for service %s", serviceAlias)
}
return clusters, nil
}
@ -77,18 +78,25 @@ func upstreamClusters(serviceAlias, namespace string, dependsServices []*api_mod
continue
}
getOptions := func() (d envoyv2.RainbondPluginOptions) {
depServiceIndex := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), port.TargetPort.IntVal)
relPort, _ := strconv.Atoi(service.Labels["origin_port"])
if relPort == 0 {
relPort = int(port.TargetPort.IntVal)
}
depServiceIndex := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), relPort)
if _, ok := clusterConfig[depServiceIndex]; ok {
return envoyv2.GetOptionValues(clusterConfig[depServiceIndex].Options)
}
return envoyv2.GetOptionValues(nil)
}
clusterName := fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, GetServiceAliasByService(service), port.Port)
var clusterOption envoyv2.ClusterOptions
clusterOption.Name = fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, GetServiceAliasByService(service), port.Port)
options := getOptions()
outlierDetaction := envoyv2.CreatOutlierDetection(options)
circuitBreaker := envoyv2.CreateCircuitBreaker(options)
serviceName := fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port)
cluster := envoyv2.CreateCluster(clusterName, serviceName, v2.Cluster_EDS, outlierDetaction, circuitBreaker, nil)
clusterOption.OutlierDetection = envoyv2.CreatOutlierDetection(options)
clusterOption.CircuitBreakers = envoyv2.CreateCircuitBreaker(options)
clusterOption.ServiceName = fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port)
clusterOption.ClusterType = v2.Cluster_EDS
clusterOption.HealthyPanicThreshold = options.HealthyPanicThreshold
cluster := envoyv2.CreateCluster(clusterOption)
if cluster != nil {
cdsClusters = append(cdsClusters, cluster)
}
@ -103,8 +111,17 @@ func downstreamClusters(serviceAlias, namespace string, ports []*api_model.BaseP
port := ports[i]
address := envoyv2.CreateSocketAddress(port.Protocol, "127.0.0.1", uint32(port.Port))
clusterName := fmt.Sprintf("%s_%s_%v", namespace, serviceAlias, port.Port)
cluster := envoyv2.CreateCluster(clusterName, "", v2.Cluster_STATIC, nil,
envoyv2.CreateCircuitBreaker(envoyv2.GetOptionValues(port.Options)), []*core.Address{&address})
option := envoyv2.GetOptionValues(port.Options)
cluster := envoyv2.CreateCluster(envoyv2.ClusterOptions{
Name: clusterName,
ServiceName: "",
ClusterType: v2.Cluster_STATIC,
CircuitBreakers: envoyv2.CreateCircuitBreaker(option),
OutlierDetection: envoyv2.CreatOutlierDetection(option),
MaxRequestsPerConnection: option.MaxRequestsPerConnection,
Hosts: []*core.Address{&address},
HealthyPanicThreshold: option.HealthyPanicThreshold,
})
if cluster != nil {
cdsClusters = append(cdsClusters, cluster)
}

View File

@ -20,6 +20,7 @@ package conver
import (
"fmt"
"strconv"
"github.com/Sirupsen/logrus"
"github.com/pquerna/ffjson/ffjson"
@ -57,6 +58,7 @@ func OneNodeListerner(serviceAlias, namespace string, configs *corev1.ConfigMap,
if err := l.Validate(); err != nil {
logrus.Errorf("listener validate failure %s", err.Error())
} else {
logrus.Debugf("create listener %s for service %s", l.Name, serviceAlias)
listener = append(listener, l)
}
}
@ -66,12 +68,13 @@ func OneNodeListerner(serviceAlias, namespace string, configs *corev1.ConfigMap,
if err := l.Validate(); err != nil {
logrus.Errorf("listener validate failure %s", err.Error())
} else {
logrus.Debugf("create listener %s for service %s", l.Name, serviceAlias)
listener = append(listener, l)
}
}
}
if len(listener) == 0 {
logrus.Warn("create listener zero length")
logrus.Warnf("create listener zero length for service %s", serviceAlias)
}
return listener, nil
}
@ -93,15 +96,29 @@ func upstreamListener(serviceAlias, namespace string, dependsServices []*api_mod
continue
}
port := service.Spec.Ports[0].Port
var ListenPort = port
//listener real port
if value, ok := service.Labels["origin_port"]; ok {
origin, _ := strconv.Atoi(value)
if origin != 0 {
ListenPort = int32(origin)
}
}
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), port)
destService := ListennerConfig[clusterName]
listennerName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, GetServiceAliasByService(service), ListenPort)
destService := ListennerConfig[listennerName]
statPrefix := fmt.Sprintf("%s_%s", serviceAlias, GetServiceAliasByService(service))
// Unique by listen port
if _, ok := portMap[port]; !ok {
if _, ok := portMap[ListenPort]; !ok {
listenerName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
plds := envoyv2.CreateTCPListener(listenerName, clusterName, "127.0.0.1", statPrefix, uint32(port))
ldsL = append(ldsL, plds)
portMap[port] = len(ldsL) - 1
listenner := envoyv2.CreateTCPListener(listenerName, clusterName, envoyv2.DefaultLocalhostListenerAddress, statPrefix, uint32(ListenPort))
if listenner != nil {
ldsL = append(ldsL, listenner)
} else {
logrus.Warningf("create tcp listenner %s failure", listenerName)
continue
}
portMap[ListenPort] = len(ldsL) - 1
}
portProtocol, _ := service.Labels["port_protocol"]
if destService != nil && destService.Protocol != "" {
@ -114,6 +131,8 @@ func upstreamListener(serviceAlias, namespace string, dependsServices []*api_mod
var options envoyv2.RainbondPluginOptions
if destService != nil {
options = envoyv2.GetOptionValues(destService.Options)
} else {
logrus.Warningf("destService is nil for service %s", serviceAlias)
}
hashKey := options.RouteBasicHash()
if oldroute, ok := uniqRoute[hashKey]; ok {
@ -135,7 +154,7 @@ func upstreamListener(serviceAlias, namespace string, dependsServices []*api_mod
route := envoyv2.CreateRoute(clusterName, options.Prefix, headerMatchers, options.Weight)
if route != nil {
pvh := envoyv2.CreateRouteVirtualHost(fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias,
GetServiceAliasByService(service), port), options.Domains, *route)
GetServiceAliasByService(service), port), options.Domains, nil, *route)
if pvh != nil {
newVHL = append(newVHL, *pvh)
uniqRoute[hashKey] = route
@ -154,8 +173,12 @@ func upstreamListener(serviceAlias, namespace string, dependsServices []*api_mod
ldsL = append(ldsL[:i], ldsL[i+1:]...)
}
statsPrefix := fmt.Sprintf("%s_80", serviceAlias)
plds := envoyv2.CreateHTTPListener(fmt.Sprintf("%s_%s_http_80", namespace, serviceAlias), "127.0.0.1", statsPrefix, 80, newVHL...)
ldsL = append(ldsL, plds)
plds := envoyv2.CreateHTTPListener(fmt.Sprintf("%s_%s_http_80", namespace, serviceAlias), envoyv2.DefaultLocalhostListenerAddress, statsPrefix, 80, nil, newVHL...)
if plds != nil {
ldsL = append(ldsL, plds)
} else {
logrus.Warnf("create listenner %s failure", fmt.Sprintf("%s_%s_http_80", namespace, serviceAlias))
}
}
return
}
@ -170,8 +193,50 @@ func downstreamListener(serviceAlias, namespace string, ports []*api_model.BaseP
listenerName := clusterName
statsPrefix := fmt.Sprintf("%s_%d", serviceAlias, port)
if _, ok := portMap[port]; !ok {
listener := envoyv2.CreateTCPListener(listenerName, clusterName, "0.0.0.0", statsPrefix, uint32(p.ListenPort))
ls = append(ls, listener)
inboundConfig := envoyv2.GetRainbondInboundPluginOptions(p.Options)
if p.Protocol == "http" || p.Protocol == "https" {
var limit []*route.RateLimit
if inboundConfig.OpenLimit {
limit = []*route.RateLimit{
&route.RateLimit{
Actions: []*route.RateLimit_Action{
&route.RateLimit_Action{
ActionSpecifier: &route.RateLimit_Action_RemoteAddress_{
RemoteAddress: &route.RateLimit_Action_RemoteAddress{},
},
},
},
},
}
}
route := envoyv2.CreateRoute(clusterName, "/", nil, 100)
if route == nil {
logrus.Warning("create route cirtual route failure")
continue
}
virtuals := envoyv2.CreateRouteVirtualHost(listenerName, []string{"*"}, limit, *route)
if virtuals == nil {
logrus.Warning("create route cirtual failure")
continue
}
listener := envoyv2.CreateHTTPListener(listenerName, "0.0.0.0", statsPrefix, uint32(p.ListenPort), &envoyv2.RateLimitOptions{
Enable: inboundConfig.OpenLimit,
Domain: inboundConfig.LimitDomain,
RateServerClusterName: envoyv2.DefaultRateLimitServerClusterName,
Stage: 0,
}, *virtuals)
if listener != nil {
ls = append(ls, listener)
}
} else {
listener := envoyv2.CreateTCPListener(listenerName, clusterName, "0.0.0.0", statsPrefix, uint32(p.ListenPort))
if listener != nil {
ls = append(ls, listener)
} else {
logrus.Warningf("create tcp listener %s failure", listenerName)
continue
}
}
portMap[port] = 1
}
}

View File

@ -350,7 +350,8 @@ func (d *DiscoverServerManager) OnAdd(obj interface{}) {
d.eventChan <- event
}
func checkIsHandleResource(configMap *corev1.ConfigMap) bool {
if value, ok := configMap.Data["plugin-model"]; ok && (value == "net-plugin:down" || value == "net-plugin:up") {
if value, ok := configMap.Data["plugin-model"]; ok &&
(value == "net-plugin:up" || value == "net-plugin:down" || value == "net-plugin:in-and-out") {
return true
}
return false

View File

@ -120,12 +120,13 @@ func (n *NodeManager) Start(errchan chan error) error {
if err := n.controller.Online(); err != nil {
return err
}
if n.currentNode.Role.HasRule(client.ComputeNode) {
if n.currentNode.Role.HasRule(client.ComputeNode) && n.cfg.EnableCollectLog {
logrus.Infof("this node is %s node and enable collect conatiner log", n.currentNode.Role)
if err := n.clm.Start(); err != nil {
return err
}
} else {
logrus.Debug("this node is not compute node ,do not start container log manage")
logrus.Infof("this node(%s) is not compute node or disable collect container log ,do not start container log manage", n.currentNode.Role)
}
go n.monitor.Start(errchan)
go n.heartbeat()
@ -145,7 +146,7 @@ func (n *NodeManager) Stop() {
if n.healthy != nil {
n.healthy.Stop()
}
if n.clm != nil {
if n.clm != nil && n.currentNode.Role.HasRule(client.ComputeNode) && n.cfg.EnableCollectLog {
n.clm.Stop()
}
}

View File

@ -28,7 +28,7 @@ import (
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event"
"github.com/goodrain/rainbond/gateway/annotations/parser"
"github.com/goodrain/rainbond/worker/appm/types/v1"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -125,16 +125,14 @@ func (a *AppServiceBuild) Build() (*v1.K8sResources, error) {
if err != nil {
return nil, fmt.Errorf("find service port from db error %s", err.Error())
}
crt, err := a.checkUpstreamPluginRelation()
crt, err := checkUpstreamPluginRelation(a.serviceID, a.dbmanager)
if err != nil {
return nil, fmt.Errorf("get service upstream plugin relation error, %s", err.Error())
}
pp := make(map[int32]int)
if crt {
pluginPorts, err := a.dbmanager.TenantServicesStreamPluginPortDao().GetPluginMappingPorts(
a.serviceID,
model.UpNetPlugin,
)
a.serviceID)
if err != nil {
return nil, fmt.Errorf("find upstream plugin mapping port error, %s", err.Error())
}
@ -155,9 +153,13 @@ func (a *AppServiceBuild) Build() (*v1.K8sResources, error) {
if port.IsOuterService {
service := a.createOuterService(port)
services = append(services, service)
ings, secrs, err := a.ApplyRules(port, service)
relContainerPort := pp[int32(port.ContainerPort)]
if relContainerPort == 0 {
relContainerPort = port.ContainerPort
}
ings, secrs, err := a.ApplyRules(port.ServiceID, relContainerPort, port.ContainerPort, service)
if err != nil {
logrus.Debugf("error applying rules: %s", err.Error())
logrus.Errorf("error applying rules: %s", err.Error())
return nil, err
}
ingresses = append(ingresses, ings...)
@ -184,34 +186,34 @@ func (a *AppServiceBuild) Build() (*v1.K8sResources, error) {
}
// ApplyRules applies http rules and tcp rules
func (a AppServiceBuild) ApplyRules(port *model.TenantServicesPort,
func (a AppServiceBuild) ApplyRules(serviceID string, containerPort, pluginContainerPort int,
service *corev1.Service) ([]*extensions.Ingress, []*corev1.Secret, error) {
var ingresses []*extensions.Ingress
var secrets []*corev1.Secret
httpRules, err := a.dbmanager.HTTPRuleDao().GetHTTPRuleByServiceIDAndContainerPort(port.ServiceID,
port.ContainerPort)
httpRules, err := a.dbmanager.HTTPRuleDao().GetHTTPRuleByServiceIDAndContainerPort(serviceID, containerPort)
if err != nil {
logrus.Infof("Can't get HTTPRule corresponding to ServiceID(%s): %v", port.ServiceID, err)
logrus.Infof("Can't get HTTPRule corresponding to ServiceID(%s): %v", serviceID, err)
}
// create http ingresses
logrus.Debugf("find %d count http rule", len(httpRules))
if httpRules != nil && len(httpRules) > 0 {
for _, httpRule := range httpRules {
ing, sec, err := a.applyHTTPRule(httpRule, port, service)
ing, sec, err := a.applyHTTPRule(httpRule, containerPort, pluginContainerPort, service)
if err != nil {
logrus.Errorf("Unexpected error occurred while applying http rule: %v", err)
// skip the failed rule
continue
}
logrus.Debugf("create ingress %s", ing.Name)
ingresses = append(ingresses, ing)
secrets = append(secrets, sec)
}
}
// create tcp ingresses
tcpRules, err := a.dbmanager.TCPRuleDao().GetTCPRuleByServiceIDAndContainerPort(port.ServiceID,
port.ContainerPort)
tcpRules, err := a.dbmanager.TCPRuleDao().GetTCPRuleByServiceIDAndContainerPort(serviceID, containerPort)
if err != nil {
logrus.Infof("Can't get TCPRule corresponding to ServiceID(%s): %v", port.ServiceID, err)
logrus.Infof("Can't get TCPRule corresponding to ServiceID(%s): %v", serviceID, err)
}
if tcpRules != nil && len(tcpRules) > 0 {
for _, tcpRule := range tcpRules {
@ -229,7 +231,7 @@ func (a AppServiceBuild) ApplyRules(port *model.TenantServicesPort,
}
// applyTCPRule applies stream rule into ingress
func (a *AppServiceBuild) applyHTTPRule(rule *model.HTTPRule, port *model.TenantServicesPort,
func (a *AppServiceBuild) applyHTTPRule(rule *model.HTTPRule, containerPort, pluginContainerPort int,
service *corev1.Service) (ing *extensions.Ingress, sec *corev1.Secret, err error) {
// deal with empty path and domain
path := strings.Replace(rule.Path, " ", "", -1)
@ -238,7 +240,7 @@ func (a *AppServiceBuild) applyHTTPRule(rule *model.HTTPRule, port *model.Tenant
}
domain := strings.Replace(rule.Domain, " ", "", -1)
if domain == "" {
domain = createDefaultDomain(a.tenant.Name, a.service.ServiceAlias, port.ContainerPort)
domain = createDefaultDomain(a.tenant.Name, a.service.ServiceAlias, containerPort)
}
// create ingress
@ -259,7 +261,7 @@ func (a *AppServiceBuild) applyHTTPRule(rule *model.HTTPRule, port *model.Tenant
Path: path,
Backend: extensions.IngressBackend{
ServiceName: service.Name,
ServicePort: intstr.FromInt(port.ContainerPort),
ServicePort: intstr.FromInt(pluginContainerPort),
},
},
},
@ -382,12 +384,6 @@ func (a *AppServiceBuild) applyTCPRule(rule *model.TCPRule, service *corev1.Serv
return ing, nil
}
func (a *AppServiceBuild) checkUpstreamPluginRelation() (bool, error) {
return a.dbmanager.TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
a.serviceID,
model.UpNetPlugin)
}
//CreateUpstreamPluginMappingPort 检查是否存在upstream插件接管入口网络
func (a *AppServiceBuild) CreateUpstreamPluginMappingPort(
ports []*model.TenantServicesPort,

View File

@ -70,13 +70,12 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
if as.GetPodTemplate() != nil && len(as.GetPodTemplate().Spec.Containers) > 0 {
mainContainer = as.GetPodTemplate().Spec.Containers[0]
}
var inBoundPlugin *model.TenantServicePluginRelation
for _, pluginR := range appPlugins {
//if plugin not enable,ignore it
if pluginR.Switch == false {
continue
}
//apply plugin dynamic config
ApplyPluginConfig(as, pluginR, dbmanager)
versionInfo, err := dbmanager.TenantPluginBuildVersionDao().GetLastBuildVersionByVersionID(pluginR.PluginID, pluginR.VersionID)
if err != nil {
return nil, nil, fmt.Errorf("do not found available plugin versions")
@ -107,7 +106,10 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
if err != nil {
return nil, nil, fmt.Errorf("get plugin model info failure %s", err.Error())
}
if pluginModel == model.DownNetPlugin {
if pluginModel == model.InBoundAndOutBoundNetPlugin || pluginModel == model.InBoundNetPlugin {
inBoundPlugin = pluginR
}
if pluginModel == model.OutBoundNetPlugin || pluginModel == model.InBoundAndOutBoundNetPlugin {
netPlugin = true
meshPluginID = pluginR.PluginID
}
@ -117,6 +119,23 @@ func conversionServicePlugin(as *typesv1.AppService, dbmanager db.Manager) ([]v1
containers = append(containers, pc)
}
}
var inboundPluginConfig *api_model.ResourceSpec
//apply plugin dynamic config
if inBoundPlugin != nil {
config, err := dbmanager.TenantPluginVersionConfigDao().GetPluginConfig(inBoundPlugin.ServiceID,
inBoundPlugin.PluginID)
if err != nil && err != gorm.ErrRecordNotFound {
logrus.Errorf("get service plugin config from db failure %s", err.Error())
}
var resourceConfig api_model.ResourceSpec
if err := json.Unmarshal([]byte(config.ConfigStr), &resourceConfig); err == nil {
inboundPluginConfig = &resourceConfig
}
}
//create plugin config to configmap
for i := range appPlugins {
ApplyPluginConfig(as, appPlugins[i], dbmanager, inboundPluginConfig)
}
var udpDep bool
//if need proxy but not install net plugin
if as.NeedProxy && !netPlugin {
@ -183,12 +202,31 @@ func createProbeMeshInitContainer(serviceID, pluginID, serviceAlias string, envs
}
//ApplyPluginConfig applyPluginConfig
func ApplyPluginConfig(as *typesv1.AppService, servicePluginRelation *model.TenantServicePluginRelation, dbmanager db.Manager) {
config, err := dbmanager.TenantPluginVersionConfigDao().GetPluginConfig(servicePluginRelation.ServiceID, servicePluginRelation.PluginID)
func ApplyPluginConfig(as *typesv1.AppService, servicePluginRelation *model.TenantServicePluginRelation,
dbmanager db.Manager, inboundPluginConfig *api_model.ResourceSpec) {
config, err := dbmanager.TenantPluginVersionConfigDao().GetPluginConfig(servicePluginRelation.ServiceID,
servicePluginRelation.PluginID)
if err != nil && err != gorm.ErrRecordNotFound {
logrus.Errorf("get service plugin config from db failure %s", err.Error())
}
if config != nil {
configStr := config.ConfigStr
//if have inbound plugin,will Propagate its listner port to other plug-ins
if inboundPluginConfig != nil {
var oldConfig api_model.ResourceSpec
if err := json.Unmarshal([]byte(configStr), &oldConfig); err == nil {
for i := range oldConfig.BasePorts {
for j := range inboundPluginConfig.BasePorts {
if oldConfig.BasePorts[i].Port == inboundPluginConfig.BasePorts[j].Port {
oldConfig.BasePorts[i].ListenPort = inboundPluginConfig.BasePorts[j].ListenPort
}
}
}
if newConfig, err := json.Marshal(&oldConfig); err == nil {
configStr = string(newConfig)
}
}
}
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("config-%s-%s", config.ServiceID, config.PluginID),
@ -198,7 +236,7 @@ func ApplyPluginConfig(as *typesv1.AppService, servicePluginRelation *model.Tena
}),
},
Data: map[string]string{
"plugin-config": config.ConfigStr,
"plugin-config": configStr,
"plugin-model": servicePluginRelation.PluginModel,
},
}
@ -252,7 +290,7 @@ func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager)
},
Data: map[string]string{
"plugin-config": string(resJSON),
"plugin-model": model.DownNetPlugin,
"plugin-model": model.OutBoundNetPlugin,
},
}
as.SetConfigMap(cm)
@ -334,9 +372,9 @@ func createPluginEnvs(pluginID, tenantID, serviceAlias string, mainEnvs []v1.Env
func pluginWeight(pluginModel string) int {
switch pluginModel {
case model.UpNetPlugin:
case model.InBoundNetPlugin:
return 9
case model.DownNetPlugin:
case model.OutBoundNetPlugin:
return 8
case model.GeneralPlugin:
return 1

View File

@ -28,6 +28,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/builder"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/node/nodem/client"
"github.com/goodrain/rainbond/util"
@ -730,9 +731,18 @@ func createResources(as *v1.AppService) corev1.ResourceRequirements {
}
func checkUpstreamPluginRelation(serviceID string, dbmanager db.Manager) (bool, error) {
inBoundOK, err := dbmanager.TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
serviceID,
model.InBoundNetPlugin)
if err != nil {
return false, err
}
if inBoundOK {
return inBoundOK, nil
}
return dbmanager.TenantServicePluginRelationDao().CheckSomeModelPluginByServiceID(
serviceID,
dbmodel.UpNetPlugin)
model.InBoundAndOutBoundNetPlugin)
}
func createUpstreamPluginMappingPort(
ports []*dbmodel.TenantServicesPort,
@ -762,9 +772,7 @@ func createPorts(as *v1.AppService, dbmanager db.Manager) (ports []corev1.Contai
}
if crt {
pluginPorts, err := dbmanager.TenantServicesStreamPluginPortDao().GetPluginMappingPorts(
as.ServiceID,
dbmodel.UpNetPlugin,
)
as.ServiceID)
if err != nil {
logrus.Warningf("find upstream plugin mapping port error, %s", err.Error())
return

View File

@ -32,7 +32,7 @@ import (
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/worker/appm/conversion"
"github.com/goodrain/rainbond/worker/appm/f"
"github.com/goodrain/rainbond/worker/appm/types/v1"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/jinzhu/gorm"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
@ -527,9 +527,6 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) {
}
}
}
if node, ok := obj.(*corev1.Node); ok {
logrus.Debugf("Node: %s", node.Name)
}
}
//getAppService if creater is true, will create new app service where not found in store