Merge branch 'master' into multi

This commit is contained in:
huangrh 2019-04-01 16:14:16 +08:00
commit 217f79dbb4
49 changed files with 1165 additions and 780 deletions

View File

@ -142,7 +142,12 @@ type Gatewayer interface {
RuleConfig(w http.ResponseWriter, r *http.Request)
}
// ThirdPartyServicer is a interface for defining methods for third-party service.
// ThirdPartyServicer is an interface for defining methods for third-party service.
type ThirdPartyServicer interface {
Endpoints(w http.ResponseWriter, r *http.Request)
}
// Labeler is an interface for defining methods to get information of labels.
type Labeler interface {
Labels(w http.ResponseWriter, r *http.Request)
}

View File

@ -226,6 +226,7 @@ func (v2 *V2) clusterRouter() chi.Router {
func (v2 *V2) resourcesRouter() chi.Router {
r := chi.NewRouter()
r.Get("/labels", controller.GetManager().Labels)
r.Post("/tenants", controller.GetManager().TenantResources)
r.Post("/services", controller.GetManager().ServiceResources)
r.Get("/tenants/sum", controller.GetManager().SumTenants)

View File

@ -1,6 +1,7 @@
package controller
import (
"encoding/json"
"fmt"
"net/http"

42
api/controller/labels.go Normal file
View File

@ -0,0 +1,42 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package controller
import (
"github.com/goodrain/rainbond/cmd/api/option"
httputil "github.com/goodrain/rainbond/util/http"
"net/http"
)
// LabelController implements Labeler.
type LabelController struct {
optconfig *option.Config
}
// Labels - get -> list labels
func (l *LabelController) Labels(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
l.listLabels(w, r)
}
}
func (l *LabelController) listLabels(w http.ResponseWriter, r *http.Request) {
httputil.ReturnSuccess(r, w, l.optconfig.EnableFeature)
}

View File

@ -48,6 +48,7 @@ type V2Manager interface {
api.AppInterface
api.Gatewayer
api.ThirdPartyServicer
api.Labeler
}
var defaultV2Manager V2Manager
@ -79,5 +80,7 @@ func NewManager(conf option.Config, statusCli *client.AppRuntimeSyncClient) (*V2
v2r.GatewayStruct.MQClient = mqClient
v2r.GatewayStruct.cfg = &conf
v2r.LabelController.optconfig = &conf
return &v2r, nil
}

View File

@ -41,7 +41,7 @@ import (
"github.com/jinzhu/gorm"
"github.com/pquerna/ffjson/ffjson"
"github.com/renstorm/fuzzysearch/fuzzy"
"github.com/thedevsaddam/govalidator"
validator "github.com/thedevsaddam/govalidator"
)
//V2Routes v2Routes
@ -52,6 +52,7 @@ type V2Routes struct {
AppStruct
GatewayStruct
ThirdPartyServiceController
LabelController
}
//Show test
@ -600,14 +601,12 @@ func (t *TenantStruct) CreateService(w http.ResponseWriter, r *http.Request) {
// schema:
// "$ref": "#/responses/commandResponse"
// description: 统一返回格式
logrus.Debugf("trans create service service")
var ss api_model.ServiceStruct
body, err := ioutil.ReadAll(r.Body)
if err != nil {
httputil.ReturnError(r, w, 500, err.Error())
return
}
err = ffjson.Unmarshal(body, &ss)
if err != nil {
httputil.ReturnError(r, w, 500, err.Error())
@ -1615,7 +1614,7 @@ func (t *TenantStruct) AddProbe(w http.ResponseWriter, r *http.Request) {
tspD.FailureThreshold = tsp.FailureThreshold
tspD.HTTPHeader = tsp.HTTPHeader
tspD.InitialDelaySecond = tsp.InitialDelaySecond
tspD.IsUsed = tsp.IsUsed
tspD.IsUsed = &tsp.IsUsed
tspD.Mode = tsp.Mode
tspD.Path = tsp.Path
tspD.PeriodSecond = tsp.PeriodSecond
@ -1665,7 +1664,7 @@ func (t *TenantStruct) UpdateProbe(w http.ResponseWriter, r *http.Request) {
tspD.FailureThreshold = tsp.FailureThreshold
tspD.HTTPHeader = tsp.HTTPHeader
tspD.InitialDelaySecond = tsp.InitialDelaySecond
tspD.IsUsed = tsp.IsUsed
tspD.IsUsed = &tsp.IsUsed
tspD.Mode = tsp.Mode
tspD.Path = tsp.Path
tspD.PeriodSecond = tsp.PeriodSecond

View File

@ -54,7 +54,12 @@ func (g *GatewayAction) AddHTTPRule(req *apimodel.AddHTTPRuleStruct) (string, er
ServiceID: req.ServiceID,
ContainerPort: req.ContainerPort,
Domain: req.Domain,
Path: req.Path,
Path: func() string {
if !strings.HasPrefix(req.Path, "/") {
return "/" + req.Path
}
return req.Path
}(),
Header: req.Header,
Cookie: req.Cookie,
Weight: req.Weight,
@ -164,7 +169,12 @@ func (g *GatewayAction) UpdateHTTPRule(req *apimodel.UpdateHTTPRuleStruct) (stri
if req.Domain != "" {
rule.Domain = req.Domain
}
rule.Path = req.Path
rule.Path = func() string {
if !strings.HasPrefix(req.Path, "/") {
return "/" + req.Path
}
return req.Path
}()
rule.Header = req.Header
rule.Cookie = req.Cookie
rule.Weight = req.Weight
@ -611,6 +621,12 @@ func (g *GatewayAction) RuleConfig(req *apimodel.RuleConfigReq) error {
})
setheaders := make(map[string]string)
for _, item := range req.Body.SetHeaders {
if strings.TrimSpace(item.Key) == ""{
continue
}
if strings.TrimSpace(item.Value) == "" {
item.Value = "empty"
}
// filter same key
setheaders["set-header-"+item.Key] = item.Value
}

View File

@ -100,9 +100,9 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
for _, item := range endpoints {
m[item.UUID] = &model.EndpointResp{
EpID: item.UUID,
IP: func(ip string, port int) string {
if port != 0 {
return fmt.Sprintf("%s:%d", ip, port)
IP: func(ip string, p int) string {
if p != 0 {
return fmt.Sprintf("%s:%d", ip, p)
}
return ip
}(item.IP, item.Port),
@ -126,13 +126,8 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
continue
}
m[item.Uuid] = &model.EndpointResp{
EpID: item.Uuid,
IP: func(ip string, port int32) string {
if port != 0 {
return fmt.Sprintf("%s:%d", ip, port)
}
return ip
}(item.Ip, item.Port),
EpID: item.Uuid,
IP: item.Ip,
Status: item.Status,
IsOnline: true,
IsStatic: false,

View File

@ -21,7 +21,6 @@ package region
import (
"bytes"
"fmt"
"github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/api/util"
dbmodel "github.com/goodrain/rainbond/db/model"

View File

@ -25,8 +25,9 @@ type ServiceDeployInfo struct {
Deployment string `protobuf:"bytes,3,opt,name=deployment,proto3" json:"deployment,omitempty"`
Pods map[string]string `protobuf:"bytes,4,rep,name=pods,proto3" json:"pods,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Services map[string]string `protobuf:"bytes,5,rep,name=services,proto3" json:"services,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Secrets map[string]string `protobuf:"bytes,6,rep,name=secrets,proto3" json:"secrets,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Ingresses map[string]string `protobuf:"bytes,7,rep,name=ingresses,proto3" json:"ingresses,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Replicatset map[string]string `protobuf:"bytes,8,rep,name=replicatset,proto3" json:"replicatset,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Status string `protobuf:"bytes,9,opt,name=status,proto3" json:"status,omitempty"`
Endpoints map[string]string `protobuf:"bytes,6,rep,name=endpoints,proto3" json:"endpoints,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Secrets map[string]string `protobuf:"bytes,7,rep,name=secrets,proto3" json:"secrets,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Ingresses map[string]string `protobuf:"bytes,8,rep,name=ingresses,proto3" json:"ingresses,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Replicatset map[string]string `protobuf:"bytes,9,rep,name=replicatset,proto3" json:"replicatset,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Status string `protobuf:"bytes,10,opt,name=status,proto3" json:"status,omitempty"`
}

View File

@ -136,11 +136,11 @@ func (e *exectorManager) AddTask(task *pb.TaskMessage) error {
}
func (e *exectorManager) exec(task *pb.TaskMessage) error {
creater, ok := workerCreaterList[task.TaskType]
creator, ok := workerCreaterList[task.TaskType]
if !ok {
return fmt.Errorf("`%s` tasktype can't support", task.TaskType)
}
worker, err := creater(task.TaskBody, e)
worker, err := creator(task.TaskBody, e)
if err != nil {
logrus.Errorf("create worker for builder error.%s", err)
return err

View File

@ -19,16 +19,14 @@
package exector
import (
"time"
"fmt"
"io/ioutil"
"os"
"os/exec"
"regexp"
"strconv"
"strings"
"regexp"
"time"
"github.com/Sirupsen/logrus"
"github.com/docker/docker/client"
@ -39,7 +37,7 @@ import (
"github.com/goodrain/rainbond/util"
"github.com/pkg/errors"
"github.com/tidwall/gjson"
"gopkg.in/yaml.v2"
yaml "gopkg.in/yaml.v2"
)
var re = regexp.MustCompile(`\s`)
@ -270,7 +268,7 @@ func (i *ExportApp) exportImage(serviceDir string, app gjson.Result) error {
}
//change save app image name
imageName := sources.ImageNameWithNamespaceHandle(image)
saveImageName := fmt.Sprintf("%s/%s:%s", "goodrain.me", imageName.Name, imageName.Tag)
saveImageName := fmt.Sprintf("%s/%s:%s", builder.REGISTRYDOMAIN, imageName.Name, imageName.Tag)
if err := sources.ImageTag(i.DockerClient, image, saveImageName, i.Logger, 2); err != nil {
return err
}
@ -351,7 +349,7 @@ func (i *ExportApp) savePlugins() error {
}
//change save app image name
imageName := sources.ImageNameWithNamespaceHandle(image)
saveImageName := fmt.Sprintf("%s/%s:%s", "goodrain.me", imageName.Name, imageName.Tag)
saveImageName := fmt.Sprintf("%s/%s:%s", builder.REGISTRYDOMAIN, imageName.Name, imageName.Tag)
if err := sources.ImageTag(i.DockerClient, image, saveImageName, i.Logger, 2); err != nil {
return err
}
@ -498,6 +496,7 @@ type Service struct {
Volumes []string `yaml:"volumes,omitempty"`
Command string `yaml:"command,omitempty"`
Environment map[string]string `yaml:"environment,omitempty"`
DependsOn []string `yaml:"depends_on,omitempty"`
Loggin struct {
Driver string `yaml:"driver,omitempty"`
Options struct {
@ -525,6 +524,7 @@ func (i *ExportApp) buildDockerComposeYaml() error {
for _, app := range apps {
image := app.Get("image").String()
shareImage := app.Get("share_image").String()
appName := app.Get("service_cname").String()
appName = unicode2zh(appName)
volumes := make([]string, 0, 3)
@ -542,8 +542,10 @@ func (i *ExportApp) buildDockerComposeYaml() error {
volumes = append(volumes, fmt.Sprintf("%s:%s", volumeName, volumePath))
}
lang := app.Get("language").String()
// 如果该组件是源码方式部署则挂载slug文件到runner容器内
if checkIsRunner(image) {
if lang != "dockerfile" && checkIsRunner(image) {
shareImage = image
shareSlugPath := app.Get("share_slug_path").String()
tarFileName := buildToLinuxFileName(shareSlugPath)
volume := fmt.Sprintf("__GROUP_DIR__/%s/%s:/tmp/slug/slug.tgz", appName, tarFileName)
@ -552,8 +554,10 @@ func (i *ExportApp) buildDockerComposeYaml() error {
}
// 处理环境变量
for k, v := range app.Get("service_env_map_list").Map() {
envs[k] = v.String()
for _, item := range app.Get("service_env_map_list").Array() {
key := item.Get("attr_name").String()
value := item.Get("attr_value").String()
envs[key] = value
}
for _, item := range app.Get("service_connect_info_map_list").Array() {
@ -562,6 +566,7 @@ func (i *ExportApp) buildDockerComposeYaml() error {
envs[key] = value
}
var depServices []string
// 如果该app依赖了另了个app-b则把app-b中所有公开环境变量注入到该app
for _, item := range app.Get("dep_service_map_list").Array() {
serviceKey := item.Get("dep_service_key").String()
@ -569,10 +574,14 @@ func (i *ExportApp) buildDockerComposeYaml() error {
for k, v := range depEnvs {
envs[k] = v
}
if svc := i.getDependedService(serviceKey, &apps); svc != "" {
depServices = append(depServices, svc)
}
}
service := &Service{
Image: image,
Image: shareImage,
ContainerName: appName,
Restart: "always",
NetworkMode: "host",
@ -583,6 +592,9 @@ func (i *ExportApp) buildDockerComposeYaml() error {
service.Loggin.Driver = "json-file"
service.Loggin.Options.MaxSize = "5m"
service.Loggin.Options.MaxFile = "2"
if depServices != nil && len(depServices) > 0 {
service.DependsOn = depServices
}
y.Services[appName] = service
}
@ -607,7 +619,7 @@ func (i *ExportApp) buildDockerComposeYaml() error {
func (i *ExportApp) getPublicEnvByKey(serviceKey string, apps *[]gjson.Result) map[string]string {
envs := make(map[string]string, 5)
for _, app := range *apps {
appKey := app.Get("service_key").String()
appKey := app.Get("service_share_uuid").String()
if appKey == serviceKey {
for _, item := range app.Get("service_connect_info_map_list").Array() {
key := item.Get("attr_name").String()
@ -621,6 +633,15 @@ func (i *ExportApp) getPublicEnvByKey(serviceKey string, apps *[]gjson.Result) m
return envs
}
func (i *ExportApp) getDependedService(key string, apps *[]gjson.Result) string {
for _, app := range *apps {
if key == app.Get("service_share_uuid").String() {
return app.Get("service_cname").String()
}
}
return ""
}
func (i *ExportApp) buildStartScript() error {
if err := exec.Command("cp", "/src/export-app/run.sh", i.SourceDir).Run(); err != nil {
err = errors.New("Failed to generate start script to: " + i.SourceDir)

View File

@ -0,0 +1,35 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package exector
import (
"github.com/goodrain/rainbond/event"
"testing"
)
func TestExportApp_parseApps(t *testing.T) {
e := &ExportApp{
SourceDir: "./",
Logger: event.GetTestLogger(),
}
err := e.buildDockerComposeYaml()
if err != nil {
t.Error(err)
}
}

View File

@ -32,6 +32,7 @@ import (
simplejson "github.com/bitly/go-simplejson"
"github.com/docker/docker/client"
"github.com/goodrain/rainbond/api/model"
"github.com/goodrain/rainbond/builder"
"github.com/goodrain/rainbond/builder/sources"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/event"
@ -388,7 +389,7 @@ func (i *ImportApp) importPlugins() error {
// 上传之前先要根据新的仓库地址修改镜像名
image := i.oldPluginPath[plugin.Get("service_key").String()]
imageName := sources.ImageNameWithNamespaceHandle(image)
saveImageName := fmt.Sprintf("%s/%s:%s", "goodrain.me", imageName.Name, imageName.Tag)
saveImageName := fmt.Sprintf("%s/%s:%s", builder.REGISTRYDOMAIN, imageName.Name, imageName.Tag)
newImageName := plugin.Get("share_image").String()
if err := sources.ImageTag(i.DockerClient, saveImageName, newImageName, i.Logger, 2); err != nil {
return fmt.Errorf("change plugin image tag(%s => %s) error %s", saveImageName, newImageName, err.Error())
@ -440,7 +441,7 @@ func (i *ImportApp) loadApps() error {
pass := app.Get("service_image.hub_password").String()
// 上传之前先要根据新的仓库地址修改镜像名
image := app.Get("share_image").String()
if err := sources.ImageTag(i.DockerClient, fmt.Sprintf("%s/%s:%s", "goodrain.me", oldImageName.Name, oldImageName.Tag), image, i.Logger, 15); err != nil {
if err := sources.ImageTag(i.DockerClient, fmt.Sprintf("%s/%s:%s", builder.REGISTRYDOMAIN, oldImageName.Name, oldImageName.Tag), image, i.Logger, 15); err != nil {
return fmt.Errorf("change image tag(%s => %s) error %s", fmt.Sprintf("%s/%s:%s", i.ServiceImage.HubUrl, oldImageName.Name, oldImageName.Tag), image, err.Error())
}
// 开始上传

File diff suppressed because one or more lines are too long

View File

@ -107,7 +107,7 @@ func (e *exectorManager) runD(t *model.BuildPluginTaskBody, logger event.Logger)
logger.Info("代码检测为dockerfile开始编译", map[string]string{"step": "build-exector"})
mm := strings.Split(t.GitURL, "/")
n1 := strings.Split(mm[len(mm)-1], ".")[0]
buildImageName := fmt.Sprintf("goodrain.me/plugin_%s_%s:%s", n1, t.PluginID, t.DeployVersion)
buildImageName := fmt.Sprintf(builder.REGISTRYDOMAIN+"/plugin_%s_%s:%s", n1, t.PluginID, t.DeployVersion)
buildOptions := types.ImageBuildOptions{
Tags: []string{buildImageName},
Remove: true,

View File

@ -48,7 +48,7 @@ var REGISTRYUSER = ""
var REGISTRYPASS = ""
//RUNNERIMAGENAME runner image name
var RUNNERIMAGENAME = "goodrain.me/runner"
var RUNNERIMAGENAME = REGISTRYDOMAIN + "/runner"
//BUILDERIMAGENAME builder image name
var BUILDERIMAGENAME = "goodrain.me/builder"
var BUILDERIMAGENAME = REGISTRYDOMAIN + "/builder"

View File

@ -409,7 +409,7 @@ type TenantServiceProbe struct {
//检测超时时间
TimeoutSecond int `gorm:"column:timeout_second;size:3;default:30" json:"timeout_second" validate:"timeout_second"`
//是否启用
IsUsed int `gorm:"column:is_used;size:1;default:1" json:"is_used" validate:"is_used"`
IsUsed *int `gorm:"column:is_used;size:1;default:1" json:"is_used" validate:"is_used"`
//标志为失败的检测次数
FailureThreshold int `gorm:"column:failure_threshold;size:2;default:3" json:"failure_threshold" validate:"failure_threshold"`
//标志为成功的检测次数

View File

@ -168,10 +168,7 @@ func (h *HTTPRuleDaoImpl) UpdateModel(mo model.Interface) error {
if !ok {
return fmt.Errorf("Failed to convert %s to *model.HTTPRule", reflect.TypeOf(mo).String())
}
return h.DB.Table(hr.TableName()).
Where("uuid = ?", hr.UUID).
Update(hr).Error
return h.DB.Save(hr).Error
}
// GetHTTPRuleByID gets a HTTPRule based on uuid

View File

@ -50,7 +50,8 @@ func (t *ServiceProbeDaoImpl) UpdateModel(mo model.Interface) error {
probe := mo.(*model.TenantServiceProbe)
if probe.ID == 0 {
var oldProbe model.TenantServiceProbe
if err := t.DB.Where("service_id = ? and mode = ? and probe_id=?", probe.ServiceID, probe.Mode, probe.ProbeID).Find(&oldProbe).Error; err != nil {
if err := t.DB.Where("service_id = ? and probe_id=?", probe.ServiceID,
probe.ProbeID).Find(&oldProbe).Error; err != nil {
return err
}
if oldProbe.ID == 0 {

View File

@ -116,7 +116,15 @@ func (l1 *Config) Equal(l2 *Config) bool {
if l1.ProxyBuffering != l2.ProxyBuffering {
return false
}
// TODO: ProxySetHeaders
if len(l1.SetHeaders) != len(l2.SetHeaders) {
return false
}
for k, v := range l1.SetHeaders {
if l2.SetHeaders[k] != v {
return false
}
}
return true
}
@ -219,6 +227,9 @@ func (a proxy) Parse(ing *extensions.Ingress) (interface{}, error) {
fmt.Sprintf("%s/%s", ing.GetNamespace(), ing.GetName()), err)
}
for k, v := range setHeaders {
if v == "empty" {
v = ""
}
config.SetHeaders[k] = v
}

View File

@ -92,7 +92,7 @@ func TestProxy(t *testing.T) {
data[parser.GetAnnotationWithPrefix("proxy-read-timeout")] = "3"
data[parser.GetAnnotationWithPrefix("proxy-buffers-number")] = "8"
data[parser.GetAnnotationWithPrefix("proxy-buffer-size")] = "1k"
data[parser.GetAnnotationWithPrefix("proxy-body-size")] = "2k"
data[parser.GetAnnotationWithPrefix("proxy-body-size")] = "2"
data[parser.GetAnnotationWithPrefix("proxy-next-upstream")] = "off"
data[parser.GetAnnotationWithPrefix("proxy-next-upstream-tries")] = "3"
data[parser.GetAnnotationWithPrefix("proxy-request-buffering")] = "off"
@ -122,8 +122,8 @@ func TestProxy(t *testing.T) {
if p.BufferSize != "1k" {
t.Errorf("expected 1k as buffer-size but returned %v", p.BufferSize)
}
if p.BodySize != "2k" {
t.Errorf("expected 2k as body-size but returned %v", p.BodySize)
if p.BodySize != 2 {
t.Errorf("expected 2 as body-size but returned %v", p.BodySize)
}
if p.NextUpstream != "off" {
t.Errorf("expected off as next-upstream but returned %v", p.NextUpstream)
@ -168,7 +168,7 @@ func TestProxyWithNoAnnotation(t *testing.T) {
if p.BufferSize != "10k" {
t.Errorf("expected 10k as buffer-size but returned %v", p.BufferSize)
}
if p.BodySize != "3k" {
if p.BodySize != 3 {
t.Errorf("expected 3k as body-size but returned %v", p.BodySize)
}
if p.NextUpstream != "error" {

View File

@ -29,9 +29,6 @@ import (
"strings"
"sync"
"github.com/goodrain/rainbond/db/model"
"k8s.io/apimachinery/pkg/labels"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/cmd/gateway/option"
@ -40,7 +37,7 @@ import (
"github.com/goodrain/rainbond/gateway/controller/config"
"github.com/goodrain/rainbond/gateway/defaults"
"github.com/goodrain/rainbond/gateway/util"
v1 "github.com/goodrain/rainbond/gateway/v1"
"github.com/goodrain/rainbond/gateway/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -377,97 +374,74 @@ func (s *k8sStore) extractAnnotations(ing *extensions.Ingress) {
func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) {
var httpPools []*v1.Pool
var tcpPools []*v1.Pool
l7Pools := make(map[string]*v1.Pool)
l4Pools := make(map[string]*v1.Pool)
for _, item := range s.listers.Endpoint.List() {
ep := item.(*corev1.Endpoints)
f := func(backendMap map[string][]backend, poolMap map[string]struct{}) map[string]*v1.Pool {
pools := make(map[string]*v1.Pool)
for k, v := range backendMap {
service, err := s.listers.Service.ByKey(k)
if err != nil {
logrus.Warningf("Key: %s;error getting service: %v", k, err)
continue
}
originPort := service.GetLabels()["origin_port"]
var pluginPort int32
if originPort != "" {
port, err := strconv.Atoi(originPort)
if err != nil {
logrus.Warningf("Origin port: %s; error converting to type int: %v", err)
pluginPort = 0
} else {
pluginPort = int32(port)
if ep.Subsets != nil || len(ep.Subsets) != 0 {
epn := ep.ObjectMeta.Name
// l7
backends := l7PoolBackendMap[ep.ObjectMeta.Name]
for _, backend := range backends {
pool := l7Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
l7Pools[backend.name] = pool
}
}
// list related endpoints
labelname, ok := service.GetLabels()["name"]
if !ok {
logrus.Warningf("Key: %s;label 'name' not found", k)
continue
}
selector, err := labels.Parse(fmt.Sprintf("name=%s", labelname))
if err != nil {
logrus.Warningf("Label: %s; error parsing labels: %s",
fmt.Sprintf("name=%s", labelname), err.Error())
continue
}
endpoints, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("Label: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
continue
}
for _, ep := range endpoints {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
continue
}
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() {
if ep.Subsets[0].Ports[0].Port != service.Spec.Ports[0].TargetPort.IntVal {
continue
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := l7PoolMap[epn]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
}
}
for _, backend := range v {
pool := pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
pool.Name = backend.name
pool.UpstreamHashBy = backend.hashBy
pools[backend.name] = pool
}
// l4
backends = l4PoolBackendMap[ep.ObjectMeta.Name]
for _, backend := range backends {
pool := l4Pools[backend.name]
if pool == nil {
pool = &v1.Pool{
Nodes: []*v1.Node{},
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := poolMap[k]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: func(pluginPort, addressPort int32) int32 {
if pluginPort != 0 {
return pluginPort
}
return addressPort
}(pluginPort, ss.Ports[0].Port),
Weight: backend.weight,
})
}
pool.Name = backend.name
l4Pools[backend.name] = pool
}
for _, ss := range ep.Subsets {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else {
addresses = append(addresses, ss.NotReadyAddresses...)
}
for _, address := range addresses {
if _, ok := l4PoolMap[epn]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
Port: ss.Ports[0].Port,
Weight: backend.weight,
})
}
}
}
}
}
return pools
}
l7Pools := f(l7PoolBackendMap, l7PoolMap)
l4Pools := f(l4PoolBackendMap, l4PoolMap)
// change map to slice TODO: use map directly
for _, pool := range l7Pools {
httpPools = append(httpPools, pool)
@ -491,20 +465,20 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
if !s.ingressIsValid(ing) {
continue
}
ingKey := k8s.MetaNamespaceKey(ing)
anns, err := s.GetIngressAnnotations(ingKey)
if err != nil {
logrus.Errorf("Error getting Ingress annotations %q: %v", ingKey, err)
}
if anns.L4.L4Enable && anns.L4.L4Port != 0 {
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
// region l4
host := strings.Replace(anns.L4.L4Host, " ", "", -1)
if host == "" {
host = s.conf.IP
}
host = s.conf.IP
svcKey := fmt.Sprintf("%v/%v", ing.Namespace, ing.Spec.Backend.ServiceName)
protocol := s.GetServiceProtocol(svcKey, ing.Spec.Backend.ServicePort.IntVal)
listening := fmt.Sprintf("%s:%v", host, anns.L4.L4Port)
if string(protocol) == string(v1.ProtocolUDP) {
@ -521,11 +495,12 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
vs.Namespace = anns.Namespace
vs.ServiceID = anns.Labels["service_id"]
}
l4PoolMap[svcKey] = struct{}{}
l4PoolMap[ing.Spec.Backend.ServiceName] = struct{}{}
l4vsMap[listening] = vs
l4vs = append(l4vs, vs)
backend := backend{name: backendName, weight: anns.Weight.Weight}
l4PoolBackendMap[svcKey] = append(l4PoolBackendMap[svcKey], backend)
l4PoolBackendMap[ing.Spec.Backend.ServiceName] = append(l4PoolBackendMap[ing.Spec.Backend.ServiceName], backend)
// endregion
} else {
// region l7
@ -577,14 +552,15 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
vs.SSLCert = hostSSLMap[DefVirSrvName]
}
}
l7vsMap[virSrvName] = vs
l7vs = append(l7vs, vs)
}
for _, path := range rule.IngressRuleValue.HTTP.Paths {
svckey := fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
locKey := fmt.Sprintf("%s_%s", virSrvName, path.Path)
location := srvLocMap[locKey]
l7PoolMap[svckey] = struct{}{}
l7PoolMap[path.Backend.ServiceName] = struct{}{}
// if location do not exists, then creates a new one
if location == nil {
location = &v1.Location{
@ -593,10 +569,9 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
}
srvLocMap[locKey] = location
vs.Locations = append(vs.Locations, location)
// the first ingress proxy takes effect
location.Proxy = anns.Proxy
}
location.Proxy = anns.Proxy
// If their ServiceName is the same, then the new one will overwrite the old one.
nameCondition := &v1.Condition{}
var backendName string
@ -619,7 +594,7 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
if anns.UpstreamHashBy != "" {
backend.hashBy = anns.UpstreamHashBy
}
l7PoolBackendMap[svckey] = append(l7PoolBackendMap[svckey], backend)
l7PoolBackendMap[path.Backend.ServiceName] = append(l7PoolBackendMap[path.Backend.ServiceName], backend)
}
}
// endregion
@ -630,57 +605,47 @@ func (s *k8sStore) ListVirtualService() (l7vs []*v1.VirtualService, l4vs []*v1.V
// ingressIsValid checks if the specified ingress is valid
func (s *k8sStore) ingressIsValid(ing *extensions.Ingress) bool {
var svcKey string
var endpointKey string
if ing.Spec.Backend != nil { // stream
svcKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName)
endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, ing.Spec.Backend.ServiceName)
} else { // http
Loop:
for _, rule := range ing.Spec.Rules {
for _, path := range rule.IngressRuleValue.HTTP.Paths {
svcKey = fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
if svcKey != "" {
endpointKey = fmt.Sprintf("%s/%s", ing.Namespace, path.Backend.ServiceName)
if endpointKey != "" {
break Loop
}
}
}
}
service, err := s.listers.Service.ByKey(svcKey)
item, exists, err := s.listers.Endpoint.GetByKey(endpointKey)
if err != nil {
logrus.Warningf("Key: %s; error getting key: %v", err)
logrus.Errorf("Can not get endpoint by key(%s): %v", endpointKey, err)
return false
}
labelname := fmt.Sprintf("name=%s", service.GetLabels()["name"])
selector, err := labels.Parse(labelname)
if err != nil {
logrus.Warningf("Label: %s;error parsing labels: %v", labelname, err)
if !exists {
logrus.Warningf("Endpoint \"%s\" does not exist.", endpointKey)
return false
}
eps, err := s.sharedInformer.Core().V1().Endpoints().Lister().Endpoints(service.GetNamespace()).List(selector)
if err != nil {
logrus.Warningf("selector: %s; error listing endpoints: %v",
fmt.Sprintf("name=%s", labelname), err)
endpoint, ok := item.(*corev1.Endpoints)
if !ok {
logrus.Errorf("Cant not convert %v to %v", reflect.TypeOf(item), reflect.TypeOf(endpoint))
return false
}
if eps == nil || len(eps) == 0 {
logrus.Warningf("Selector: %s; empty endpoints", fmt.Sprintf("name=%s", labelname))
if endpoint.Subsets == nil || len(endpoint.Subsets) == 0 {
logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey)
return false
}
result := false
for _, ep := range eps {
if ep.Subsets != nil && len(ep.Subsets) > 0 {
e := ep.Subsets[0]
if ep.GetLabels()["service_kind"] != model.ServiceKindThirdParty.String() &&
e.Ports[0].Port != service.Spec.Ports[0].Port {
continue
}
if !((e.Addresses == nil || len(e.Addresses) == 0) && (e.NotReadyAddresses == nil || len(e.NotReadyAddresses) == 0)) {
result = true
break
}
for _, ep := range endpoint.Subsets {
if (ep.Addresses == nil || len(ep.Addresses) == 0) && (ep.NotReadyAddresses == nil || len(ep.NotReadyAddresses) == 0) {
logrus.Warningf("Endpoints(%s) is empty, ignore it", endpointKey)
return false
}
}
return result
return true
}
// GetIngress returns the Ingress matching key.
@ -701,20 +666,6 @@ func (s *k8sStore) ListIngresses() []*extensions.Ingress {
return ingresses
}
// GetServiceNameLabelByKey returns name in the labels of corev1.Service
// matching key(name/namespace).
func (s *k8sStore) GetServiceNameLabelByKey(key string) (string, error) {
svc, err := s.listers.Service.ByKey(key)
if err != nil {
return "", err
}
name, ok := svc.Labels["name"]
if !ok {
return "", fmt.Errorf("label \"name\" not found")
}
return name, nil
}
// GetServiceProtocol returns the Service matching key and port.
func (s *k8sStore) GetServiceProtocol(key string, port int32) corev1.Protocol {
svcs, err := s.listers.Service.ByKey(key)

View File

@ -75,6 +75,10 @@ func (l *Location) Equals(c *Location) bool {
}
}
if !l.Proxy.Equal(&c.Proxy) {
return false
}
return true
}

View File

@ -46,6 +46,9 @@ func (n *Node) Equals(c *Node) bool { //
if n.Host != c.Host {
return false
}
if n.Port != c.Port {
return false
}
if n.Protocol != c.Protocol {
return false
}
@ -61,5 +64,11 @@ func (n *Node) Equals(c *Node) bool { //
if n.Weight != c.Weight {
return false
}
if n.MaxFails != c.MaxFails {
return false
}
if n.FailTimeout != c.FailTimeout {
return false
}
return true
}

View File

@ -18,42 +18,21 @@
package v1
import "testing"
import (
"testing"
)
func TestNode_Equals(t *testing.T) {
n := &Node{
Meta: Meta{
Index: 888,
Name: "foo-node",
Namespace: "ns",
PluginName: "Nginx",
},
Host: "www.goodrain.com",
Port: 80,
Protocol: "Http",
State: "ok",
PoolName: "foo-poolName",
Ready: true,
Weight: 5,
}
c := &Node{
Meta: Meta{
Index: 888,
Name: "foo-node",
Namespace: "ns",
PluginName: "Nginx",
},
Host: "www.goodrain.com",
Port: 80,
Protocol: "Http",
State: "ok",
PoolName: "foo-poolName",
Ready: true,
Weight: 5,
}
n := newFakeNode()
c := newFakeNode()
if !n.Equals(c) {
t.Errorf("n should equal c")
}
f := newFakeNode()
f.MaxFails = 5
if n.Equals(f) {
t.Errorf("n should not equal c")
}
}
func newFakeNode() *Node {
@ -71,5 +50,7 @@ func newFakeNode() *Node {
PoolName: "foo-poolName",
Ready: true,
Weight: 5,
MaxFails: 3,
FailTimeout: "5",
}
}

View File

@ -47,7 +47,7 @@ func (s *SSLCert) Equals(c *SSLCert) bool {
if !s.Meta.Equals(c.Meta) {
return false
}
if s.CertificatePem != c.CertificatePem {
if (s.Certificate == nil) != (c.Certificate == nil) {
return false
}
if s.Certificate != nil && c.Certificate != nil {
@ -55,9 +55,6 @@ func (s *SSLCert) Equals(c *SSLCert) bool {
return false
}
}
if !(s.Certificate == nil && c.Certificate == nil) {
return false
}
if s.CertificateStr != c.CertificateStr {
return false
}

View File

@ -30,14 +30,24 @@ func TestSSLCert_Equals(t *testing.T) {
if !s.Equals(c) {
t.Errorf("s should equal c.")
}
s.Certificate = nil
if s.Equals(c) {
t.Errorf("s should not equal c.")
}
c.Certificate = nil
if !s.Equals(c) {
t.Errorf("s should equal c.")
}
}
func newFakeSSLCert() *SSLCert {
meta := newFakeMeta()
certificate := &x509.Certificate{}
certificate.Raw = []byte("foobar")
return &SSLCert{
Meta: &meta,
CertificateStr: "dummy certificate str",
Certificate: &x509.Certificate{},
Certificate: certificate,
PrivateKey: "dummy private key",
CertificatePem: "/expert/servers/nginx/certificate/dummy-secret.pem",
CN: []string{

View File

@ -174,6 +174,17 @@ func (v *VirtualService) Equals(c *VirtualService) bool {
if !v.SSLCert.Equals(c.SSLCert) {
return false
}
if v.ForceSSLRedirect != c.ForceSSLRedirect {
return false
}
if len(v.ExtensionConfig) != len(c.ExtensionConfig) {
return false
}
for key, ve := range v.ExtensionConfig {
if c.ExtensionConfig[key] != ve {
return false
}
}
return true
}

View File

@ -392,6 +392,36 @@ func showServiceDeployInfo(c *cli.Context) error {
}
fmt.Println("------------Service------------")
fmt.Println(serviceTable.Render())
//show endpoints
if deployInfo.Endpoints != nil && len(deployInfo.Endpoints) > 0 {
epTable := termtables.CreateTable()
epTable.AddHeaders("Name", "IP", "Port", "Protocol")
for epname := range deployInfo.Endpoints {
if clients.K8SClient != nil {
ep, _ := clients.K8SClient.CoreV1().Endpoints(tenantID).Get(epname, metav1.GetOptions{})
if ep != nil {
for i := range ep.Subsets {
ss := &ep.Subsets[i]
for j := range ss.Ports {
port := &ss.Ports[j]
for k := range ss.Addresses {
address := &ss.Addresses[k]
epTable.AddRow(ep.Name, address.IP, port.Port, port.Protocol)
}
for k := range ss.NotReadyAddresses {
address := &ss.NotReadyAddresses[k]
epTable.AddRow(ep.Name, address.IP, port.Port, port.Protocol)
}
}
}
}
} else {
epTable.AddRow(epname, "-", "-", "-")
}
}
fmt.Println("------------endpoints------------")
fmt.Println(epTable.Render())
}
//show ingress
ingressTable := termtables.CreateTable()
ingressTable.AddHeaders("Name", "Host")

View File

@ -7,7 +7,12 @@ BASE_NAME=rainbond
GO_VERSION=1.11
GATEWAY_GO_VERSION=1.11-alpine3.8
VERSION=master
VERSION=5.1.1
if [ -z "$TRAVIS_TAG" ]; then
VERSION=$TRAVIS_BRANCH-dev
else
VERSION=$TRAVIS_TAG
fi
buildTime=$(date +%F-%H)
git_commit=$(git log -n 1 --pretty --format=%h)

View File

@ -20,7 +20,6 @@ package prober
import (
"context"
"encoding/json"
"errors"
"sync"
"time"
@ -39,12 +38,13 @@ type Prober interface {
CloseWatch(serviceName string, id string) error
Start()
AddServices(in []*v1.Service)
CheckAndAddService(in *v1.Service) bool
CheckIfExist(in *v1.Service) bool
SetServices([]*v1.Service)
GetServices() []*v1.Service
GetServiceHealth() map[string]*v1.HealthStatus
SetAndUpdateServices([]*v1.Service) error
AddAndUpdateServices([]*v1.Service) error
UpdateServiceProbe(service *v1.Service)
UpdateServicesProbe(services []*v1.Service)
Stop() error
DisableWatcher(serviceName, watcherID string)
@ -276,22 +276,30 @@ func (p *probeManager) AddServices(in []*v1.Service) {
}
// CheckAndAddService checks if the input service exists, if it does not exist, add it.
func (p *probeManager) CheckAndAddService(in *v1.Service) bool {
func (p *probeManager) CheckIfExist(in *v1.Service) bool {
exist := false
for _, svc := range p.services {
if svc.Name == in.Name {
logrus.Debugf("svc name: %s; in name: %s;", svc.Name, in.Name)
exist = true
}
}
if !exist {
b, _ := json.Marshal(in)
logrus.Debugf("add service: %s", string(b))
p.services = append(p.services, in)
}
return exist
}
func (p *probeManager) isNeedUpdate(new *v1.Service) bool {
var old *v1.Service
for _, svc := range p.services {
if svc.Name == new.Name {
old = svc
}
}
if old == nil {
// not found old one
return true
}
return !new.Equal(old)
}
func (p *probeManager) GetServiceHealth() map[string]*v1.HealthStatus {
return p.status
}
@ -361,21 +369,46 @@ func (p *probeManager) updateAllServicesProbe() {
}
}
// UpdateServiceProbe updates and runs one service probe.
func (p *probeManager) UpdateServiceProbe(service *v1.Service) {
if service.ServiceHealth == nil || service.Disable || !p.isNeedUpdate(service) {
return
}
logrus.Debugf("Probe: %s; update probe.", service.Name)
// stop old probe
old := p.serviceProbe[service.Name]
if old != nil {
old.Stop()
}
// create new probe
serviceProbe := probe.CreateProbe(p.ctx, p.statusChan, service)
if serviceProbe != nil {
p.serviceProbe[service.Name] = serviceProbe
serviceProbe.Check()
}
}
// UpdateServicesProbe updates and runs services probe.
func (p *probeManager) UpdateServicesProbe(services []*v1.Service) {
logrus.Debugf("update services probe...")
if services == nil || len(services) == 0 {
logrus.Debug("Empty services")
return
}
oldSvcs := p.ListServicesBySid(services[0].Sid)
for _, v := range services {
if v.ServiceHealth == nil {
continue
}
if v.Disable {
delete(oldSvcs, v.Name)
if v.ServiceHealth == nil || v.Disable || !p.isNeedUpdate(v) {
continue
}
logrus.Debugf("Probe: %s; update probe.", v.Name)
// stop old probe
old := p.serviceProbe[v.Name]
if old != nil {
old.Stop()
}
if !p.CheckIfExist(v) {
p.services = append(p.services, v)
}
// create new probe
serviceProbe := probe.CreateProbe(p.ctx, p.statusChan, v)
if serviceProbe != nil {
@ -383,6 +416,20 @@ func (p *probeManager) UpdateServicesProbe(services []*v1.Service) {
serviceProbe.Check()
}
}
for _, svc := range oldSvcs {
p.StopProbes([]string{svc.Name})
}
}
// ListServicesBySid lists services corresponding to sid.
func (p *probeManager) ListServicesBySid(sid string) map[string]*v1.Service {
res := make(map[string]*v1.Service)
for _, svc := range p.services {
if svc.Sid == sid {
res[svc.Name] = svc
}
}
return res
}
// GetProbe returns a probe associated with name.
@ -397,6 +444,7 @@ func (p *probeManager) StopProbes(names []string) {
logrus.Debugf("Name: %s; Probe not found.", name)
continue
}
logrus.Debugf("Name: %s; Probe stopped", name)
probe.Stop()
delete(p.serviceProbe, name)
for idx, svc := range p.services {

View File

@ -20,7 +20,6 @@ package probe
import (
"context"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/util/prober/types/v1"
)
@ -34,7 +33,6 @@ type Probe interface {
func CreateProbe(ctx context.Context, statusChan chan *v1.HealthStatus, v *v1.Service) Probe {
ctx, cancel := context.WithCancel(ctx)
if v.ServiceHealth.Model == "tcp" {
logrus.Debug("creat tcp probe...")
t := &TCPProbe{
Name: v.ServiceHealth.Name,
Address: v.ServiceHealth.Address,
@ -47,7 +45,6 @@ func CreateProbe(ctx context.Context, statusChan chan *v1.HealthStatus, v *v1.Se
return t
}
if v.ServiceHealth.Model == "http" {
logrus.Debug("creat http probe...")
t := &HTTPProbe{
Name: v.ServiceHealth.Name,
Address: v.ServiceHealth.Address,

View File

@ -32,7 +32,7 @@ func (h *TCPProbe) Stop() {
// TCPCheck -
func (h *TCPProbe) TCPCheck() {
logrus.Debug("tcp check...")
logrus.Debugf("TCP check; Name: %s; Address: %s", h.Name, h.Address)
timer := time.NewTimer(time.Second * time.Duration(h.TimeInterval))
defer timer.Stop()
for {

View File

@ -25,27 +25,78 @@ const (
StatHealthy string = "healthy"
// StatUnhealthy -
StatUnhealthy string = "unhealthy"
// StaTDeath -
// StatDeath -
StatDeath string = "death"
)
//Service Service
// Service Service
type Service struct {
Sid string `json:"service_id"`
Name string `json:"name"`
ServiceHealth *Health `json:"health"`
Disable bool `json:"disable"`
}
// Equal check if the left service(l) is equal to the right service(r)
func (l *Service) Equal(r *Service) bool {
if l == r {
return true
}
if l.Sid != r.Sid {
return false
}
if l.Name != r.Name {
return false
}
if l.Disable != r.Disable {
return false
}
if !l.ServiceHealth.Equal(r.ServiceHealth) {
return false
}
return true
}
//Health ServiceHealth
type Health struct {
Name string `json:"name"`
Model string `json:"model"`
IP string `json:"ip"`
Port int `json:"port"`
Address string `json:"address"`
TimeInterval int `json:"time_interval"`
MaxErrorsNum int `json:"max_errors_num"`
}
// Equal check if the left health(l) is equal to the right health(r)
func (l *Health) Equal(r *Health) bool {
if l == r {
return true
}
if l.Name != r.Name {
return false
}
if l.Model != r.Model {
return false
}
if l.IP != r.IP {
return false
}
if l.Port != r.Port {
return false
}
if l.Address != r.Address {
return false
}
if l.TimeInterval != r.TimeInterval {
return false
}
if l.MaxErrorsNum != r.MaxErrorsNum {
return false
}
return true
}
//HealthStatus health status
type HealthStatus struct {
Name string `json:"name"`

View File

@ -55,8 +55,10 @@ func (s *restartController) Begin() {
s.manager.callback(s.controllerID, nil)
}
func (s *restartController) restartOne(app v1.AppService) error {
stopController := stopController{
//Restart the control set timeout interval is 5m
stopController := &stopController{
manager: s.manager,
waiting: time.Minute * 5,
}
if err := stopController.stopOne(app); err != nil {
app.Logger.Error("(Restart)Stop app failure %s,you could waiting stoped and manual start it", GetCallbackLoggerOption())

View File

@ -35,6 +35,7 @@ type stopController struct {
controllerID string
appService []v1.AppService
manager *Manager
waiting time.Duration
}
func (s *stopController) Begin() {
@ -143,6 +144,9 @@ func (s *stopController) WaitingReady(app v1.AppService) error {
if storeAppService != nil && storeAppService.Replicas > 0 {
timeout = time.Duration(storeAppService.Replicas) * timeout
}
if s.waiting != 0 {
timeout = s.waiting
}
if err := WaitStop(s.manager.store, storeAppService, timeout, app.Logger, s.stopChan); err != nil {
return err
}

View File

@ -20,15 +20,16 @@ package controller
import (
"fmt"
"github.com/goodrain/rainbond/worker/appm/f"
"sync"
"time"
"github.com/Sirupsen/logrus"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/goodrain/rainbond/worker/appm/types/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
types "k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/types"
)
type upgradeController struct {
@ -140,7 +141,6 @@ func (s *upgradeController) upgradeService(newapp v1.AppService) {
}
func (s *upgradeController) upgradeOne(app v1.AppService) error {
//first: check and create namespace
_, err := s.manager.client.CoreV1().Namespaces().Get(app.TenantID, metav1.GetOptions{})
if err != nil {
@ -167,27 +167,16 @@ func (s *upgradeController) upgradeOne(app v1.AppService) error {
}
}
if ingresses := app.GetIngress(); ingresses != nil {
for _, ingress := range ingresses {
_, err := s.manager.client.Extensions().Ingresses(ingress.Namespace).Update(ingress)
if err != nil {
app.Logger.Error(fmt.Sprintf("upgrade ingress %s failure %s", app.ServiceAlias, err.Error()), getLoggerOption("failure"))
logrus.Errorf("upgrade ingress %s failure %s", app.ServiceAlias, err.Error())
}
}
}
//upgrade k8s service
oldApp := s.manager.store.GetAppService(app.ServiceID)
s.upgradeService(app)
//upgrade k8s secrets
if secrets := app.GetSecrets(); secrets != nil {
for _, secret := range secrets {
_, err := s.manager.client.CoreV1().Secrets(secret.Namespace).Update(secret)
if err != nil {
app.Logger.Error(fmt.Sprintf("upgrade secret %s failure %s", app.ServiceAlias, err.Error()), getLoggerOption("failure"))
logrus.Errorf("upgrade secret %s failure %s", app.ServiceAlias, err.Error())
}
}
handleErr := func(msg string, err error) error {
// ignore ingress and secret error
logrus.Warning(msg)
return nil
}
_ = f.UpgradeSecrets(s.manager.client, &app, oldApp.GetSecrets(), app.GetSecrets(), handleErr)
_ = f.UpgradeIngress(s.manager.client, &app, oldApp.GetIngress(), app.GetIngress(), handleErr)
return s.WaitingReady(app)
}

View File

@ -24,6 +24,8 @@ import (
"os"
"strings"
"github.com/goodrain/rainbond/builder"
"github.com/Sirupsen/logrus"
api_model "github.com/goodrain/rainbond/api/model"
@ -149,7 +151,7 @@ func createUDPDefaultPluginContainer(serviceID string, envs []v1.EnvVar) v1.Cont
}},
Env: envs,
TerminationMessagePath: "",
Image: "goodrain.me/adapter",
Image: builder.REGISTRYDOMAIN + "/adapter",
Resources: createAdapterResources(128, 500),
}
}
@ -239,7 +241,7 @@ func applyDefaultMeshPluginConfig(as *typesv1.AppService, dbmanager db.Manager)
if err != nil {
return "", err
}
pluginID := "tcpmesh" + util.NewUUID()
pluginID := "tcpmesh" + as.ServiceID
cm := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("config-%s-%s", as.ServiceID, pluginID),

View File

@ -20,9 +20,8 @@ package f
import (
"fmt"
"github.com/Sirupsen/logrus"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/goodrain/rainbond/worker/appm/types/v1"
corev1 "k8s.io/api/core/v1"
extensions "k8s.io/api/extensions/v1beta1"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
@ -54,7 +53,7 @@ func ApplyOne(clientset *kubernetes.Clientset, app *v1.AppService) error {
}
// update endpoints
for _, ep := range app.GetEndpoints() {
ensureEndpoints(ep, clientset)
EnsureEndpoints(ep, clientset)
}
// update ingress
for _, ing := range app.GetIngress() {
@ -76,16 +75,6 @@ func ApplyOne(clientset *kubernetes.Clientset, app *v1.AppService) error {
logrus.Warningf("error deleting secret(%v): %v", secret, err)
}
}
// delete delEndpoints
for _, ep := range app.GetDelEndpoints() {
err := clientset.CoreV1().Endpoints(ep.Namespace).Delete(ep.Name, &metav1.DeleteOptions{})
if err != nil {
// don't return error, hope it is ok next time
logrus.Warningf("error deleting endpoints(%v): %v", ep, err)
continue
}
logrus.Debugf("successfully deleted endpoints(%v)", ep)
}
// delete delServices
for _, svc := range app.GetDelServices() {
err := clientset.CoreV1().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{})
@ -151,9 +140,9 @@ func ensureSecret(secret *corev1.Secret, clientSet kubernetes.Interface) {
}
}
func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
// EnsureEndpoints creates or updates endpoints.
func EnsureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep)
@ -166,15 +155,166 @@ func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
}
}
// If the port has three different values, one of them cannot be 0
func checkRbdEndpoints(rbdEndpoints []*v1.RbdEndpoints) bool {
if len(rbdEndpoints) < 2 {
return true
// UpgradeIngress is used to update *extensions.Ingress.
func UpgradeIngress(clientset *kubernetes.Clientset,
as *v1.AppService,
old, new []*extensions.Ingress,
handleErr func(msg string, err error) error) error {
var oldMap = make(map[string]*extensions.Ingress, len(old))
for i, item := range old {
oldMap[item.Name] = old[i]
}
for _, item := range rbdEndpoints {
if item.Port == 0 {
return false
for _, n := range new {
if o, ok := oldMap[n.Name]; ok {
n.UID = o.UID
n.ResourceVersion = o.ResourceVersion
ing, err := clientset.ExtensionsV1beta1().Ingresses(n.Namespace).Update(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error updating ingress: %+v: err: %v",
ing, err), err); err != nil {
return err
}
continue
}
as.SetIngress(ing)
delete(oldMap, o.Name)
logrus.Debugf("ServiceID: %s; successfully update ingress: %s", as.ServiceID, ing.Name)
} else {
logrus.Debugf("ingress: %+v", n)
ing, err := clientset.ExtensionsV1beta1().Ingresses(n.Namespace).Create(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error creating ingress: %+v: err: %v",
ing, err), err); err != nil {
return err
}
continue
}
as.SetIngress(ing)
logrus.Debugf("ServiceID: %s; successfully create ingress: %s", as.ServiceID, ing.Name)
}
}
return true
for _, ing := range oldMap {
if ing != nil {
if err := clientset.ExtensionsV1beta1().Ingresses(ing.Namespace).Delete(ing.Name,
&metav1.DeleteOptions{}); err != nil {
if err := handleErr(fmt.Sprintf("error deleting ingress: %+v: err: %v",
ing, err), err); err != nil {
return err
}
continue
}
logrus.Debugf("ServiceID: %s; successfully delete ingress: %s", as.ServiceID, ing.Name)
}
}
return nil
}
// UpgradeSecrets is used to update *corev1.Secret.
func UpgradeSecrets(clientset *kubernetes.Clientset,
as *v1.AppService, old, new []*corev1.Secret,
handleErr func(msg string, err error) error) error {
var oldMap = make(map[string]*corev1.Secret, len(old))
for i, item := range old {
oldMap[item.Name] = old[i]
}
for _, n := range new {
if o, ok := oldMap[n.Name]; ok {
n.UID = o.UID
n.ResourceVersion = o.ResourceVersion
sec, err := clientset.CoreV1().Secrets(n.Namespace).Update(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error updating secret: %+v: err: %v",
sec, err), err); err != nil {
return err
}
continue
}
as.SetSecret(sec)
delete(oldMap, o.Name)
logrus.Debugf("ServiceID: %s; successfully update secret: %s", as.ServiceID, sec.Name)
} else {
sec, err := clientset.CoreV1().Secrets(n.Namespace).Create(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error creating secret: %+v: err: %v",
sec, err), err); err != nil {
return err
}
continue
}
as.SetSecret(sec)
logrus.Debugf("ServiceID: %s; successfully create secret: %s", as.ServiceID, sec.Name)
}
}
for _, sec := range oldMap {
if sec != nil {
if err := clientset.CoreV1().Secrets(sec.Namespace).Delete(sec.Name, &metav1.DeleteOptions{}); err != nil {
if err := handleErr(fmt.Sprintf("error deleting secret: %+v: err: %v",
sec, err), err); err != nil {
return err
}
continue
}
logrus.Debugf("ServiceID: %s; successfully delete secret: %s", as.ServiceID, sec.Name)
}
}
return nil
}
// UpgradeEndpoints is used to update *corev1.Endpoints.
func UpgradeEndpoints(clientset *kubernetes.Clientset,
as *v1.AppService, old, new []*corev1.Endpoints,
handleErr func(msg string, err error) error) error {
var oldMap = make(map[string]*corev1.Endpoints, len(old))
for i, item := range old {
oldMap[item.Name] = old[i]
}
for _, n := range new {
if o, ok := oldMap[n.Name]; ok {
ep, err := clientset.CoreV1().Endpoints(n.Namespace).Update(n)
if err != nil {
if e := handleErr(fmt.Sprintf("error updating endpoints: %+v: err: %v",
ep, err), err); e != nil {
return e
}
continue
}
as.AddEndpoints(ep)
delete(oldMap, o.Name)
logrus.Debugf("ServiceID: %s; successfully update endpoints: %s", as.ServiceID, ep.Name)
} else {
_, err := clientset.CoreV1().Endpoints(n.Namespace).Create(n)
if err != nil {
if err := handleErr(fmt.Sprintf("error creating endpoints: %+v: err: %v",
n, err), err); err != nil {
return err
}
continue
}
as.AddEndpoints(n)
logrus.Debugf("ServiceID: %s; successfully create endpoints: %s", as.ServiceID, n.Name)
}
}
for _, sec := range oldMap {
if sec != nil {
if err := clientset.CoreV1().Endpoints(sec.Namespace).Delete(sec.Name, &metav1.DeleteOptions{}); err != nil {
if err := handleErr(fmt.Sprintf("error deleting endpoints: %+v: err: %v",
sec, err), err); err != nil {
return err
}
continue
}
logrus.Debugf("ServiceID: %s; successfully delete endpoints: %s", as.ServiceID, sec.Name)
}
}
return nil
}
// UpdateEndpoints uses clientset to update the given Endpoints.
func UpdateEndpoints(ep *corev1.Endpoints, clientSet *kubernetes.Clientset) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
logrus.Warningf("error updating endpoints: %+v; err: %v", ep, err)
return
}
logrus.Debugf("Key: %s/%s; Successfully update endpoints", ep.GetNamespace(), ep.GetName())
}

View File

@ -21,15 +21,13 @@ package prober
import (
"context"
"fmt"
"strconv"
"strings"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
uitlprober "github.com/goodrain/rainbond/util/prober"
"github.com/goodrain/rainbond/util/prober/types/v1"
v1 "github.com/goodrain/rainbond/util/prober/types/v1"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
appmv1 "github.com/goodrain/rainbond/worker/appm/types/v1"
@ -41,8 +39,8 @@ import (
type Prober interface {
Start()
Stop()
AddProbe(ep *corev1.Endpoints)
StopProbe(ep *corev1.Endpoints)
UpdateProbes(info []*store.ProbeInfo)
StopProbe(uuids []string)
}
// NewProber creates a new third-party service prober.
@ -100,19 +98,14 @@ func (t *tpProbe) Start() {
evt := event.(store.Event)
switch evt.Type {
case store.CreateEvent:
logrus.Debug("create probe")
ep := evt.Obj.(*corev1.Endpoints)
t.AddProbe(ep)
infos := evt.Obj.([]*store.ProbeInfo)
t.UpdateProbes(infos)
case store.UpdateEvent:
logrus.Debug("update probe")
old := evt.Old.(*corev1.Endpoints)
ep := evt.Obj.(*corev1.Endpoints)
t.StopProbe(old)
t.AddProbe(ep)
infos := evt.Obj.([]*store.ProbeInfo)
t.UpdateProbes(infos)
case store.DeleteEvent:
logrus.Debug("delete probe")
ep := evt.Obj.(*corev1.Endpoints)
t.StopProbe(ep)
uuids := evt.Obj.([]string)
t.StopProbe(uuids)
}
case <-t.ctx.Done():
return
@ -126,83 +119,83 @@ func (t *tpProbe) Stop() {
t.cancel()
}
func (t *tpProbe) AddProbe(ep *corev1.Endpoints) {
service, probeInfo, sid := t.createServices(ep)
if service == nil {
logrus.Debugf("Empty service, stop creating probe")
return
}
ip := ep.GetLabels()["ip"]
port, _ := strconv.Atoi(ep.GetLabels()["port"])
// watch
logrus.Debug("enable watcher...")
if t.utilprober.CheckAndAddService(service) {
logrus.Debugf("Service: %+v; Exists", service)
return
}
go func(service *v1.Service) {
watcher := t.utilprober.WatchServiceHealthy(service.Name)
t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID())
defer watcher.Close()
defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID())
for {
select {
case event := <-watcher.Watch():
if event == nil {
return
}
switch event.Status {
case v1.StatHealthy:
obj := &appmv1.RbdEndpoint{
UUID: service.Name,
IP: ip,
Port: port,
Sid: sid,
func (t *tpProbe) UpdateProbes(infos []*store.ProbeInfo) {
var services []*v1.Service
for _, info := range infos {
service, probeInfo := t.createServices(info)
if service == nil {
logrus.Debugf("Empty service, stop creating probe")
continue
}
services = append(services, service)
// watch
if t.utilprober.CheckIfExist(service) {
continue
}
go func(service *v1.Service, info *store.ProbeInfo) {
watcher := t.utilprober.WatchServiceHealthy(service.Name)
t.utilprober.EnableWatcher(watcher.GetServiceName(), watcher.GetID())
defer watcher.Close()
defer t.utilprober.DisableWatcher(watcher.GetServiceName(), watcher.GetID())
for {
select {
case event := <-watcher.Watch():
if event == nil {
return
}
t.updateCh.In() <- discovery.Event{
Type: discovery.HealthEvent,
Obj: obj,
}
case v1.StatDeath, v1.StatUnhealthy:
if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum {
if probeInfo.Mode == model.OfflineFailureAction.String() {
obj := &appmv1.RbdEndpoint{
UUID: service.Name,
IP: ip,
Port: port,
Sid: sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.DeleteEvent,
Obj: obj,
}
} else {
obj := &appmv1.RbdEndpoint{
UUID: service.Name,
IP: ip,
Port: port,
Sid: sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.UnhealthyEvent,
Obj: obj,
switch event.Status {
case v1.StatHealthy:
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.HealthEvent,
Obj: obj,
}
case v1.StatDeath, v1.StatUnhealthy:
if event.ErrorNumber > service.ServiceHealth.MaxErrorsNum {
if probeInfo.Mode == model.OfflineFailureAction.String() {
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.DeleteEvent,
Obj: obj,
}
} else {
obj := &appmv1.RbdEndpoint{
UUID: info.UUID,
IP: info.IP,
Port: int(info.Port),
Sid: info.Sid,
}
t.updateCh.In() <- discovery.Event{
Type: discovery.UnhealthyEvent,
Obj: obj,
}
}
}
}
case <-t.ctx.Done():
// TODO: should stop for one service, not all services.
return
}
case <-t.ctx.Done():
return
}
}
}(service)
// start
t.utilprober.UpdateServicesProbe([]*v1.Service{service})
}(service, info)
}
t.utilprober.UpdateServicesProbe(services)
}
func (t *tpProbe) StopProbe(ep *corev1.Endpoints) {
name := t.createServiceNames(ep)
logrus.Debugf("Name: %s; Stop probe.", name)
t.utilprober.StopProbes([]string{name})
func (t *tpProbe) StopProbe(uuids []string) {
for _, name := range uuids {
t.utilprober.StopProbes([]string{name})
}
}
// GetProbeInfo returns probe info associated with sid.
@ -210,12 +203,11 @@ func (t *tpProbe) StopProbe(ep *corev1.Endpoints) {
// If there is no probe in the database, return a default probe
func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) {
probes, err := t.dbm.ServiceProbeDao().GetServiceProbes(sid)
if err != nil || probes == nil || len(probes) == 0 || probes[0].IsUsed == 0 {
if err != nil || probes == nil || len(probes) == 0 || *(probes[0].IsUsed) == 0 {
if err != nil {
logrus.Warningf("ServiceID: %s; error getting probes: %v", sid, err)
}
// no defined probe, use default one
logrus.Debugf("no defined probe, use default one")
return &model.TenantServiceProbe{
Scheme: "tcp",
PeriodSecond: 5,
@ -226,31 +218,23 @@ func (t *tpProbe) GetProbeInfo(sid string) (*model.TenantServiceProbe, error) {
return probes[0], nil
}
func (t *tpProbe) createServices(ep *corev1.Endpoints) (*v1.Service, *model.TenantServiceProbe, string) {
sid := ep.GetLabels()["service_id"]
ip := ep.GetLabels()["ip"]
port := ep.GetLabels()["port"]
if strings.TrimSpace(sid) == "" {
logrus.Warningf("Endpoints key: %s; ServiceID not found, stop creating probe",
fmt.Sprintf("%s/%s", ep.Namespace, ep.Name))
return nil, nil, ""
}
probeInfo, err := t.GetProbeInfo(sid)
func (t *tpProbe) createServices(probeInfo *store.ProbeInfo) (*v1.Service, *model.TenantServiceProbe) {
tsp, err := t.GetProbeInfo(probeInfo.Sid)
if err != nil {
logrus.Warningf("ServiceID: %s; Unexpected error occurred, ignore the creation of "+
"probes: %s", sid, err.Error())
return nil, nil, ""
"probes: %s", probeInfo.Sid, err.Error())
return nil, nil
}
if probeInfo.Mode == "liveness" {
probeInfo.Mode = model.IgnoreFailureAction.String()
if tsp.Mode == "liveness" {
tsp.Mode = model.IgnoreFailureAction.String()
}
service := createService(probeInfo)
service.Name = ep.GetLabels()["uuid"]
portint, _ := strconv.Atoi(port)
service.ServiceHealth.Port = portint
service.ServiceHealth.Name = service.Name // TODO: no need?
service.ServiceHealth.Address = fmt.Sprintf("%s:%s", ip, port)
return service, probeInfo, sid
service := createService(tsp)
service.Sid = probeInfo.Sid
service.Name = probeInfo.UUID
service.ServiceHealth.Port = int(probeInfo.Port)
service.ServiceHealth.Name = service.Name
service.ServiceHealth.Address = fmt.Sprintf("%s:%d", probeInfo.IP, probeInfo.Port)
return service, tsp
}
func (t *tpProbe) createServiceNames(ep *corev1.Endpoints) string {

View File

@ -20,6 +20,7 @@ package store
import (
"context"
"encoding/json"
"fmt"
"sync"
"time"
@ -78,7 +79,14 @@ const (
type Event struct {
Type EventType
Obj interface{}
Old interface{}
}
// ProbeInfo holds the context of a probe.
type ProbeInfo struct {
Sid string `json:"sid"`
UUID string `json:"uuid"`
IP string `json:"ip"`
Port int32 `json:"port"`
}
//appRuntimeStore app runtime store
@ -154,21 +162,22 @@ func NewStore(clientset *kubernetes.Clientset,
epEventHandler := cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
ep := obj.(*corev1.Endpoints)
logrus.Debugf("received add endpoints: %+v", ep)
serviceID := ep.Labels["service_id"]
version := ep.Labels["version"]
createrID := ep.Labels["creater_id"]
if serviceID != "" && createrID != "" {
appservice, err := store.getAppService(serviceID, version, createrID, true)
if err == conversion.ErrServiceNotFound {
store.conf.KubeClient.CoreV1().Endpoints(ep.Namespace).Delete(ep.Name, &metav1.DeleteOptions{})
logrus.Debugf("ServiceID: %s; Action: AddFunc; service not found", serviceID)
}
if appservice != nil {
appservice.AddEndpoints(ep)
if isThirdParty(ep) {
if isThirdParty(ep) && ep.Subsets != nil && len(ep.Subsets) > 0 {
logrus.Debugf("received add endpoints: %+v", ep)
probeInfos := listProbeInfos(ep, serviceID)
probeCh.In() <- Event{
Type: CreateEvent,
Obj: obj,
Obj: probeInfos,
}
}
return
@ -177,7 +186,6 @@ func NewStore(clientset *kubernetes.Clientset,
},
DeleteFunc: func(obj interface{}) {
ep := obj.(*corev1.Endpoints)
logrus.Debugf("received delete endpoints: %+v", ep)
serviceID := ep.Labels["service_id"]
version := ep.Labels["version"]
createrID := ep.Labels["creater_id"]
@ -186,12 +194,18 @@ func NewStore(clientset *kubernetes.Clientset,
if appservice != nil {
appservice.DelEndpoints(ep)
if appservice.IsClosed() {
logrus.Debugf("ServiceID: %s; Action: DeleteFunc;service is closed", serviceID)
store.DeleteAppService(appservice)
}
if isThirdParty(ep) {
logrus.Debugf("received delete endpoints: %+v", ep)
var uuids []string
for _, item := range ep.Subsets {
uuids = append(uuids, item.Ports[0].Name)
}
probeCh.In() <- Event{
Type: DeleteEvent,
Obj: obj,
Obj: uuids,
}
}
}
@ -206,20 +220,18 @@ func NewStore(clientset *kubernetes.Clientset,
if serviceID != "" && createrID != "" {
appservice, err := store.getAppService(serviceID, version, createrID, true)
if err == conversion.ErrServiceNotFound {
logrus.Debug("ServiceID: error service not found")
store.conf.KubeClient.CoreV1().Endpoints(cep.Namespace).Delete(cep.Name, &metav1.DeleteOptions{})
logrus.Debugf("ServiceID: %s; Action: UpdateFunc; service not found", serviceID)
}
if appservice != nil {
appservice.AddEndpoints(cep)
if isThirdParty(cep) {
curInfos := listProbeInfos(cep, serviceID)
probeCh.In() <- Event{
Type: UpdateEvent,
Obj: cur,
Old: old,
Obj: curInfos,
}
}
}
//}
}
},
}
@ -236,6 +248,67 @@ func NewStore(clientset *kubernetes.Clientset,
return store
}
func listProbeInfos(ep *corev1.Endpoints, sid string) []*ProbeInfo {
var probeInfos []*ProbeInfo
for _, subset := range ep.Subsets {
uuid := subset.Ports[0].Name
port := subset.Ports[0].Port
for _, address := range subset.NotReadyAddresses {
info := &ProbeInfo{
Sid: sid,
UUID: uuid,
IP: address.IP,
Port: port,
}
probeInfos = append(probeInfos, info)
}
for _, address := range subset.Addresses {
info := &ProbeInfo{
Sid: sid,
UUID: uuid,
IP: address.IP,
Port: port,
}
probeInfos = append(probeInfos, info)
}
}
return probeInfos
}
func upgradeProbe(ch chan<- interface{}, old, cur []*ProbeInfo) {
ob, _ := json.Marshal(old)
cb, _ := json.Marshal(cur)
logrus.Debugf("Old probe infos: %s", string(ob))
logrus.Debugf("Current probe infos: %s", string(cb))
oldMap := make(map[string]*ProbeInfo, len(old))
for i := 0; i < len(old); i++ {
oldMap[old[i].UUID] = old[i]
}
for _, c := range cur {
if info := oldMap[c.UUID]; info != nil {
delete(oldMap, c.UUID)
logrus.Debugf("UUID: %s; update probe", c.UUID)
ch <- Event{
Type: UpdateEvent,
Obj: c,
}
} else {
logrus.Debugf("UUID: %s; create probe", c.UUID)
ch <- Event{
Type: CreateEvent,
Obj: []*ProbeInfo{c},
}
}
}
for _, info := range oldMap {
logrus.Debugf("UUID: %s; delete probe", info.UUID)
ch <- Event{
Type: DeleteEvent,
Obj: info,
}
}
}
func (a *appRuntimeStore) init() error {
//init leader namespace
leaderNamespace := a.conf.LeaderElectionNamespace

View File

@ -35,9 +35,11 @@ const (
// UpdateEvent event associated with an object update in a service discovery center
UpdateEvent EventType = "UPDATE"
// DeleteEvent event associated when an object is removed from a service discovery center
DeleteEvent EventType = "DELETE"
DeleteEvent EventType = "DELETE"
// UnhealthyEvent -
UnhealthyEvent EventType = "UNHEALTHY"
HealthEvent EventType = "HEALTH"
// HealthEvent -
HealthEvent EventType = "HEALTH"
)
// Event holds the context of an event.

View File

@ -21,19 +21,18 @@ package thirdparty
import (
"encoding/json"
"fmt"
"github.com/goodrain/rainbond/worker/util"
"strconv"
"sync"
"github.com/Sirupsen/logrus"
"github.com/eapache/channels"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/worker/appm/f"
"github.com/goodrain/rainbond/worker/appm/store"
"github.com/goodrain/rainbond/worker/appm/thirdparty/discovery"
"github.com/goodrain/rainbond/worker/appm/types/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
)
@ -63,7 +62,7 @@ func NewThirdPartier(clientset *kubernetes.Clientset,
}
type thirdparty struct {
clientset kubernetes.Interface
clientset *kubernetes.Clientset
store store.Storer
// a collection of stop channel for every service.
@ -87,13 +86,14 @@ func (t *thirdparty) Start() {
}
logrus.Debugf("Received event: %+v", evt)
if evt.Type == v1.StartEvent { // no need to distinguish between event types
needWatch := false
stopCh := t.svcStopCh[evt.Sid]
if stopCh != nil {
if stopCh == nil {
logrus.Debugf("ServiceID: %s; already started.", evt.Sid)
continue
needWatch = true
t.svcStopCh[evt.Sid] = make(chan struct{})
}
t.svcStopCh[evt.Sid] = make(chan struct{})
go t.runStart(evt.Sid)
go t.runStart(evt.Sid, needWatch)
}
if evt.Type == v1.StopEvent {
stopCh := t.svcStopCh[evt.Sid]
@ -114,7 +114,7 @@ func (t *thirdparty) Start() {
go t.runUpdate(devent)
case <-t.stopCh:
for _, stopCh := range t.svcStopCh {
close(stopCh)
close(stopCh) // TODO: close of closed channel
}
break
}
@ -122,38 +122,33 @@ func (t *thirdparty) Start() {
}()
}
func (t *thirdparty) runStart(sid string) {
logrus.Debugf("ServiceID: %s; run start...", sid)
func (t *thirdparty) runStart(sid string, needWatch bool) {
as := t.store.GetAppService(sid)
// TODO: when an error occurs, consider retrying.
rbdeps, d := t.ListRbdEndpoints(sid)
b, _ := json.Marshal(rbdeps)
logrus.Debugf("ServiceID: %s; rbd endpoints: %+v", sid, string(b))
// TODO: empty rbdeps
if rbdeps == nil || len(rbdeps) == 0 {
logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid)
var err error
for i := 3; i > 0; i-- { // retry 3 times
rbdeps, ir := t.ListRbdEndpoints(sid)
if rbdeps == nil || len(rbdeps) == 0 {
logrus.Warningf("ServiceID: %s;Empty rbd endpoints, stop starting third-party service.", sid)
continue
}
var eps []*corev1.Endpoints
eps, err = t.k8sEndpoints(as, rbdeps)
if err != nil {
logrus.Warningf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error())
continue
}
for _, ep := range eps {
f.EnsureEndpoints(ep, t.clientset)
}
if needWatch && ir != nil {
ir.Watch()
}
logrus.Infof("ServiceID: %s; successfully running start task", sid)
return
}
eps, err := t.createK8sEndpoints(as, rbdeps)
if err != nil {
logrus.Errorf("ServiceID: %s; error creating k8s endpoints: %s", sid, err.Error())
return
}
// find out old endpoints, and delete it.
old := as.GetEndpoints()
// TODO: can do better
del := findDeletedEndpoints(old, eps)
for _, ep := range del {
deleteEndpoints(ep, t.clientset)
}
for _, ep := range eps {
ensureEndpoints(ep, t.clientset)
}
if d != nil {
d.Watch()
}
logrus.Errorf("ServiceID: %s; error running start task: %v", sid, err)
}
// ListRbdEndpoints lists all rbd endpoints, include static and dynamic.
@ -181,28 +176,20 @@ func (t *thirdparty) ListRbdEndpoints(sid string) ([]*v1.RbdEndpoint, Interacter
return res, d
}
func findDeletedEndpoints(old, new []*corev1.Endpoints) []*corev1.Endpoints {
if old == nil {
logrus.Debugf("empty old endpoints.")
return nil
}
var res []*corev1.Endpoints
for _, o := range old {
del := true
for _, n := range new {
if o.Name == n.Name {
del = false
break
func deleteSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) {
eps := as.GetEndpoints()
for _, ep := range eps {
for idx, item := range ep.Subsets {
if item.Ports[0].Name == rbdep.UUID {
logrus.Debugf("UUID: %s; subset deleted", rbdep.UUID)
ep.Subsets[idx] = ep.Subsets[len(ep.Subsets)-1]
ep.Subsets = ep.Subsets[:len(ep.Subsets)-1]
}
}
if del {
res = append(res, o)
}
}
return res
}
func (t *thirdparty) createK8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) {
func (t *thirdparty) k8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpoint) ([]*corev1.Endpoints, error) {
ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {
return nil, err
@ -213,258 +200,201 @@ func (t *thirdparty) createK8sEndpoints(as *v1.AppService, epinfo []*v1.RbdEndpo
}
p := ports[0]
logrus.Debugf("create outer third-party service")
f := func() []*corev1.Endpoints {
var eps []*corev1.Endpoints
for _, epi := range epinfo {
port, realport := func(targetPort int, realPort int) (int32, bool) { // final port
if realPort == 0 {
return int32(targetPort), false
}
return int32(realPort), true
}(p.ContainerPort, epi.Port)
ep := corev1.Endpoints{}
ep.Namespace = as.TenantID
if p.IsInnerService {
ep.Name = util.CreateEndpointsName(as.TenantName, as.ServiceAlias, epi.UUID)
ep.Name = epi.UUID
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "Service",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
if p.IsOuterService {
ep.Name = util.CreateEndpointsName(as.TenantName, as.ServiceAlias, epi.UUID) + "out"
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "ServiceOUT",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
ep.Labels["uuid"] = epi.UUID
ep.Labels["ip"] = epi.IP
ep.Labels["port"] = strconv.Itoa(int(port))
ep.Labels["real-port"] = strconv.FormatBool(realport)
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Port: port,
},
},
Addresses: []corev1.EndpointAddress{
{
IP: epi.IP,
},
},
}
ep.Subsets = append(ep.Subsets, subset)
eps = append(eps, &ep)
}
return eps
}
var res []*corev1.Endpoints
if p.IsInnerService {
res = append(res, f()...)
ep := &corev1.Endpoints{}
ep.Namespace = as.TenantID
// inner or outer
if p.IsInnerService {
ep.Name = fmt.Sprintf("service-%d-%d", p.ID, p.ContainerPort)
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "Service",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
res = append(res, ep)
}
if p.IsOuterService {
res = append(res, f()...)
ep := &corev1.Endpoints{}
ep.Namespace = as.TenantID
// inner or outer
if p.IsOuterService {
ep.Name = fmt.Sprintf("service-%d-%dout", p.ID, p.ContainerPort)
ep.Labels = as.GetCommonLabels(map[string]string{
"name": as.ServiceAlias + "ServiceOUT",
"service-kind": model.ServiceKindThirdParty.String(),
})
}
res = append(res, ep)
}
var subsets []corev1.EndpointSubset
for _, epi := range epinfo {
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Name: epi.UUID,
Port: func(targetPort int, realPort int) int32 {
if realPort == 0 {
return int32(targetPort)
}
return int32(realPort)
}(p.ContainerPort, epi.Port),
Protocol: corev1.ProtocolTCP,
},
},
Addresses: []corev1.EndpointAddress{
{
IP: epi.IP,
},
},
}
subsets = append(subsets, subset)
}
for _, item := range res {
item.Subsets = subsets
}
return res, nil
}
func deleteEndpoints(ep *corev1.Endpoints, clientset kubernetes.Interface) {
err := clientset.CoreV1().Endpoints(ep.Namespace).Delete(ep.Name, &metav1.DeleteOptions{})
func updateSubset(as *v1.AppService, rbdep *v1.RbdEndpoint) error {
ports, err := db.GetManager().TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {
logrus.Debugf("Ignore; error deleting endpoints%+v: %v", ep, err)
return err
}
}
func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) {
logrus.Debugf("ensure endpoints: %+v", ep)
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep)
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep)
if err != nil {
logrus.Warningf("error creating endpoints %+v: %v", ep, err)
// third-party service can only have one port
if ports == nil || len(ports) == 0 {
return fmt.Errorf("Port not found")
}
p := ports[0]
subset := corev1.EndpointSubset{
Ports: []corev1.EndpointPort{
{
Name: rbdep.UUID,
Port: func(targetPort int, realPort int) int32 {
if realPort == 0 {
return int32(targetPort)
}
return int32(realPort)
}(p.ContainerPort, rbdep.Port),
Protocol: corev1.ProtocolTCP,
},
},
Addresses: []corev1.EndpointAddress{
{
IP: rbdep.IP,
},
},
}
for _, ep := range as.GetEndpoints() {
exist := false
for idx, item := range ep.Subsets {
if item.Ports[0].Name == subset.Ports[0].Name {
ep.Subsets[idx] = item
exist = true
break
}
return
}
logrus.Warningf("error updating endpoints %+v: %v", ep, err)
}
}
func ensureConfigMap(cm *corev1.ConfigMap, clientSet kubernetes.Interface) {
_, err := clientSet.CoreV1().ConfigMaps(cm.Namespace).Update(cm)
if err != nil {
if k8sErrors.IsNotFound(err) {
_, err := clientSet.CoreV1().ConfigMaps(cm.Namespace).Create(cm)
if err != nil {
logrus.Warningf("error creating ConfigMaps %+v: %v", cm, err)
}
return
if !exist {
ep.Subsets = append(ep.Subsets, subset)
}
logrus.Warningf("error updating ConfigMaps %+v: %v", cm, err)
}
return nil
}
func (t *thirdparty) runUpdate(event discovery.Event) {
ep := event.Obj.(*v1.RbdEndpoint)
as := t.store.GetAppService(ep.Sid)
switch event.Type {
case discovery.CreateEvent:
b, _ := json.Marshal(ep)
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
endpoints, err := t.createK8sEndpoints(as, []*v1.RbdEndpoint{ep})
if err != nil {
logrus.Warningf("ServiceID: %s; error creating k8s endpoints struct: %s",
ep.Sid, err.Error())
return
}
for _, ep := range endpoints {
ensureEndpoints(ep, t.clientset)
}
case discovery.UpdateEvent:
b, _ := json.Marshal(ep)
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
// TODO: Compare old and new endpoints
// TODO: delete old endpoints
if !ep.IsOnline {
eps := ListOldEndpoints(as, ep)
for _, item := range eps {
deleteEndpoints(item, t.clientset)
fc := func(as *v1.AppService, rbdep *v1.RbdEndpoint, ready bool, msg string, condition func(subset corev1.EndpointSubset) bool) {
var wait sync.WaitGroup
go func() {
wait.Add(1)
defer wait.Done()
for _, ep := range as.GetEndpoints() {
for idx, subset := range ep.Subsets {
if subset.Ports[0].Name == rbdep.UUID && condition(subset) {
logrus.Debugf("Executed; health: %v; msg: %s", ready, msg)
ep.Subsets[idx] = createSubset(rbdep, ready)
f.UpdateEndpoints(ep, t.clientset)
}
}
}
return
}
endpoints, err := t.createK8sEndpoints(as, []*v1.RbdEndpoint{ep})
}()
wait.Wait()
}
rbdep := event.Obj.(*v1.RbdEndpoint)
as := t.store.GetAppService(rbdep.Sid)
b, _ := json.Marshal(rbdep)
msg := fmt.Sprintf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
switch event.Type {
case discovery.CreateEvent, discovery.UpdateEvent:
logrus.Debug(msg)
err := updateSubset(as, rbdep)
if err != nil {
logrus.Warningf("ServiceID: %s; error creating k8s endpoints struct: %s",
ep.Sid, err.Error())
logrus.Warningf("ServiceID: %s; error adding subset: %s",
rbdep.Sid, err.Error())
return
}
for _, ep := range endpoints {
ensureEndpoints(ep, t.clientset)
}
_ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), as.GetEndpoints(),
func(msg string, err error) error {
logrus.Warning(msg)
return nil
})
case discovery.DeleteEvent:
b, _ := json.Marshal(ep)
logrus.Debugf("Run update; Event received: Type: %v; Body: %s", event.Type, string(b))
eps := ListOldEndpoints(as, ep)
for _, item := range eps {
deleteEndpoints(item, t.clientset)
}
logrus.Debug(msg)
deleteSubset(as, rbdep)
eps := as.GetEndpoints()
_ = f.UpgradeEndpoints(t.clientset, as, as.GetEndpoints(), eps,
func(msg string, err error) error {
logrus.Warning(msg)
return nil
})
case discovery.HealthEvent:
subset := createSubset(ep, false)
eps := ListOldEndpoints(as, ep)
for _, ep := range eps {
if !isHealth(ep) {
ep.Subsets = []corev1.EndpointSubset{
subset,
}
ensureEndpoints(ep, t.clientset)
}
isUnhealthy := func(subset corev1.EndpointSubset) bool {
return !isHealthy(subset)
}
fc(as, rbdep, true, msg, isUnhealthy)
case discovery.UnhealthyEvent:
subset := createSubset(ep, true)
eps := ListOldEndpoints(as, ep)
for _, ep := range eps {
if isHealth(ep) {
ep.Subsets = []corev1.EndpointSubset{
subset,
}
ensureEndpoints(ep, t.clientset)
}
}
fc(as, rbdep, false, msg, isHealthy)
}
}
func (t *thirdparty) runDelete(sid string) {
as := t.store.GetAppService(sid) // TODO: need to delete?
if services := as.GetServices(); services != nil {
for _, service := range services {
err := t.clientset.CoreV1().Services(as.TenantID).Delete(service.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting service: %v", err)
}
}
}
if secrets := as.GetSecrets(); secrets != nil {
for _, secret := range secrets {
if secret != nil {
err := t.clientset.CoreV1().Secrets(as.TenantID).Delete(secret.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting secrets: %v", err)
}
t.store.OnDelete(secret)
}
}
}
if ingresses := as.GetIngress(); ingresses != nil {
for _, ingress := range ingresses {
err := t.clientset.ExtensionsV1beta1().Ingresses(as.TenantID).Delete(ingress.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting ingress: %v", err)
}
t.store.OnDelete(ingress)
}
}
if configs := as.GetConfigMaps(); configs != nil {
for _, config := range configs {
err := t.clientset.CoreV1().ConfigMaps(as.TenantID).Delete(config.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting config map: %v", err)
}
t.store.OnDelete(config)
}
}
if eps := as.GetEndpoints(); eps != nil {
for _, ep := range eps {
logrus.Debugf("Endpoints delete: %+v", ep)
err := t.clientset.CoreV1().Endpoints(as.TenantID).Delete(ep.Name, &metav1.DeleteOptions{})
if err != nil && !errors.IsNotFound(err) {
logrus.Warningf("error deleting endpoin empty old app servicets: %v", err)
logrus.Warningf("error deleting endpoint empty old app endpoints: %v", err)
}
t.store.OnDelete(ep)
}
}
}
func ListOldEndpoints(as *v1.AppService, ep *v1.RbdEndpoint) []*corev1.Endpoints {
var res []*corev1.Endpoints
for _, item := range as.GetEndpoints() {
if item.GetLabels()["uuid"] == ep.UUID {
res = append(res, item)
}
}
return res
}
func createSubset(ep *v1.RbdEndpoint, notReady bool) corev1.EndpointSubset {
func createSubset(ep *v1.RbdEndpoint, ready bool) corev1.EndpointSubset {
address := corev1.EndpointAddress{
IP: ep.IP,
}
port := corev1.EndpointPort{
Port: int32(ep.Port),
Name: ep.UUID,
Port: int32(ep.Port),
Protocol: corev1.ProtocolTCP,
}
subset := corev1.EndpointSubset{}
if notReady {
subset.NotReadyAddresses = append(subset.Addresses, address)
} else {
if ready {
subset.Addresses = append(subset.Addresses, address)
} else {
subset.NotReadyAddresses = append(subset.NotReadyAddresses, address)
}
subset.Ports = append(subset.Ports, port)
return subset
}
func isHealth(ep *corev1.Endpoints) bool {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
return false
}
if ep.Subsets[0].Addresses != nil && len(ep.Subsets[0].Addresses) > 0 {
func isHealthy(subset corev1.EndpointSubset) bool {
if subset.Addresses != nil && len(subset.Addresses) > 0 {
return true
}
return false

View File

@ -23,6 +23,8 @@ import (
"os"
"strconv"
"github.com/goodrain/rainbond/builder"
"github.com/Sirupsen/logrus"
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event"
@ -104,7 +106,6 @@ type AppService struct {
services []*corev1.Service
delServices []*corev1.Service
endpoints []*corev1.Endpoints
delEndpoints []*corev1.Endpoints
configMaps []*corev1.ConfigMap
ingresses []*extensions.Ingress
delIngs []*extensions.Ingress // ingresses which need to be deleted
@ -317,11 +318,6 @@ func (a *AppService) GetEndpointsByName(name string) *corev1.Endpoints {
return nil
}
// GetDelEndpoints returns endpoints that need to be deleted in AppService
func (a *AppService) GetDelEndpoints() []*corev1.Endpoints {
return a.delEndpoints
}
//DelEndpoints deletes *corev1.Endpoints
func (a *AppService) DelEndpoints(ep *corev1.Endpoints) {
for i, c := range a.endpoints {
@ -576,7 +572,7 @@ func GetTCPMeshImageName() string {
if d := os.Getenv("TCPMESH_DEFAULT_IMAGE_NAME"); d != "" {
return d
}
return "goodrain.me/mesh_plugin"
return builder.REGISTRYDOMAIN + "/mesh_plugin"
}
//GetProbeMeshImageName get probe init mesh image name
@ -584,5 +580,5 @@ func GetProbeMeshImageName() string {
if d := os.Getenv("PROBE_MESH_IMAGE_NAME"); d != "" {
return d
}
return "goodrain.me/rbd-init-probe"
return builder.REGISTRYDOMAIN + "/rbd-init-probe"
}

View File

@ -615,7 +615,8 @@ func (ctrl *ProvisionController) processNextClaimWorkItem() bool {
if err := ctrl.syncClaimHandler(key); err != nil {
if ctrl.claimQueue.NumRequeues(obj) < ctrl.failedProvisionThreshold {
logrus.Warningf("Retrying syncing claim %q because failures %v < threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
logrus.Warningf("Retrying syncing claim %q because failures %v < threshold %v",
key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)
ctrl.claimQueue.AddRateLimited(obj)
} else {
logrus.Errorf("Giving up syncing claim %q because failures %v >= threshold %v", key, ctrl.claimQueue.NumRequeues(obj), ctrl.failedProvisionThreshold)

View File

@ -427,10 +427,11 @@ type DeployInfo struct {
Deployment string `protobuf:"bytes,3,opt,name=deployment,proto3" json:"deployment,omitempty"`
Pods map[string]string `protobuf:"bytes,4,rep,name=pods,proto3" json:"pods,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Services map[string]string `protobuf:"bytes,5,rep,name=services,proto3" json:"services,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Secrets map[string]string `protobuf:"bytes,6,rep,name=secrets,proto3" json:"secrets,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Ingresses map[string]string `protobuf:"bytes,7,rep,name=ingresses,proto3" json:"ingresses,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Replicatset map[string]string `protobuf:"bytes,8,rep,name=replicatset,proto3" json:"replicatset,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Status string `protobuf:"bytes,9,opt,name=status,proto3" json:"status,omitempty"`
Endpoints map[string]string `protobuf:"bytes,6,rep,name=endpoints,proto3" json:"endpoints,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Secrets map[string]string `protobuf:"bytes,7,rep,name=secrets,proto3" json:"secrets,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Ingresses map[string]string `protobuf:"bytes,8,rep,name=ingresses,proto3" json:"ingresses,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Replicatset map[string]string `protobuf:"bytes,9,rep,name=replicatset,proto3" json:"replicatset,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"`
Status string `protobuf:"bytes,10,opt,name=status,proto3" json:"status,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
@ -496,6 +497,13 @@ func (m *DeployInfo) GetServices() map[string]string {
return nil
}
func (m *DeployInfo) GetEndpoints() map[string]string {
if m != nil {
return m.Endpoints
}
return nil
}
func (m *DeployInfo) GetSecrets() map[string]string {
if m != nil {
return m.Secrets
@ -916,6 +924,7 @@ func init() {
proto.RegisterMapType((map[string]*Container)(nil), "pb.ServiceAppPod.ContainersEntry")
proto.RegisterType((*Container)(nil), "pb.Container")
proto.RegisterType((*DeployInfo)(nil), "pb.DeployInfo")
proto.RegisterMapType((map[string]string)(nil), "pb.DeployInfo.EndpointsEntry")
proto.RegisterMapType((map[string]string)(nil), "pb.DeployInfo.IngressesEntry")
proto.RegisterMapType((map[string]string)(nil), "pb.DeployInfo.PodsEntry")
proto.RegisterMapType((map[string]string)(nil), "pb.DeployInfo.ReplicatsetEntry")
@ -932,72 +941,73 @@ func init() {
func init() { proto.RegisterFile("app_runtime_server.proto", fileDescriptor_f94cf1a886c479d6) }
var fileDescriptor_f94cf1a886c479d6 = []byte{
// 1026 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0xdb, 0x6e, 0xdb, 0x46,
0x10, 0xd5, 0xdd, 0xd2, 0xc8, 0x92, 0xe3, 0x49, 0x9d, 0xd0, 0x4a, 0xdc, 0x24, 0x2c, 0x52, 0xf8,
0x21, 0x50, 0x0b, 0x17, 0x46, 0x1d, 0x27, 0x68, 0x21, 0xc4, 0x86, 0x2b, 0xc0, 0x4d, 0x0d, 0xda,
0x79, 0x16, 0x68, 0x72, 0xe3, 0x6e, 0x2d, 0x2e, 0xb7, 0xdc, 0xa5, 0x01, 0x3e, 0xb6, 0xbf, 0x90,
0x1f, 0xea, 0x17, 0xf4, 0xad, 0xff, 0x53, 0xec, 0x85, 0x14, 0x75, 0x09, 0x5c, 0xb7, 0x05, 0xf2,
0xb6, 0x3c, 0x3b, 0x67, 0x66, 0xe7, 0x0c, 0x67, 0x76, 0xc1, 0xf1, 0x39, 0x9f, 0x24, 0x29, 0x93,
0x34, 0x22, 0x13, 0x41, 0x92, 0x1b, 0x92, 0x0c, 0x79, 0x12, 0xcb, 0x18, 0x6b, 0xfc, 0xd2, 0x5d,
0x83, 0xe6, 0x71, 0xc4, 0x65, 0xe6, 0x7e, 0x05, 0xfd, 0x73, 0x92, 0xdc, 0xd0, 0x80, 0x78, 0xe4,
0xd7, 0x94, 0x08, 0x89, 0x3b, 0x00, 0xc2, 0x20, 0x13, 0x1a, 0x3a, 0xd5, 0xa7, 0xd5, 0xdd, 0x8e,
0xd7, 0xb1, 0xc8, 0x38, 0x74, 0xf7, 0x60, 0xc3, 0x12, 0x44, 0xce, 0x78, 0x02, 0xdd, 0x19, 0x43,
0x58, 0x0a, 0x14, 0x14, 0xe1, 0xbe, 0x80, 0xde, 0x05, 0x61, 0x3e, 0x93, 0x39, 0xe3, 0x11, 0x74,
0xa4, 0x06, 0x66, 0x21, 0xda, 0x06, 0x18, 0x87, 0xee, 0x6f, 0x55, 0xe8, 0x9d, 0x4b, 0x5f, 0xa6,
0xe2, 0x47, 0x22, 0x84, 0x7f, 0x45, 0x70, 0x1f, 0x5a, 0x42, 0x03, 0x4e, 0xf5, 0x69, 0x7d, 0xb7,
0xbb, 0xb7, 0x33, 0xe4, 0x97, 0xc3, 0x39, 0x13, 0xfb, 0x75, 0xcc, 0x64, 0x92, 0x79, 0xd6, 0x78,
0xf0, 0x12, 0xba, 0x25, 0x18, 0xef, 0x41, 0xfd, 0x9a, 0x64, 0x36, 0x9c, 0x5a, 0xe2, 0x67, 0xd0,
0xbc, 0xf1, 0xa7, 0x29, 0x71, 0x6a, 0x1a, 0x33, 0x1f, 0x87, 0xb5, 0x83, 0xaa, 0x9b, 0x41, 0xf7,
0x88, 0x8a, 0xeb, 0xfc, 0x00, 0x5f, 0x43, 0x33, 0xa4, 0xe2, 0x3a, 0x8f, 0x3f, 0x50, 0xf1, 0x4b,
0xfb, 0x7a, 0x6d, 0x83, 0x1b, 0xc3, 0xc1, 0x01, 0xc0, 0x0c, 0xbc, 0x2d, 0x74, 0xb5, 0x1c, 0xfa,
0x10, 0x36, 0xad, 0xc0, 0x23, 0xce, 0xcf, 0xe2, 0xf0, 0x94, 0x0a, 0x89, 0xcf, 0xa1, 0xc1, 0xe3,
0x30, 0x8f, 0xbf, 0xa9, 0xf3, 0x2f, 0x1b, 0x79, 0x7a, 0xdb, 0xfd, 0xb3, 0x06, 0xbd, 0x39, 0xfc,
0x96, 0x6a, 0xaa, 0x42, 0x84, 0x84, 0x4f, 0xe3, 0x4c, 0xed, 0x1a, 0x15, 0xda, 0x06, 0x18, 0x87,
0xaa, 0xae, 0x76, 0x53, 0x66, 0x9c, 0x38, 0x75, 0x53, 0x57, 0x03, 0x5d, 0x64, 0x9c, 0xe0, 0x36,
0xb4, 0x79, 0x1c, 0x4e, 0x98, 0x1f, 0x11, 0xa7, 0xa1, 0x77, 0xd7, 0x78, 0x1c, 0xbe, 0xf5, 0x23,
0x82, 0x5b, 0xd0, 0x52, 0x5b, 0x94, 0x3b, 0x4d, 0xa3, 0x2d, 0x8f, 0xc3, 0x31, 0x57, 0xc7, 0x51,
0xb0, 0xad, 0x66, 0xcb, 0x1c, 0x87, 0xc7, 0xa1, 0xa9, 0x13, 0x8e, 0x00, 0x82, 0x98, 0x49, 0x9f,
0x32, 0x92, 0x08, 0x67, 0x4d, 0x27, 0xfb, 0x6c, 0x29, 0xd9, 0xe1, 0x9b, 0xc2, 0xc6, 0x68, 0x5e,
0x22, 0x0d, 0x4e, 0x61, 0x63, 0x61, 0x7b, 0x85, 0xfa, 0x5f, 0x94, 0xd5, 0xef, 0xee, 0xf5, 0x54,
0x88, 0x82, 0x55, 0x2e, 0xc6, 0x3b, 0xe8, 0x14, 0x38, 0x3e, 0x87, 0x7e, 0x11, 0xc8, 0x24, 0x6d,
0x5c, 0xf6, 0x0a, 0x54, 0xa7, 0xfe, 0x0c, 0xd6, 0x23, 0x12, 0xc5, 0x49, 0x36, 0x99, 0xd2, 0x88,
0x4a, 0x1d, 0xa3, 0xee, 0x75, 0x0d, 0x76, 0xaa, 0x20, 0xf7, 0xaf, 0x26, 0xc0, 0x91, 0x91, 0x99,
0xbd, 0x8f, 0xf1, 0x31, 0x74, 0x94, 0x3b, 0xc1, 0xfd, 0x20, 0xf7, 0x39, 0x03, 0xd0, 0x85, 0x75,
0xa5, 0x17, 0x79, 0x9f, 0x4e, 0x89, 0x20, 0xd2, 0x96, 0x69, 0x0e, 0xc3, 0xcf, 0xc1, 0xd6, 0x25,
0x22, 0x4c, 0xce, 0x57, 0x4a, 0x21, 0xf8, 0xc2, 0xfe, 0x3f, 0x0d, 0x2d, 0xa9, 0xa3, 0xff, 0xdf,
0x22, 0xfe, 0xf0, 0x2c, 0x0e, 0xad, 0x92, 0xda, 0x0a, 0x0f, 0xa0, 0x6d, 0x7f, 0x11, 0xe1, 0x34,
0x35, 0xe3, 0xf1, 0x02, 0x23, 0x1f, 0x01, 0x86, 0x55, 0x58, 0xe3, 0x3e, 0xac, 0x09, 0x12, 0x24,
0x44, 0xaa, 0xe2, 0x2a, 0xe2, 0xa3, 0x25, 0xa2, 0xde, 0x35, 0xbc, 0xdc, 0x16, 0x5f, 0x41, 0x87,
0xb2, 0xab, 0x84, 0x08, 0x41, 0xf2, 0xb2, 0xef, 0x2c, 0x10, 0xc7, 0xf9, 0xbe, 0xa1, 0xce, 0xec,
0x71, 0x04, 0xdd, 0x84, 0xf0, 0x29, 0x0d, 0x7c, 0xa9, 0xe4, 0x69, 0x6b, 0xfa, 0x93, 0x05, 0xba,
0x37, 0xb3, 0x30, 0x0e, 0xca, 0x1c, 0x7c, 0x50, 0x0c, 0x98, 0x8e, 0x96, 0x2e, 0x9f, 0x20, 0xdf,
0x42, 0xa7, 0xd0, 0xe6, 0x2e, 0xf3, 0x63, 0xf0, 0xaa, 0xe8, 0xc3, 0x7f, 0x41, 0x3e, 0x84, 0xf5,
0xb2, 0x4c, 0x77, 0xe2, 0xbe, 0x86, 0xfe, 0xbc, 0x52, 0x77, 0x62, 0x7f, 0x07, 0xf7, 0x16, 0x85,
0xba, 0xd3, 0xd8, 0xfc, 0xa3, 0x0a, 0xfd, 0x7c, 0xd2, 0x8b, 0x38, 0x4d, 0x02, 0xa2, 0x86, 0x48,
0xc0, 0xd3, 0x49, 0x62, 0x26, 0xbf, 0x76, 0x53, 0xf7, 0x20, 0xe0, 0x69, 0xe9, 0x2e, 0x50, 0x06,
0xe5, 0x5e, 0x69, 0x07, 0x3c, 0xd5, 0x8d, 0xa2, 0x5a, 0xce, 0xf6, 0x52, 0xee, 0xa0, 0xae, 0x2d,
0x7a, 0x06, 0xcd, 0x7d, 0x2c, 0xb6, 0x5c, 0x63, 0xa9, 0xe5, 0xf0, 0x4b, 0xd8, 0x48, 0x52, 0xc6,
0x28, 0xbb, 0x9a, 0xa8, 0x9b, 0x91, 0xa5, 0x91, 0x9e, 0x4c, 0x75, 0xaf, 0x67, 0xe1, 0x11, 0xe7,
0x6f, 0xd3, 0xc8, 0xfd, 0xbd, 0x0a, 0xdb, 0xa3, 0x30, 0xbc, 0xf8, 0x99, 0x26, 0xe1, 0x99, 0x9f,
0xc8, 0xec, 0x98, 0x85, 0x3c, 0xa6, 0x4c, 0xaa, 0xeb, 0x0e, 0x11, 0x1a, 0x69, 0x5a, 0x0c, 0x52,
0xbd, 0x56, 0x02, 0x89, 0x62, 0x7a, 0xaa, 0x25, 0xf6, 0xa1, 0x46, 0xb9, 0xed, 0xc2, 0x1a, 0xe5,
0x8a, 0xc5, 0xe3, 0xc4, 0x1c, 0xab, 0xe9, 0xe9, 0xb5, 0x4a, 0x9b, 0x8a, 0x49, 0xcc, 0xa6, 0x94,
0x11, 0x7d, 0x92, 0xb6, 0xd7, 0xa6, 0xe2, 0x27, 0xfd, 0xad, 0x0f, 0xf1, 0x8e, 0x7f, 0xe2, 0x43,
0x8c, 0x60, 0xfb, 0x88, 0x4c, 0xff, 0xcb, 0x19, 0xdc, 0x0f, 0x55, 0xc0, 0x65, 0x07, 0xff, 0x63,
0x02, 0xb3, 0xc6, 0x6d, 0x96, 0x1b, 0x77, 0x3e, 0xb1, 0xd6, 0x42, 0x62, 0xdf, 0xc3, 0xfd, 0x15,
0x59, 0xe1, 0x2e, 0xd4, 0xe3, 0xcb, 0x5f, 0xec, 0x15, 0xfb, 0x40, 0xcd, 0x8f, 0x65, 0x2b, 0x4f,
0x99, 0xec, 0x7d, 0x68, 0x40, 0x7f, 0xc4, 0xb9, 0x67, 0x5e, 0x57, 0xe7, 0x19, 0x0b, 0xf0, 0x00,
0xd6, 0x4f, 0x88, 0x1c, 0x71, 0x6e, 0x6f, 0xb2, 0xfb, 0xa5, 0x5b, 0x2b, 0x7f, 0x28, 0x0d, 0x36,
0x97, 0xde, 0x2d, 0x6e, 0x05, 0x5f, 0x02, 0x18, 0xa6, 0x9a, 0x34, 0x88, 0x25, 0x5e, 0x4e, 0xdb,
0x5a, 0xba, 0x01, 0xd5, 0x9b, 0xc0, 0xad, 0xe0, 0x3e, 0xf4, 0x4e, 0x88, 0x2c, 0x5d, 0x24, 0xab,
0xd8, 0xfd, 0xf9, 0x49, 0xe8, 0x56, 0xf0, 0x35, 0x6c, 0x9e, 0x10, 0xb9, 0xd0, 0xa7, 0xfa, 0x6c,
0x73, 0xaf, 0xb4, 0x01, 0x96, 0x21, 0x63, 0xe6, 0x56, 0xf0, 0x07, 0x78, 0xa8, 0xc2, 0xaf, 0x52,
0x70, 0x55, 0xf8, 0x87, 0xab, 0x85, 0x14, 0x6e, 0x05, 0xdf, 0xc0, 0xd6, 0xca, 0x4e, 0x43, 0x3d,
0xfb, 0x3f, 0xda, 0x84, 0x83, 0x8e, 0xda, 0x36, 0xcf, 0x57, 0xed, 0x64, 0x65, 0xa7, 0x18, 0x27,
0x1f, 0x6d, 0xa2, 0x25, 0x27, 0x2b, 0x7f, 0x75, 0xb4, 0xb7, 0xd0, 0xf4, 0x1f, 0x38, 0xb9, 0x6c,
0xe9, 0xe7, 0xf5, 0x37, 0x7f, 0x07, 0x00, 0x00, 0xff, 0xff, 0xc9, 0x52, 0x20, 0xc0, 0x7a, 0x0b,
0x00, 0x00,
// 1044 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xc4, 0x56, 0xef, 0x6e, 0xdb, 0x36,
0x10, 0xf7, 0xff, 0xd8, 0xe7, 0xd8, 0x69, 0xd8, 0xa5, 0x55, 0xdc, 0x76, 0x6d, 0x35, 0x74, 0xc8,
0x87, 0xc2, 0x1b, 0x32, 0x04, 0x4b, 0xd3, 0x62, 0x83, 0xd1, 0x04, 0x99, 0x81, 0xac, 0x0b, 0x94,
0xf4, 0xb3, 0xa1, 0x48, 0x6c, 0xc6, 0xc5, 0x22, 0x59, 0x91, 0x0a, 0xa0, 0x8f, 0xdb, 0x2b, 0xf4,
0x85, 0xf6, 0x04, 0xc3, 0x1e, 0x69, 0xe0, 0x1f, 0xc9, 0x92, 0xad, 0xa2, 0xf3, 0x36, 0x60, 0xdf,
0xa8, 0xdf, 0xdd, 0xef, 0x8e, 0x77, 0xa7, 0xbb, 0x23, 0x38, 0x3e, 0xe7, 0xb3, 0x38, 0xa1, 0x92,
0x44, 0x78, 0x26, 0x70, 0x7c, 0x8b, 0xe3, 0x31, 0x8f, 0x99, 0x64, 0xa8, 0xc1, 0xaf, 0xdc, 0x0d,
0x68, 0x9f, 0x44, 0x5c, 0xa6, 0xee, 0x57, 0x30, 0xbc, 0xc0, 0xf1, 0x2d, 0x09, 0xb0, 0x87, 0xdf,
0x27, 0x58, 0x48, 0xf4, 0x08, 0x40, 0x18, 0x64, 0x46, 0x42, 0xa7, 0xfe, 0xa4, 0xbe, 0xd7, 0xf3,
0x7a, 0x16, 0x99, 0x86, 0xee, 0x3e, 0x6c, 0x59, 0x82, 0xc8, 0x18, 0x8f, 0xa1, 0xbf, 0x60, 0x08,
0x4b, 0x81, 0x9c, 0x22, 0xdc, 0xe7, 0x30, 0xb8, 0xc4, 0xd4, 0xa7, 0x32, 0x63, 0x3c, 0x80, 0x9e,
0xd4, 0xc0, 0xc2, 0x45, 0xd7, 0x00, 0xd3, 0xd0, 0xfd, 0xb5, 0x0e, 0x83, 0x0b, 0xe9, 0xcb, 0x44,
0xfc, 0x88, 0x85, 0xf0, 0xaf, 0x31, 0x3a, 0x80, 0x8e, 0xd0, 0x80, 0x53, 0x7f, 0xd2, 0xdc, 0xeb,
0xef, 0x3f, 0x1a, 0xf3, 0xab, 0x71, 0x49, 0xc5, 0x7e, 0x9d, 0x50, 0x19, 0xa7, 0x9e, 0x55, 0x1e,
0xbd, 0x80, 0x7e, 0x01, 0x46, 0x77, 0xa0, 0x79, 0x83, 0x53, 0xeb, 0x4e, 0x1d, 0xd1, 0x67, 0xd0,
0xbe, 0xf5, 0xe7, 0x09, 0x76, 0x1a, 0x1a, 0x33, 0x1f, 0x47, 0x8d, 0xc3, 0xba, 0x9b, 0x42, 0xff,
0x98, 0x88, 0x9b, 0xec, 0x02, 0x5f, 0x43, 0x3b, 0x24, 0xe2, 0x26, 0xf3, 0x3f, 0x52, 0xfe, 0x0b,
0x72, 0x7d, 0xb6, 0xce, 0x8d, 0xe2, 0xe8, 0x10, 0x60, 0x01, 0x7e, 0xca, 0x75, 0xbd, 0xe8, 0xfa,
0x08, 0xb6, 0x6d, 0x82, 0x27, 0x9c, 0x9f, 0xb3, 0xf0, 0x8c, 0x08, 0x89, 0x9e, 0x41, 0x8b, 0xb3,
0x30, 0xf3, 0xbf, 0xad, 0xe3, 0x2f, 0x2a, 0x79, 0x5a, 0xec, 0xfe, 0xd1, 0x80, 0x41, 0x09, 0xff,
0x44, 0x35, 0x55, 0x21, 0x42, 0xcc, 0xe7, 0x2c, 0x55, 0x52, 0x93, 0x85, 0xae, 0x01, 0xa6, 0xa1,
0xaa, 0xab, 0x15, 0xca, 0x94, 0x63, 0xa7, 0x69, 0xea, 0x6a, 0xa0, 0xcb, 0x94, 0x63, 0xb4, 0x0b,
0x5d, 0xce, 0xc2, 0x19, 0xf5, 0x23, 0xec, 0xb4, 0xb4, 0x74, 0x83, 0xb3, 0xf0, 0x8d, 0x1f, 0x61,
0xb4, 0x03, 0x1d, 0x25, 0x22, 0xdc, 0x69, 0x9b, 0xdc, 0x72, 0x16, 0x4e, 0xb9, 0xba, 0x8e, 0x82,
0x6d, 0x35, 0x3b, 0xe6, 0x3a, 0x9c, 0x85, 0xa6, 0x4e, 0x68, 0x02, 0x10, 0x30, 0x2a, 0x7d, 0x42,
0x71, 0x2c, 0x9c, 0x0d, 0x1d, 0xec, 0xd3, 0x95, 0x60, 0xc7, 0xaf, 0x73, 0x1d, 0x93, 0xf3, 0x02,
0x69, 0x74, 0x06, 0x5b, 0x4b, 0xe2, 0x8a, 0xec, 0x7f, 0x51, 0xcc, 0x7e, 0x7f, 0x7f, 0xa0, 0x5c,
0xe4, 0xac, 0x62, 0x31, 0xde, 0x42, 0x2f, 0xc7, 0xd1, 0x33, 0x18, 0xe6, 0x8e, 0x4c, 0xd0, 0xc6,
0xe4, 0x20, 0x47, 0x75, 0xe8, 0x4f, 0x61, 0x33, 0xc2, 0x11, 0x8b, 0xd3, 0xd9, 0x9c, 0x44, 0x44,
0x6a, 0x1f, 0x4d, 0xaf, 0x6f, 0xb0, 0x33, 0x05, 0xb9, 0x7f, 0x76, 0x00, 0x8e, 0x4d, 0x9a, 0xe9,
0x3b, 0x86, 0x1e, 0x42, 0x4f, 0x99, 0x13, 0xdc, 0x0f, 0x32, 0x9b, 0x0b, 0x00, 0xb9, 0xb0, 0xa9,
0xf2, 0x85, 0xdf, 0x25, 0x73, 0x2c, 0xb0, 0xb4, 0x65, 0x2a, 0x61, 0xe8, 0x73, 0xb0, 0x75, 0x89,
0x30, 0x95, 0xe5, 0x4a, 0x29, 0x04, 0x3d, 0xb7, 0xff, 0x4f, 0x4b, 0xa7, 0xd4, 0xd1, 0xff, 0x6f,
0xee, 0x7f, 0x7c, 0xce, 0x42, 0x9b, 0x49, 0xad, 0x85, 0x0e, 0xa1, 0x6b, 0x7f, 0x11, 0xe1, 0xb4,
0x35, 0xe3, 0xe1, 0x12, 0x23, 0x1b, 0x01, 0x86, 0x95, 0x6b, 0xa3, 0x97, 0xd0, 0xc3, 0x34, 0xe4,
0x8c, 0x50, 0xa9, 0xca, 0x9b, 0x37, 0x6b, 0x81, 0x7a, 0x92, 0xc9, 0x0d, 0x77, 0xa1, 0x8f, 0x0e,
0x60, 0x43, 0xe0, 0x20, 0xc6, 0x32, 0x2b, 0xfd, 0x83, 0x15, 0xaf, 0x5a, 0x6a, 0x88, 0x99, 0xae,
0xf2, 0x49, 0xe8, 0x75, 0x8c, 0x85, 0xc0, 0xc2, 0xe9, 0x56, 0xfa, 0x9c, 0x66, 0x72, 0xeb, 0x33,
0xd7, 0x47, 0x13, 0xe8, 0xc7, 0x98, 0xcf, 0x49, 0xe0, 0x4b, 0x95, 0xdb, 0x9e, 0xa6, 0x3f, 0x5e,
0xa2, 0x7b, 0x0b, 0x0d, 0x63, 0xa0, 0xc8, 0x41, 0xf7, 0xf2, 0xe9, 0x04, 0x3a, 0xef, 0xd9, 0xf8,
0xf9, 0x16, 0x7a, 0x79, 0x62, 0xd7, 0x19, 0x3e, 0xa3, 0x97, 0x79, 0x13, 0xff, 0x03, 0xf2, 0x2b,
0x18, 0x96, 0x33, 0xbc, 0x16, 0xfb, 0x08, 0x36, 0x8b, 0x49, 0x5e, 0xd7, 0x73, 0x39, 0xcf, 0x6b,
0xb1, 0xbf, 0x83, 0x3b, 0xcb, 0x69, 0x5e, 0x6b, 0x62, 0xff, 0x5e, 0x87, 0x61, 0xb6, 0x64, 0x04,
0x4b, 0xe2, 0x00, 0xab, 0xf9, 0x15, 0xf0, 0x64, 0x16, 0x9b, 0xa5, 0xa3, 0xcd, 0x34, 0x3d, 0x08,
0x78, 0x52, 0x58, 0x43, 0x4a, 0xa1, 0xd8, 0xa6, 0xdd, 0x80, 0x27, 0xba, 0x47, 0x55, 0xb7, 0xdb,
0x36, 0xce, 0x0c, 0x34, 0xb5, 0xc6, 0xc0, 0xa0, 0x99, 0x8d, 0xe5, 0x6e, 0x6f, 0xad, 0x74, 0x3b,
0xfa, 0x12, 0xb6, 0xe2, 0x84, 0x52, 0x42, 0xaf, 0x67, 0x6a, 0x29, 0xd3, 0x24, 0xd2, 0x43, 0xb1,
0xe9, 0x0d, 0x2c, 0x3c, 0xe1, 0xfc, 0x4d, 0x12, 0xb9, 0xbf, 0xd5, 0x61, 0x77, 0x12, 0x86, 0x97,
0x3f, 0x93, 0x38, 0x3c, 0xf7, 0x63, 0x99, 0xe6, 0x85, 0xf4, 0xf0, 0x7b, 0x84, 0xa0, 0x95, 0x24,
0xf9, 0x0c, 0xd7, 0x67, 0x95, 0x20, 0x91, 0x0f, 0x6e, 0x75, 0x44, 0x43, 0x68, 0x10, 0x6e, 0x07,
0x40, 0x83, 0x70, 0xc5, 0xe2, 0x2c, 0x36, 0xd7, 0x6a, 0x7b, 0xfa, 0xac, 0xc2, 0x26, 0x62, 0xc6,
0xe8, 0x9c, 0x50, 0xac, 0x6f, 0xd2, 0xf5, 0xba, 0x44, 0xfc, 0xa4, 0xbf, 0xf5, 0x25, 0xde, 0xf2,
0xff, 0xf9, 0x12, 0x13, 0xd8, 0x3d, 0xc6, 0xf3, 0x7f, 0x73, 0x07, 0xf7, 0x43, 0x1d, 0xd0, 0xaa,
0x81, 0xff, 0x30, 0x80, 0x45, 0xdb, 0xb7, 0x8b, 0x6d, 0x5f, 0x0e, 0xac, 0xb3, 0x14, 0xd8, 0xf7,
0x70, 0xb7, 0x22, 0x2a, 0xb4, 0x07, 0x4d, 0x76, 0xf5, 0x8b, 0xdd, 0xee, 0xf7, 0xd4, 0xf4, 0x59,
0xd5, 0xf2, 0x94, 0xca, 0xfe, 0x87, 0x16, 0x0c, 0x27, 0x9c, 0x7b, 0xe6, 0x61, 0x77, 0x91, 0xd2,
0x00, 0x1d, 0xc2, 0xe6, 0x29, 0x96, 0x13, 0xce, 0xed, 0x12, 0xbd, 0x5b, 0x58, 0x98, 0xd9, 0x1b,
0x6d, 0xb4, 0xbd, 0xf2, 0x64, 0x72, 0x6b, 0xe8, 0x05, 0x80, 0x61, 0xaa, 0x39, 0x85, 0x50, 0x81,
0x97, 0xd1, 0x76, 0x56, 0x96, 0xaf, 0x7a, 0x8e, 0xb8, 0x35, 0x74, 0x00, 0x83, 0x53, 0x2c, 0x0b,
0x3b, 0xac, 0x8a, 0x3d, 0x2c, 0xcf, 0x51, 0xb7, 0x86, 0x5e, 0xc1, 0xf6, 0x29, 0x96, 0x4b, 0x7d,
0xaa, 0xef, 0x56, 0x7a, 0x20, 0x8e, 0x50, 0x11, 0x32, 0x6a, 0x6e, 0x0d, 0xfd, 0x00, 0xf7, 0x95,
0xfb, 0xaa, 0x0c, 0x56, 0xb9, 0xbf, 0x5f, 0x9d, 0x48, 0xe1, 0xd6, 0xd0, 0x6b, 0xd8, 0xa9, 0xec,
0x34, 0xa4, 0x37, 0xc7, 0x47, 0x9b, 0x70, 0xd4, 0x53, 0x62, 0xf3, 0x72, 0xd6, 0x46, 0x2a, 0x3b,
0xc5, 0x18, 0xf9, 0x68, 0x13, 0xad, 0x18, 0xa9, 0xfc, 0xd5, 0x91, 0xdd, 0x61, 0xf3, 0xbf, 0x61,
0xe4, 0xaa, 0xa3, 0x5f, 0xf6, 0xdf, 0xfc, 0x15, 0x00, 0x00, 0xff, 0xff, 0xa2, 0xcd, 0xa8, 0xa8,
0xf5, 0x0b, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.

View File

@ -56,10 +56,11 @@ message DeployInfo {
string deployment = 3;
map<string, string> pods = 4;
map<string, string> services = 5;
map<string, string> secrets = 6;
map<string, string> ingresses = 7;
map<string, string> replicatset = 8;
string status = 9;
map<string, string> endpoints = 6;
map<string, string> secrets = 7;
map<string, string> ingresses = 8;
map<string, string> replicatset = 9;
string status = 10;
}
message TenantResource {

View File

@ -21,9 +21,9 @@ package server
import (
"context"
"fmt"
"github.com/goodrain/rainbond/db/model"
corev1 "k8s.io/api/core/v1"
"net"
"strconv"
"strings"
"time"
@ -180,6 +180,14 @@ func (r *RuntimeServer) GetDeployInfo(ctx context.Context, re *pb.ServiceRequest
}
deployinfo.Services = service
}
if endpoints := appService.GetEndpoints(); endpoints != nil &&
appService.AppServiceBase.ServiceKind == model.ServiceKindThirdParty {
eps := make(map[string]string, len(endpoints))
for _, s := range endpoints {
eps[s.Name] = s.Name
}
deployinfo.Endpoints = eps
}
if secrets := appService.GetSecrets(); secrets != nil {
secretsinfo := make(map[string]string, len(secrets))
for _, s := range secrets {
@ -242,36 +250,49 @@ func (r *RuntimeServer) ListThirdPartyEndpoints(ctx context.Context, re *pb.Serv
return new(pb.ThirdPartyEndpoints), nil
}
var pbeps []*pb.ThirdPartyEndpoint
// The same IP may correspond to two endpoints, which are internal and external endpoints.
// So it is need to filter the same IP.
exists := make(map[string]bool)
for _, ep := range as.GetEndpoints() {
if exists[ep.GetLabels()["uuid"]] {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
logrus.Debugf("Key: %s; empty subsets", fmt.Sprintf("%s/%s", ep.Namespace, ep.Name))
continue
}
exists[ep.GetLabels()["uuid"]] = true
pbep := &pb.ThirdPartyEndpoint{
Uuid: ep.GetLabels()["uuid"],
Sid: ep.GetLabels()["service_id"],
Ip: ep.GetLabels()["ip"],
Port: func(item *corev1.Endpoints) int32 {
realport, _ := strconv.ParseBool(item.GetLabels()["realport"])
if realport {
portstr := item.GetLabels()["port"]
port, _ := strconv.Atoi(portstr)
return int32(port)
for idx, subset := range ep.Subsets {
if exists[subset.Ports[0].Name] {
continue
}
ip := func(subset corev1.EndpointSubset) string {
if subset.Addresses != nil && len(subset.Addresses) > 0 {
return subset.Addresses[0].IP
}
return 0
}(ep),
Status: func(item *corev1.Endpoints) string {
if item.Subsets == nil || len(item.Subsets) == 0 {
if subset.NotReadyAddresses != nil && len(subset.NotReadyAddresses) > 0 {
return subset.NotReadyAddresses[0].IP
}
return ""
}(subset)
if strings.TrimSpace(ip) == "" {
logrus.Debugf("Key: %s; Index: %d; IP not found", fmt.Sprintf("%s/%s", ep.Namespace, ep.Name), idx)
continue
}
exists[subset.Ports[0].Name] = true
pbep := &pb.ThirdPartyEndpoint{
Uuid: subset.Ports[0].Name,
Sid: ep.GetLabels()["service_id"],
Ip: ip,
Port: subset.Ports[0].Port,
Status: func(item *corev1.Endpoints) string {
if subset.Addresses != nil && len(subset.Addresses) > 0 {
return "healthy"
}
if subset.NotReadyAddresses != nil && len(subset.NotReadyAddresses) > 0 {
return "unhealthy"
}
return "unknown"
}
if item.Subsets[0].Addresses == nil || len(item.Subsets[0].Addresses) == 0 {
return "unhealthy"
}
return "healthy"
}(ep),
}(ep),
}
pbeps = append(pbeps, pbep)
}
pbeps = append(pbeps, pbep)
}
return &pb.ThirdPartyEndpoints{
Obj: pbeps,
@ -310,9 +331,16 @@ func (r *RuntimeServer) UpdThirdPartyEndpoint(ctx context.Context, re *pb.UpdThi
Port: int(re.Port),
IsOnline: re.IsOnline,
}
r.updateCh.In() <- discovery.Event{
Type: discovery.UpdateEvent,
Obj: rbdep,
if re.IsOnline == false {
r.updateCh.In() <- discovery.Event{
Type: discovery.DeleteEvent,
Obj: rbdep,
}
} else {
r.updateCh.In() <- discovery.Event{
Type: discovery.UpdateEvent,
Obj: rbdep,
}
}
return new(pb.Empty), nil
}