[REV]Update rainbond lds cds sds server

This commit is contained in:
pujielan 2017-12-14 16:33:45 +08:00
parent 1aa710b977
commit 7861db5f98
2 changed files with 137 additions and 131 deletions

View File

@ -86,6 +86,38 @@ type SoureBody struct {
//json format
}
//ResourceSpec 资源结构体
type ResourceSpec struct {
BasePorts []*BasePort `json:"base_ports"`
BaseServices []*BaseService `json:"base_services"`
BaseNormal BaseEnv `json:"base_normal"`
}
//BasePort 基于当前应用端口结构
type BasePort struct {
ServiceAlias string `json:"service_alias"`
ServiceID string `json:"service_id"`
Port int `json:"port"`
Protocol string `json:"protocol"`
Options map[string]interface{} `json:"options"`
}
//BaseService 基于依赖应用及端口结构
type BaseService struct {
ServiceAlias string `json:"service_alias"`
ServiceID string `json:"service_id"`
DependServiceAlias string `json:"depend_service_alias"`
DependServiceID string `json:"depend_service_id"`
Port int `json:"port"`
Protocol string `json:"protocol"`
Options map[string]interface{} `json:"options"`
}
//BaseEnv 无平台定义类型普通kv
type BaseEnv struct {
Options map[string]interface{} `json:"options"`
}
//Item source值,键值对形式
type Item struct {
Key string `json:"key" validate:"key"`

View File

@ -56,10 +56,10 @@ func (d *DiscoverAction) DiscoverService(serviceInfo string) (*node_model.SDS, *
return nil, util.CreateAPIHandleError(400, fmt.Errorf("service_name is not in good format"))
}
namespace := mm[0]
serviceAlias := mm[1]
destServiceAlias := mm[2]
dPort := mm[3]
//deployVersion := mm[3]
//pluginID := mm[1]
serviceAlias := mm[2]
destServiceAlias := mm[3]
dPort := mm[4]
labelname := fmt.Sprintf("name=%sService", destServiceAlias)
endpoints, err := k8s.K8S.Core().Endpoints(namespace).List(metav1.ListOptions{LabelSelector: labelname})
@ -112,16 +112,23 @@ func (d *DiscoverAction) DiscoverService(serviceInfo string) (*node_model.SDS, *
func (d *DiscoverAction) DiscoverListeners(
tenantService, serviceCluster string) (*node_model.LDS, *util.APIHandleError) {
nn := strings.Split(tenantService, "_")
if len(nn) != 2 {
if len(nn) != 3 {
return nil, util.CreateAPIHandleError(400,
fmt.Errorf("namesapces and service_alias not in good format"))
}
namespace := nn[0]
serviceAlias := nn[1]
pluginID := nn[1]
serviceAlias := nn[2]
mm := strings.Split(serviceCluster, "_")
if len(mm) == 0 {
return nil, util.CreateAPIHandleError(400, fmt.Errorf("service_name is not in good format"))
}
resources, err := d.ToolsGetRainbondResources(namespace, serviceAlias, pluginID)
if err != nil && !strings.Contains(err.Error(), "is not exist") {
logrus.Warnf("in lds get env %s error: %v", namespace+serviceAlias+pluginID, err)
return nil, util.CreateAPIHandleError(500, fmt.Errorf(
"get env %s error: %v", namespace+serviceAlias+pluginID, err))
}
//TODO: console控制尽量不把小于1000的端口给用户使用
var vhL []*node_model.PieceHTTPVirtualHost
var ldsL []*node_model.PieceLDS
@ -151,13 +158,13 @@ func (d *DiscoverAction) DiscoverListeners(
switch portProtocol {
case "stream":
ptr := &node_model.PieceTCPRoute{
Cluster: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
Cluster: fmt.Sprintf("%s_%s_%s_%s_%d", namespace, pluginID, serviceAlias, destServiceAlias, port),
}
lrs := &node_model.LDSTCPRoutes{
Routes: []*node_model.PieceTCPRoute{ptr},
}
lcg := &node_model.LDSTCPConfig{
StatPrefix: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
StatPrefix: fmt.Sprintf("%s_%s_%s_%s_%d", namespace, pluginID, serviceAlias, destServiceAlias, port),
RouteConfig: lrs,
}
lfs := &node_model.LDSFilters{
@ -165,46 +172,31 @@ func (d *DiscoverAction) DiscoverListeners(
Config: lcg,
}
plds := &node_model.PieceLDS{
Name: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
Name: fmt.Sprintf("%s_%s_%s_%s_%d", namespace, pluginID, serviceAlias, destServiceAlias, port),
Address: fmt.Sprintf("tcp://0.0.0.0:%d", port),
Filters: []*node_model.LDSFilters{lfs},
}
//TODO:front model/upsteam
// if destServiceAlias == serviceAlias {
// envName := fmt.Sprintf("%s_%d", serviceAlias, port)
// var sr api_model.NetUpStreamRules
// mr, err := d.ToolsGetStreamRules(namespace, node_model.UPSTREAM, envName, &sr)
// if err != nil {
// return nil, util.CreateAPIHandleError(500, err)
// }
// if mr != nil {
// sr = *mr.(*api_model.NetUpStreamRules)
// }
// plds.Address = fmt.Sprintf("tcp://0.0.0.0:%d", sr.MapPort)
// }
ldsL = append(ldsL, plds)
continue
case "http":
if destServiceAlias == serviceAlias {
//主容器应用
var vhLThin []*node_model.PieceHTTPVirtualHost
envName := fmt.Sprintf("%s_%d", destServiceAlias, port)
var sr api_model.NetDownStreamRules
mr, err := d.ToolsGetStreamRules(namespace, node_model.DOWNSTREAM, envName, &sr)
if err != nil && !strings.Contains(err.Error(), "is not exist") {
logrus.Warnf("get env %s error, %v", envName, err)
continue
}
if mr != nil {
sr = *mr.(*api_model.NetDownStreamRules)
options := make(map[string]interface{})
if resources != nil {
for _, bp := range resources.BasePorts {
if bp.ServiceAlias == serviceAlias && int32(bp.Port) == port {
options = bp.Options
}
}
}
prs := &node_model.PieceHTTPRoutes{
TimeoutMS: 0,
Prefix: d.ToolsGetRouterItem(destServiceAlias, node_model.PREFIX, &sr).(string),
Cluster: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
Prefix: d.ToolsGetRouterItem(destServiceAlias, node_model.PREFIX, options).(string),
Cluster: fmt.Sprintf("%s_%s_%s_%s_%d", namespace, pluginID, serviceAlias, destServiceAlias, port),
}
pvh := &node_model.PieceHTTPVirtualHost{
Name: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
Name: fmt.Sprintf("%s_%s_%s_%s_%d", namespace, pluginID, serviceAlias, destServiceAlias, port),
//Domains: d.ToolsGetRouterItem(destServiceAlias, node_model.DOMAINS, &sr).([]string),
//TODO: 主容器应用domain默认为*
Domains: []string{"*"},
@ -237,27 +229,25 @@ func (d *DiscoverAction) DiscoverListeners(
//修改http-port console 完成
ldsL = append(ldsL, plds)
} else {
//非主容易应用
envName := fmt.Sprintf("%s_%d", destServiceAlias, port)
var sr api_model.NetDownStreamRules
mr, err := d.ToolsGetStreamRules(namespace, node_model.DOWNSTREAM, envName, &sr)
if err != nil && !strings.Contains(err.Error(), "is not exist") {
logrus.Warnf("get env %s error, %v", envName, err)
continue
}
if mr != nil {
sr = *mr.(*api_model.NetDownStreamRules)
//非主容器应用
options := make(map[string]interface{})
if resources != nil {
for _, bp := range resources.BaseServices {
if bp.DependServiceAlias == destServiceAlias && int32(bp.Port) == port {
options = bp.Options
}
}
}
prs := &node_model.PieceHTTPRoutes{
TimeoutMS: 0,
Prefix: d.ToolsGetRouterItem(destServiceAlias, node_model.PREFIX, &sr).(string),
Cluster: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
Prefix: d.ToolsGetRouterItem(destServiceAlias, node_model.PREFIX, options).(string),
Cluster: fmt.Sprintf("%s_%s_%s_%s_%d", namespace, pluginID, serviceAlias, destServiceAlias, port),
//Headers: d.ToolsGetRouterItem(destServiceAlias,
// node_model.HEADERS, &sr).([]*node_model.PieceHeader),
}
pvh := &node_model.PieceHTTPVirtualHost{
Name: fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port),
Domains: d.ToolsGetRouterItem(destServiceAlias, node_model.DOMAINS, &sr).([]string),
Name: fmt.Sprintf("%s_%s_%s_%s_%d", namespace, pluginID, serviceAlias, destServiceAlias, port),
Domains: d.ToolsGetRouterItem(destServiceAlias, node_model.DOMAINS, options).([]string),
Routes: []*node_model.PieceHTTPRoutes{prs},
}
vhL = append(vhL, pvh)
@ -270,20 +260,7 @@ func (d *DiscoverAction) DiscoverListeners(
}
}
if len(vhL) != 0 {
envName := fmt.Sprintf("%s_http_port", serviceAlias)
var sr int
httpPort, err := d.ToolsGetStreamRules(namespace, node_model.DOWNSTREAM, envName, &sr)
if err != nil {
if strings.Contains(err.Error(), "is not exist") {
httpPort = 80
} else {
logrus.Errorf("get http port error, %v", err)
return nil, util.CreateAPIHandleError(500, err)
}
}
if httpPort == nil {
httpPort = 80
}
httpPort := 80
hsf := &node_model.HTTPSingleFileter{
Type: "decoder",
Name: "router",
@ -304,7 +281,7 @@ func (d *DiscoverAction) DiscoverListeners(
}
plds := &node_model.PieceLDS{
Name: fmt.Sprintf("%s_%s_http_80", namespace, serviceAlias),
Address: fmt.Sprintf("tcp://0.0.0.0:%d", httpPort.(int)),
Address: fmt.Sprintf("tcp://0.0.0.0:%d", httpPort),
Filters: []*node_model.LDSFilters{lfs},
}
//修改http-port console 完成
@ -321,11 +298,18 @@ func (d *DiscoverAction) DiscoverClusters(
tenantService,
serviceCluster string) (*node_model.CDS, *util.APIHandleError) {
nn := strings.Split(tenantService, "_")
if len(nn) != 2 {
if len(nn) != 3 {
return nil, util.CreateAPIHandleError(400, fmt.Errorf("namesapces and service_alias not in good format"))
}
namespace := nn[0]
serviceAlias := nn[1]
pluginID := nn[1]
serviceAlias := nn[2]
resources, err := d.ToolsGetRainbondResources(namespace, serviceAlias, pluginID)
if err != nil && !strings.Contains(err.Error(), "is not exist") {
logrus.Warnf("in lds get env %s error: %v", namespace+serviceAlias+pluginID, err)
return nil, util.CreateAPIHandleError(500, fmt.Errorf(
"get env %s error: %v", namespace+serviceAlias+pluginID, err))
}
mm := strings.Split(serviceCluster, "_")
if len(mm) == 0 {
return nil, util.CreateAPIHandleError(400, fmt.Errorf("service_name is not in good format"))
@ -340,28 +324,36 @@ func (d *DiscoverAction) DiscoverClusters(
selfCount := 0
for _, service := range services.Items {
inner, ok := service.Labels["service_type"]
port := service.Spec.Ports[0]
options := make(map[string]interface{})
if (!ok || inner != "inner") && serviceAlias != destServiceAlias {
continue
}
if (serviceAlias == destServiceAlias) && selfCount == 1 {
continue
}
if serviceAlias == destServiceAlias {
if resources != nil {
for _, bp := range resources.BasePorts {
if bp.ServiceAlias == serviceAlias && int32(bp.Port) == port.Port {
options = bp.Options
}
}
}
} else {
if resources != nil {
for _, bp := range resources.BaseServices {
if bp.DependServiceAlias == destServiceAlias && int32(bp.Port) == port.Port {
options = bp.Options
}
}
}
}
selfCount++
port := service.Spec.Ports[0]
envName := fmt.Sprintf("%s_%d", destServiceAlias, port.Port)
var sr api_model.NetDownStreamRules
mr, err := d.ToolsGetStreamRules(namespace, node_model.DOWNSTREAM, envName, &sr)
if err != nil && !strings.Contains(err.Error(), "is not exist") {
logrus.Warnf("trans k %v error, %v", envName, err)
continue
}
if mr != nil {
sr = *mr.(*api_model.NetDownStreamRules)
}
circuits := d.ToolsGetRouterItem(destServiceAlias, node_model.LIMITS, &sr).(int)
maxRequests := d.ToolsGetRouterItem(destServiceAlias, node_model.MaxRequests, &sr).(int)
maxRetries := d.ToolsGetRouterItem(destServiceAlias, node_model.MaxRetries, &sr).(int)
maxPendingRequests := d.ToolsGetRouterItem(destServiceAlias, node_model.MaxPendingRequests, &sr).(int)
circuits := d.ToolsGetRouterItem(destServiceAlias, node_model.LIMITS, options).(int)
maxRequests := d.ToolsGetRouterItem(destServiceAlias, node_model.MaxRequests, options).(int)
maxRetries := d.ToolsGetRouterItem(destServiceAlias, node_model.MaxRetries, options).(int)
maxPendingRequests := d.ToolsGetRouterItem(destServiceAlias, node_model.MaxPendingRequests, options).(int)
cb := &node_model.CircuitBreakers{
Default: &node_model.MaxConnections{
MaxConnections: circuits,
@ -371,11 +363,11 @@ func (d *DiscoverAction) DiscoverClusters(
},
}
pcds := &node_model.PieceCDS{
Name: fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port),
Name: fmt.Sprintf("%s_%s_%s_%s_%v", namespace, pluginID, serviceAlias, destServiceAlias, port.Port),
Type: "sds",
ConnectTimeoutMS: 250,
LBType: "round_robin",
ServiceName: fmt.Sprintf("%s_%s_%s_%v", namespace, serviceAlias, destServiceAlias, port.Port),
ServiceName: fmt.Sprintf("%s_%s_%s_%s_%v", namespace, pluginID, serviceAlias, destServiceAlias, port.Port),
CircuitBreakers: cb,
}
cdsL = append(cdsL, pcds)
@ -452,98 +444,80 @@ func (d *DiscoverAction) ToolsBuildPieceLDS() {}
//ToolsGetRouterItem ToolsGetRouterItem
func (d *DiscoverAction) ToolsGetRouterItem(
destAlias, kind string,
sr *api_model.NetDownStreamRules) interface{} {
sr map[string]interface{}) interface{} {
switch kind {
case node_model.PREFIX:
if sr.Prefix != "" {
return sr.Prefix
if prefix, ok := sr[node_model.PREFIX]; ok {
return prefix
}
return "/"
case node_model.LIMITS:
if sr.Limit != 0 {
if sr.Limit == 10250 {
if circuit, ok := sr[node_model.LIMITS]; ok {
if circuit == 10250 {
return 0
}
return sr.Limit
return circuit
}
return 1024
case node_model.MaxRequests:
if sr.MaxRequests != 0 {
if sr.MaxRequests == 10250 {
if maxRequest, ok := sr[node_model.MaxRequests]; ok {
if maxRequest == 10250 {
return 0
}
return sr.MaxRequests
return maxRequest
}
return 1024
case node_model.MaxPendingRequests:
if sr.MaxPendingRequests != 0 {
if sr.MaxPendingRequests == 10250 {
if maxPendingRequests, ok := sr[node_model.MaxPendingRequests]; ok {
if maxPendingRequests == 10250 {
return 0
}
return sr.MaxPendingRequests
return maxPendingRequests
}
return 1024
case node_model.MaxRetries:
if sr.MaxRetries > 0 && sr.MaxRetries < 10 {
if sr.MaxRetries == 11 {
if maxRetries, ok := sr[node_model.MaxRetries]; ok {
if maxRetries.(int) > 0 && maxRetries.(int) < 10 {
return maxRetries
}
if maxRetries == 11 {
return 0
}
return sr.MaxRetries
}
return 3
case node_model.HEADERS:
var phL []*node_model.PieceHeader
if sr.Header != nil {
for _, h := range sr.Header {
ph := &node_model.PieceHeader{
Name: h.Key,
Value: h.Value,
}
phL = append(phL, ph)
}
}
ph := &node_model.PieceHeader{
Name: "Connection",
Value: "keep-alive",
}
phL = append(phL, ph)
return phL
return ""
case node_model.DOMAINS:
if sr.Domain != nil {
return sr.Domain
}
if sr.ServiceAlias != "" {
return []string{destAlias, sr.ServiceAlias}
if domain, ok := sr[node_model.DOMAINS]; ok {
if destAlias != "" {
return []string{destAlias, domain.(string)}
}
return []string{domain.(string)}
}
return []string{destAlias}
}
return ""
}
//ToolsGetStreamRules ToolsStreamRules
func (d *DiscoverAction) ToolsGetStreamRules(
namespace, sourceAlias, envName string,
rule interface{}) (interface{}, error) {
k := fmt.Sprintf("/resources/define/%s/%s/%s", namespace, sourceAlias, envName)
//ToolsGetRainbondResources 获取rainbond自定义resources
func (d *DiscoverAction) ToolsGetRainbondResources(
namespace, sourceAlias, pluginID string) (*api_model.ResourceSpec, error) {
k := fmt.Sprintf("/resources/define/%s/%s/%s", namespace, sourceAlias, pluginID)
resp, err := d.etcdCli.Get(k)
if err != nil {
logrus.Errorf("get etcd value error, %v", err)
return nil, util.CreateAPIHandleError(500, err)
}
var ss api_model.SourceSpec
var rs api_model.ResourceSpec
if resp.Count != 0 {
v := resp.Kvs[0].Value
if err := ffjson.Unmarshal(v, &ss); err != nil {
if err := ffjson.Unmarshal(v, &rs); err != nil {
logrus.Errorf("unmashal etcd v error, %v", err)
return nil, util.CreateAPIHandleError(500, err)
}
} else {
logrus.Debugf("key %s is not exist,", envName)
logrus.Debugf("key %s is not exist,", k)
return nil, nil
}
if err := ffjson.Unmarshal([]byte(ss.SourceBody.EnvVal.(string)), &rule); err != nil {
logrus.Errorf("umashal value error, %v", err)
return nil, err
}
return rule, nil
return &rs, nil
}