From 76a6d486a1f50aa10864c2e3c164ea6773797263 Mon Sep 17 00:00:00 2001 From: huangrh Date: Thu, 28 Feb 2019 11:50:54 +0800 Subject: [PATCH] [ADD] create corev1.Endpoints for third-party services --- api/handler/service.go | 4 +- db/dao/dao.go | 12 +- db/dao/dao_mock.go | 162 +++++++++- db/db.go | 4 +- db/db_mock.go | 42 ++- db/model/third_party_service.go | 14 +- db/mysql/dao/3rd_party.go | 46 ++- db/mysql/dao/tenants.go | 22 ++ db/mysql/dao_impl.go | 12 +- db/mysql/mysql.go | 2 +- db/tenant_test.go | 207 +++++++++++++ worker/appm/controller/apply_rule.go | 36 +++ worker/appm/conversion/conversion.go | 18 +- worker/appm/conversion/conversion_test.go | 46 ++- worker/appm/conversion/errors.go | 4 +- worker/appm/conversion/gateway.go | 64 +++- worker/appm/conversion/gateway_test.go | 281 ++++++++++-------- worker/appm/conversion/service.go | 23 +- worker/appm/store/store.go | 40 ++- worker/appm/store/third_party_service.go | 47 +++ worker/appm/thirdparty/discovery/discovery.go | 41 +++ worker/appm/thirdparty/discovery/etcd.go | 104 +++++++ worker/appm/thirdparty/endpoints.go | 81 +++++ worker/appm/types/v1/endpoint.go | 25 ++ worker/appm/types/v1/status.go | 3 + worker/appm/types/v1/v1.go | 67 ++++- worker/discover/model/model.go | 1 + worker/handle/manager.go | 24 +- 28 files changed, 1229 insertions(+), 203 deletions(-) create mode 100644 db/tenant_test.go create mode 100644 worker/appm/store/third_party_service.go create mode 100644 worker/appm/thirdparty/discovery/discovery.go create mode 100644 worker/appm/thirdparty/discovery/etcd.go create mode 100644 worker/appm/thirdparty/endpoints.go create mode 100644 worker/appm/types/v1/endpoint.go diff --git a/api/handler/service.go b/api/handler/service.go index 39ba5db60..dd607d6dd 100644 --- a/api/handler/service.go +++ b/api/handler/service.go @@ -613,7 +613,7 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error { tx.Rollback() return err } - c := &dbmodel.ThirdPartyServiceDiscoveryCfg{ + c := &dbmodel.ThirdPartySvcDiscoveryCfg{ ServiceID: sc.ServiceID, Type: cfg.Type, Servers: strings.Join(cfg.Servers, ","), @@ -621,7 +621,7 @@ func (s *ServiceAction) ServiceCreate(sc *api_model.ServiceStruct) error { Username: cfg.Username, Password: cfg.Password, } - if err := db.GetManager().ThirdPartyServiceDiscoveryCfgDaoTransactions(tx). + if err := db.GetManager().ThirdPartySvcDiscoveryCfgDaoTransactions(tx). AddModel(c); err != nil { logrus.Errorf("error saving discover center configuration: %v", err) tx.Rollback() diff --git a/db/dao/dao.go b/db/dao/dao.go index f4689b179..14b091018 100644 --- a/db/dao/dao.go +++ b/db/dao/dao.go @@ -80,6 +80,7 @@ type TenantServiceDao interface { GetPagedTenantService(offset, len int, serviceIDs []string) ([]map[string]interface{}, int, error) GetAllServicesID() ([]*model.TenantServices, error) UpdateDeployVersion(serviceID, deployversion string) error + ListThirdPartyServices() ([]*model.TenantServices, error) } //TenantServiceDeleteDao TenantServiceDeleteDao @@ -98,6 +99,7 @@ type TenantServicesPortDao interface { GetInnerPorts(serviceID string) ([]*model.TenantServicesPort, error) GetPort(serviceID string, port int) (*model.TenantServicesPort, error) DELPortsByServiceID(serviceID string) error + HasOpenPort(sid string) bool } //TenantPluginDao TenantPluginDao @@ -411,12 +413,14 @@ type IPPoolDao interface { type EndpointsDao interface { Dao GetByUUID(uuid string) (*model.Endpoint, error) - List(sid string) ([]*model.Endpoint, error) DelByUUID(uuid string) error + List(sid string) ([]*model.Endpoint, error) + ListIsOnline(sid string) ([]*model.Endpoint, error) } -// ThirdPartyServiceDiscoveryCfgDao is an interface for defining method +// ThirdPartySvcDiscoveryCfgDao is an interface for defining method // for operating table 3rd_party_svc_discovery_cfg. -type ThirdPartyServiceDiscoveryCfgDao interface { +type ThirdPartySvcDiscoveryCfgDao interface { Dao -} \ No newline at end of file + GetByServiceID(sid string) (*model.ThirdPartySvcDiscoveryCfg, error) +} diff --git a/db/dao/dao_mock.go b/db/dao/dao_mock.go index 9fe8f0175..8a4697166 100644 --- a/db/dao/dao_mock.go +++ b/db/dao/dao_mock.go @@ -4,10 +4,9 @@ package dao import ( - time "time" - - model "github.com/goodrain/rainbond/db/model" gomock "github.com/rafrombrc/gomock/gomock" + model "github.com/goodrain/rainbond/db/model" + time "time" ) // Mock of Dao interface @@ -533,6 +532,17 @@ func (_mr *_MockTenantServiceDaoRecorder) UpdateDeployVersion(arg0, arg1 interfa return _mr.mock.ctrl.RecordCall(_mr.mock, "UpdateDeployVersion", arg0, arg1) } +func (_m *MockTenantServiceDao) ListThirdPartyServices() ([]*model.TenantServices, error) { + ret := _m.ctrl.Call(_m, "ListThirdPartyServices") + ret0, _ := ret[0].([]*model.TenantServices) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockTenantServiceDaoRecorder) ListThirdPartyServices() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ListThirdPartyServices") +} + // Mock of TenantServiceDeleteDao interface type MockTenantServiceDeleteDao struct { ctrl *gomock.Controller @@ -705,6 +715,16 @@ func (_mr *_MockTenantServicesPortDaoRecorder) DELPortsByServiceID(arg0 interfac return _mr.mock.ctrl.RecordCall(_mr.mock, "DELPortsByServiceID", arg0) } +func (_m *MockTenantServicesPortDao) HasOpenPort(sid string) bool { + ret := _m.ctrl.Call(_m, "HasOpenPort", sid) + ret0, _ := ret[0].(bool) + return ret0 +} + +func (_mr *_MockTenantServicesPortDaoRecorder) HasOpenPort(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "HasOpenPort", arg0) +} + // Mock of TenantPluginDao interface type MockTenantPluginDao struct { ctrl *gomock.Controller @@ -3441,3 +3461,139 @@ func (_m *MockIPPoolDao) GetIPPoolByEID(eid string) (*model.IPPool, error) { func (_mr *_MockIPPoolDaoRecorder) GetIPPoolByEID(arg0 interface{}) *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "GetIPPoolByEID", arg0) } + +// Mock of EndpointsDao interface +type MockEndpointsDao struct { + ctrl *gomock.Controller + recorder *_MockEndpointsDaoRecorder +} + +// Recorder for MockEndpointsDao (not exported) +type _MockEndpointsDaoRecorder struct { + mock *MockEndpointsDao +} + +func NewMockEndpointsDao(ctrl *gomock.Controller) *MockEndpointsDao { + mock := &MockEndpointsDao{ctrl: ctrl} + mock.recorder = &_MockEndpointsDaoRecorder{mock} + return mock +} + +func (_m *MockEndpointsDao) EXPECT() *_MockEndpointsDaoRecorder { + return _m.recorder +} + +func (_m *MockEndpointsDao) AddModel(_param0 model.Interface) error { + ret := _m.ctrl.Call(_m, "AddModel", _param0) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockEndpointsDaoRecorder) AddModel(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "AddModel", arg0) +} + +func (_m *MockEndpointsDao) UpdateModel(_param0 model.Interface) error { + ret := _m.ctrl.Call(_m, "UpdateModel", _param0) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockEndpointsDaoRecorder) UpdateModel(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "UpdateModel", arg0) +} + +func (_m *MockEndpointsDao) GetByUUID(uuid string) (*model.Endpoint, error) { + ret := _m.ctrl.Call(_m, "GetByUUID", uuid) + ret0, _ := ret[0].(*model.Endpoint) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockEndpointsDaoRecorder) GetByUUID(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "GetByUUID", arg0) +} + +func (_m *MockEndpointsDao) DelByUUID(uuid string) error { + ret := _m.ctrl.Call(_m, "DelByUUID", uuid) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockEndpointsDaoRecorder) DelByUUID(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "DelByUUID", arg0) +} + +func (_m *MockEndpointsDao) List(sid string) ([]*model.Endpoint, error) { + ret := _m.ctrl.Call(_m, "List", sid) + ret0, _ := ret[0].([]*model.Endpoint) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockEndpointsDaoRecorder) List(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "List", arg0) +} + +func (_m *MockEndpointsDao) ListIsOnline(sid string) ([]*model.Endpoint, error) { + ret := _m.ctrl.Call(_m, "ListIsOnline", sid) + ret0, _ := ret[0].([]*model.Endpoint) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockEndpointsDaoRecorder) ListIsOnline(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ListIsOnline", arg0) +} + +// Mock of ThirdPartySvcDiscoveryCfgDao interface +type MockThirdPartySvcDiscoveryCfgDao struct { + ctrl *gomock.Controller + recorder *_MockThirdPartySvcDiscoveryCfgDaoRecorder +} + +// Recorder for MockThirdPartySvcDiscoveryCfgDao (not exported) +type _MockThirdPartySvcDiscoveryCfgDaoRecorder struct { + mock *MockThirdPartySvcDiscoveryCfgDao +} + +func NewMockThirdPartySvcDiscoveryCfgDao(ctrl *gomock.Controller) *MockThirdPartySvcDiscoveryCfgDao { + mock := &MockThirdPartySvcDiscoveryCfgDao{ctrl: ctrl} + mock.recorder = &_MockThirdPartySvcDiscoveryCfgDaoRecorder{mock} + return mock +} + +func (_m *MockThirdPartySvcDiscoveryCfgDao) EXPECT() *_MockThirdPartySvcDiscoveryCfgDaoRecorder { + return _m.recorder +} + +func (_m *MockThirdPartySvcDiscoveryCfgDao) AddModel(_param0 model.Interface) error { + ret := _m.ctrl.Call(_m, "AddModel", _param0) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockThirdPartySvcDiscoveryCfgDaoRecorder) AddModel(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "AddModel", arg0) +} + +func (_m *MockThirdPartySvcDiscoveryCfgDao) UpdateModel(_param0 model.Interface) error { + ret := _m.ctrl.Call(_m, "UpdateModel", _param0) + ret0, _ := ret[0].(error) + return ret0 +} + +func (_mr *_MockThirdPartySvcDiscoveryCfgDaoRecorder) UpdateModel(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "UpdateModel", arg0) +} + +func (_m *MockThirdPartySvcDiscoveryCfgDao) GetByServiceID(sid string) (*model.ThirdPartySvcDiscoveryCfg, error) { + ret := _m.ctrl.Call(_m, "GetByServiceID", sid) + ret0, _ := ret[0].(*model.ThirdPartySvcDiscoveryCfg) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +func (_mr *_MockThirdPartySvcDiscoveryCfgDaoRecorder) GetByServiceID(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "GetByServiceID", arg0) +} diff --git a/db/db.go b/db/db.go index 0ddec8b32..0df851320 100644 --- a/db/db.go +++ b/db/db.go @@ -113,8 +113,8 @@ type Manager interface { // third-party service EndpointsDao() dao.EndpointsDao EndpointsDaoTransactions(db *gorm.DB) dao.EndpointsDao - ThirdPartyServiceDiscoveryCfgDao() dao.ThirdPartyServiceDiscoveryCfgDao - ThirdPartyServiceDiscoveryCfgDaoTransactions(db *gorm.DB) dao.ThirdPartyServiceDiscoveryCfgDao + ThirdPartySvcDiscoveryCfgDao() dao.ThirdPartySvcDiscoveryCfgDao + ThirdPartySvcDiscoveryCfgDaoTransactions(db *gorm.DB) dao.ThirdPartySvcDiscoveryCfgDao } var defaultManager Manager diff --git a/db/db_mock.go b/db/db_mock.go index 9c585c51f..225ac429b 100644 --- a/db/db_mock.go +++ b/db/db_mock.go @@ -4,9 +4,9 @@ package db import ( - gomock "github.com/rafrombrc/gomock/gomock" gorm "github.com/jinzhu/gorm" dao "github.com/goodrain/rainbond/db/dao" + gomock "github.com/rafrombrc/gomock/gomock" ) // Mock of Manager interface @@ -719,3 +719,43 @@ func (_m *MockManager) IPPoolDao() dao.IPPoolDao { func (_mr *_MockManagerRecorder) IPPoolDao() *gomock.Call { return _mr.mock.ctrl.RecordCall(_mr.mock, "IPPoolDao") } + +func (_m *MockManager) EndpointsDao() dao.EndpointsDao { + ret := _m.ctrl.Call(_m, "EndpointsDao") + ret0, _ := ret[0].(dao.EndpointsDao) + return ret0 +} + +func (_mr *_MockManagerRecorder) EndpointsDao() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "EndpointsDao") +} + +func (_m *MockManager) EndpointsDaoTransactions(db *gorm.DB) dao.EndpointsDao { + ret := _m.ctrl.Call(_m, "EndpointsDaoTransactions", db) + ret0, _ := ret[0].(dao.EndpointsDao) + return ret0 +} + +func (_mr *_MockManagerRecorder) EndpointsDaoTransactions(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "EndpointsDaoTransactions", arg0) +} + +func (_m *MockManager) ThirdPartySvcDiscoveryCfgDao() dao.ThirdPartySvcDiscoveryCfgDao { + ret := _m.ctrl.Call(_m, "ThirdPartySvcDiscoveryCfgDao") + ret0, _ := ret[0].(dao.ThirdPartySvcDiscoveryCfgDao) + return ret0 +} + +func (_mr *_MockManagerRecorder) ThirdPartySvcDiscoveryCfgDao() *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ThirdPartySvcDiscoveryCfgDao") +} + +func (_m *MockManager) ThirdPartySvcDiscoveryCfgDaoTransactions(db *gorm.DB) dao.ThirdPartySvcDiscoveryCfgDao { + ret := _m.ctrl.Call(_m, "ThirdPartySvcDiscoveryCfgDaoTransactions", db) + ret0, _ := ret[0].(dao.ThirdPartySvcDiscoveryCfgDao) + return ret0 +} + +func (_mr *_MockManagerRecorder) ThirdPartySvcDiscoveryCfgDaoTransactions(arg0 interface{}) *gomock.Call { + return _mr.mock.ctrl.RecordCall(_mr.mock, "ThirdPartySvcDiscoveryCfgDaoTransactions", arg0) +} diff --git a/db/model/third_party_service.go b/db/model/third_party_service.go index 138610223..ba7f4ee8d 100644 --- a/db/model/third_party_service.go +++ b/db/model/third_party_service.go @@ -23,10 +23,10 @@ type Endpoint struct { Model UUID string `gorm:"column:uuid;size:32"` ServiceID string `gorm:"column:service_id;size:32;not null"` - IP string `gorm:"column:ip;not null"` - Port int `gorm:"column:port;size:65535"` + IP string `gorm:"column:ip;not null" json:"ip"` + Port int `gorm:"column:port;size:65535" json:"port"` //use pointer type, zero values won't be saved into database - IsOnline *bool `gorm:"column:is_online;default:true"` + IsOnline *bool `gorm:"column:is_online;default:true" json:"is_online"` } // TableName returns table name of Endpoint. @@ -34,10 +34,10 @@ func (Endpoint) TableName() string { return "tenant_service_3rd_party_endpoints" } -// ThirdPartyServiceDiscoveryCfg s a persistent object for table +// ThirdPartySvcDiscoveryCfg s a persistent object for table // 3rd_party_svc_discovery_cfg. 3rd_party_svc_discovery_cfg contains // service discovery center configuration for third party service. -type ThirdPartyServiceDiscoveryCfg struct { +type ThirdPartySvcDiscoveryCfg struct { Model ServiceID string `gorm:"column:service_id;size:32"` Type string `gorm:"column:type"` @@ -47,7 +47,7 @@ type ThirdPartyServiceDiscoveryCfg struct { Password string `gorm:"password"` } -// TableName returns table name of ThirdPartyServiceDiscoveryCfg. -func (ThirdPartyServiceDiscoveryCfg) TableName() string { +// TableName returns table name of ThirdPartySvcDiscoveryCfg. +func (ThirdPartySvcDiscoveryCfg) TableName() string { return "tenant_service_3rd_party_discovery_cfg" } diff --git a/db/mysql/dao/3rd_party.go b/db/mysql/dao/3rd_party.go index eed724ef7..4bcb816ba 100644 --- a/db/mysql/dao/3rd_party.go +++ b/db/mysql/dao/3rd_party.go @@ -23,8 +23,8 @@ import ( "reflect" "strings" - "github.com/jinzhu/gorm" "github.com/goodrain/rainbond/db/model" + "github.com/jinzhu/gorm" ) // EndpointDaoImpl implements EndpintDao @@ -71,7 +71,7 @@ func (e *EndpointDaoImpl) GetByUUID(uuid string) (*model.Endpoint, error) { } // List list all endpints matching the given serivce_id(sid). -func (e *EndpointDaoImpl) List(sid string) ([]*model.Endpoint, error){ +func (e *EndpointDaoImpl) List(sid string) ([]*model.Endpoint, error) { var eps []*model.Endpoint if err := e.DB.Where("service_id=?", sid).Find(&eps).Error; err != nil { if err == gorm.ErrRecordNotFound { @@ -82,6 +82,18 @@ func (e *EndpointDaoImpl) List(sid string) ([]*model.Endpoint, error){ return eps, nil } +// ListIsOnline lists *model.Endpoint according to sid, and filter out the ones that are not online. +func (e *EndpointDaoImpl) ListIsOnline(sid string) ([]*model.Endpoint, error) { + var eps []*model.Endpoint + if err := e.DB.Where("service_id=? and is_online=1", sid).Find(&eps).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return nil, nil + } + return nil, err + } + return eps, nil +} + // DelByUUID deletes endpoints matching uuid. func (e *EndpointDaoImpl) DelByUUID(uuid string) error { if err := e.DB.Where("uuid=?", uuid).Delete(model.Endpoint{}).Error; err != nil { @@ -90,19 +102,19 @@ func (e *EndpointDaoImpl) DelByUUID(uuid string) error { return nil } -// ThirdPartyServiceDiscoveryCfgDaoImpl implements ThirdPartyServiceDiscoveryCfgDao -type ThirdPartyServiceDiscoveryCfgDaoImpl struct { +// ThirdPartySvcDiscoveryCfgDaoImpl implements ThirdPartySvcDiscoveryCfgDao +type ThirdPartySvcDiscoveryCfgDaoImpl struct { DB *gorm.DB } - + // AddModel add one record for table 3rd_party_svc_discovery_cfg. -func (t *ThirdPartyServiceDiscoveryCfgDaoImpl) AddModel(mo model.Interface) error { - cfg, ok := mo.(*model.ThirdPartyServiceDiscoveryCfg) +func (t *ThirdPartySvcDiscoveryCfgDaoImpl) AddModel(mo model.Interface) error { + cfg, ok := mo.(*model.ThirdPartySvcDiscoveryCfg) if !ok { - return fmt.Errorf("Type conversion error. From %s to *model.ThirdPartyServiceDiscoveryCfg", + return fmt.Errorf("Type conversion error. From %s to *model.ThirdPartySvcDiscoveryCfg", reflect.TypeOf(mo)) } - var old model.ThirdPartyServiceDiscoveryCfg + var old model.ThirdPartySvcDiscoveryCfg if ok := t.DB.Where("service_id=?", cfg.ServiceID).Find(&old).RecordNotFound(); ok { if err := t.DB.Create(cfg).Error; err != nil { return err @@ -114,6 +126,18 @@ func (t *ThirdPartyServiceDiscoveryCfgDaoImpl) AddModel(mo model.Interface) erro } // UpdateModel blabla -func (t *ThirdPartyServiceDiscoveryCfgDaoImpl) UpdateModel(mo model.Interface) error { +func (t *ThirdPartySvcDiscoveryCfgDaoImpl) UpdateModel(mo model.Interface) error { return nil -} \ No newline at end of file +} + +// GetByServiceID return third-party service discovery configuration according to service_id. +func (t *ThirdPartySvcDiscoveryCfgDaoImpl) GetByServiceID(sid string) (*model.ThirdPartySvcDiscoveryCfg, error) { + var cfg model.ThirdPartySvcDiscoveryCfg + if err := t.DB.Where("service_id=?", sid).Find(&cfg).Error; err != nil { + if err == gorm.ErrRecordNotFound { + return nil, nil + } + return nil, err + } + return &cfg, nil +} diff --git a/db/mysql/dao/tenants.go b/db/mysql/dao/tenants.go index d3b44eeed..cbe371e0b 100644 --- a/db/mysql/dao/tenants.go +++ b/db/mysql/dao/tenants.go @@ -426,6 +426,15 @@ func (t *TenantServicesDaoImpl) DeleteServiceByServiceID(serviceID string) error return nil } +// ListThirdPartyService lists all third party services +func (t *TenantServicesDaoImpl) ListThirdPartyServices() ([]*model.TenantServices, error) { + var res []*model.TenantServices + if err := t.DB.Where("kind=?", "third_party").Find(&res).Error; err != nil { + return nil, err + } + return res, nil +} + //TenantServicesDeleteImpl TenantServiceDeleteImpl type TenantServicesDeleteImpl struct { DB *gorm.DB @@ -569,6 +578,19 @@ func (t *TenantServicesPortDaoImpl) DELPortsByServiceID(serviceID string) error return nil } +// HasOpenPort checks if the given service(according to sid) has open port. +func (t *TenantServicesPortDaoImpl) HasOpenPort(sid string) bool { + var port model.TenantServicesPort + if err := t.DB.Where("service_id = ? and (is_outer_service=1 or is_inner_service=1)", sid). + Find(&port).Error; err != nil { + if err != gorm.ErrRecordNotFound { + logrus.Warningf("error getting TenantServicesPort: %v", err) + } + return false + } + return true +} + //TenantServiceRelationDaoImpl TenantServiceRelationDaoImpl type TenantServiceRelationDaoImpl struct { DB *gorm.DB diff --git a/db/mysql/dao_impl.go b/db/mysql/dao_impl.go index 71541faef..c00077e89 100644 --- a/db/mysql/dao_impl.go +++ b/db/mysql/dao_impl.go @@ -507,16 +507,16 @@ func (m *Manager) EndpointsDaoTransactions(db *gorm.DB) dao.EndpointsDao { } } -// ThirdPartyServiceDiscoveryCfgDao returns a new ThirdPartyServiceDiscoveryCfgDao. -func (m *Manager) ThirdPartyServiceDiscoveryCfgDao() dao.ThirdPartyServiceDiscoveryCfgDao { - return &mysqldao.ThirdPartyServiceDiscoveryCfgDaoImpl{ +// ThirdPartySvcDiscoveryCfgDao returns a new ThirdPartySvcDiscoveryCfgDao. +func (m *Manager) ThirdPartySvcDiscoveryCfgDao() dao.ThirdPartySvcDiscoveryCfgDao { + return &mysqldao.ThirdPartySvcDiscoveryCfgDaoImpl{ DB: m.db, } } -// ThirdPartyServiceDiscoveryCfgDaoTransactions returns a new ThirdPartyServiceDiscoveryCfgDao. -func (m *Manager) ThirdPartyServiceDiscoveryCfgDaoTransactions(db *gorm.DB) dao.ThirdPartyServiceDiscoveryCfgDao { - return &mysqldao.ThirdPartyServiceDiscoveryCfgDaoImpl{ +// ThirdPartySvcDiscoveryCfgDaoTransactions returns a new ThirdPartySvcDiscoveryCfgDao. +func (m *Manager) ThirdPartySvcDiscoveryCfgDaoTransactions(db *gorm.DB) dao.ThirdPartySvcDiscoveryCfgDao { + return &mysqldao.ThirdPartySvcDiscoveryCfgDaoImpl{ DB: db, } } \ No newline at end of file diff --git a/db/mysql/mysql.go b/db/mysql/mysql.go index 5ffc5b51e..072b61706 100644 --- a/db/mysql/mysql.go +++ b/db/mysql/mysql.go @@ -131,7 +131,7 @@ func (m *Manager) RegisterTableModel() { m.models = append(m.models, &model.IPPool{}) m.models = append(m.models, &model.TenantServiceConfigFile{}) m.models = append(m.models, &model.Endpoint{}) - m.models = append(m.models, &model.ThirdPartyServiceDiscoveryCfg{}) + m.models = append(m.models, &model.ThirdPartySvcDiscoveryCfg{}) } //CheckTable check and create tables diff --git a/db/tenant_test.go b/db/tenant_test.go new file mode 100644 index 000000000..87fedb7bb --- /dev/null +++ b/db/tenant_test.go @@ -0,0 +1,207 @@ +// RAINBOND, Application Management Platform +// Copyright (C) 2014-2017 Goodrain Co., Ltd. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. For any non-GPL usage of Rainbond, +// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. +// must be obtained first. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package db + +import ( + "context" + "fmt" + dbconfig "github.com/goodrain/rainbond/db/config" + "github.com/goodrain/rainbond/db/model" + "github.com/goodrain/rainbond/util" + "github.com/testcontainers/testcontainers-go" + "testing" + "time" +) + +func TestTenantServicesDao_ListThirdPartyServices(t *testing.T) { + dbname := "region" + rootpw := "rainbond" + + ctx := context.Background() + req := testcontainers.ContainerRequest{ + Image: "mariadb", + ExposedPorts: []string{"3306/tcp"}, + Env: map[string]string{ + "MYSQL_ROOT_PASSWORD": rootpw, + "MYSQL_DATABASE": dbname, + }, + Cmd: "--character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci", + } + mariadb, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatal(err) + } + defer mariadb.Terminate(ctx) + + host, err := mariadb.Host(ctx) + if err != nil { + t.Error(err) + } + port, err := mariadb.MappedPort(ctx, "3306") + if err != nil { + t.Error(err) + } + + connInfo := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", "root", + rootpw, host, port.Int(), dbname) + tryTimes := 3 + for { + if err := CreateManager(dbconfig.Config{ + DBType: "mysql", + MysqlConnectionInfo: connInfo, + }); err != nil { + if tryTimes == 0 { + t.Fatalf("Connect info: %s; error creating db manager: %v", connInfo, err) + } else { + tryTimes = tryTimes - 1 + time.Sleep(10 * time.Second) + continue + } + } + break + } + + svcs, err := GetManager().TenantServiceDao().ListThirdPartyServices() + if err != nil { + t.Fatalf("error listing third-party service: %v", err) + } + if len(svcs) != 0 { + t.Errorf("Expected 0 for the length of third-party services, but returned %d", len(svcs)) + } + + for i:=0; i < 3; i++ { + item1 := &model.TenantServices{ + TenantID: util.NewUUID(), + ServiceID: util.NewUUID(), + Kind: "third_party", + } + if err = GetManager().TenantServiceDao().AddModel(item1); err != nil { + t.Fatalf("error create third-party service: %v", err) + } + } + svcs, err = GetManager().TenantServiceDao().ListThirdPartyServices() + if err != nil { + t.Fatalf("error listing third-party service: %v", err) + } + if len(svcs) != 3 { + t.Errorf("Expected 3 for the length of third-party services, but returned %d", len(svcs)) + } +} + +func TestTenantServicesPortDao_HasOpenPort(t *testing.T) { + dbname := "region" + rootpw := "rainbond" + + ctx := context.Background() + req := testcontainers.ContainerRequest{ + Image: "mariadb", + ExposedPorts: []string{"3306/tcp"}, + Env: map[string]string{ + "MYSQL_ROOT_PASSWORD": rootpw, + "MYSQL_DATABASE": dbname, + }, + Cmd: "--character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci", + } + mariadb, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, + }) + if err != nil { + t.Fatal(err) + } + defer mariadb.Terminate(ctx) + + host, err := mariadb.Host(ctx) + if err != nil { + t.Error(err) + } + port, err := mariadb.MappedPort(ctx, "3306") + if err != nil { + t.Error(err) + } + + connInfo := fmt.Sprintf("%s:%s@tcp(%s:%d)/%s", "root", + rootpw, host, port.Int(), dbname) + tryTimes := 3 + for { + if err := CreateManager(dbconfig.Config{ + DBType: "mysql", + MysqlConnectionInfo: connInfo, + }); err != nil { + if tryTimes == 0 { + t.Fatalf("Connect info: %s; error creating db manager: %v", connInfo, err) + } else { + tryTimes = tryTimes - 1 + time.Sleep(10 * time.Second) + continue + } + } + break + } + + t.Run("service doesn't exist", func(t *testing.T) { + hasOpenPort := GetManager().TenantServicesPortDao().HasOpenPort("foobar") + if hasOpenPort { + t.Error("Expected false for hasOpenPort, but returned true") + } + }) + t.Run("outer service", func(t *testing.T) { + port := &model.TenantServicesPort{ + ServiceID: util.NewUUID(), + IsOuterService: true, + } + if err := GetManager().TenantServicesPortDao().AddModel(port); err != nil { + t.Fatalf("error creating TenantServicesPort: %v", err) + } + hasOpenPort := GetManager().TenantServicesPortDao().HasOpenPort(port.ServiceID) + if !hasOpenPort { + t.Errorf("Expected true for hasOpenPort, but returned %v", hasOpenPort) + } + }) + t.Run("inner service", func(t *testing.T) { + port := &model.TenantServicesPort{ + ServiceID: util.NewUUID(), + IsInnerService: true, + } + if err := GetManager().TenantServicesPortDao().AddModel(port); err != nil { + t.Fatalf("error creating TenantServicesPort: %v", err) + } + hasOpenPort := GetManager().TenantServicesPortDao().HasOpenPort(port.ServiceID) + if !hasOpenPort { + t.Errorf("Expected true for hasOpenPort, but returned %v", hasOpenPort) + } + }) + t.Run("not inner or outer service", func(t *testing.T) { + port := &model.TenantServicesPort{ + ServiceID: util.NewUUID(), + IsInnerService: false, + IsOuterService: false, + } + if err := GetManager().TenantServicesPortDao().AddModel(port); err != nil { + t.Fatalf("error creating TenantServicesPort: %v", err) + } + hasOpenPort := GetManager().TenantServicesPortDao().HasOpenPort(port.ServiceID) + if hasOpenPort { + t.Errorf("Expected false for hasOpenPort, but returned %v", hasOpenPort) + } + }) +} diff --git a/worker/appm/controller/apply_rule.go b/worker/appm/controller/apply_rule.go index df3779f81..ae4af314a 100644 --- a/worker/appm/controller/apply_rule.go +++ b/worker/appm/controller/apply_rule.go @@ -68,6 +68,10 @@ func (a *applyRuleController) applyOne(app *v1.AppService) error { ensureSecret(secret, a.manager.client) } // update ingress + for _, ep := range app.GetEndpoints() { + ensureEndpoints(ep, a.manager.client) + } + // update ingress for _, ing := range app.GetIngress() { ensureIngress(ing, a.manager.client) } @@ -87,6 +91,22 @@ func (a *applyRuleController) applyOne(app *v1.AppService) error { logrus.Warningf("error deleting secret(%v): %v", secret, err) } } + // delete delEndpoints + for _, ep := range app.GetDelEndpoints() { + err := a.manager.client.CoreV1().Endpoints(ep.Namespace).Delete(ep.Name, &metav1.DeleteOptions{}) + if err != nil { + // don't return error, hope it is ok next time + logrus.Warningf("error deleting endpoints(%v): %v", ep, err) + } + } + // delete delServices + for _, svc := range app.GetDelServices() { + err := a.manager.client.CoreV1().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{}) + if err != nil { + // don't return error, hope it is ok next time + logrus.Warningf("error deleting service(%v): %v", svc, err) + } + } return nil } @@ -143,3 +163,19 @@ func ensureSecret(secret *corev1.Secret, clientSet kubernetes.Interface) { logrus.Warningf("error updating secret %+v: %v", secret, err) } } + +func ensureEndpoints(ep *corev1.Endpoints, clientSet kubernetes.Interface) { + _, err := clientSet.CoreV1().Endpoints(ep.Namespace).Update(ep) + + if err != nil { + if k8sErrors.IsNotFound(err) { + _, err := clientSet.CoreV1().Endpoints(ep.Namespace).Create(ep) + if err != nil { + logrus.Warningf("error creating endpoints %+v: %v", ep, err) + } + return + } + + logrus.Warningf("error updating endpoints %+v: %v", ep, err) + } +} diff --git a/worker/appm/conversion/conversion.go b/worker/appm/conversion/conversion.go index ece325513..6818c9ea4 100644 --- a/worker/appm/conversion/conversion.go +++ b/worker/appm/conversion/conversion.go @@ -21,7 +21,7 @@ package conversion import ( "github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/util" - v1 "github.com/goodrain/rainbond/worker/appm/types/v1" + "github.com/goodrain/rainbond/worker/appm/types/v1" ) func init() { @@ -73,17 +73,27 @@ func InitAppService(dbmanager db.Manager, serviceID string, enableConversionList //InitCacheAppService init cache app service. //if store manager receive a kube model belong with service and not find in store,will create -func InitCacheAppService(dbmanager db.Manager, serviceID, version, createrID string) (*v1.AppService, error) { +func InitCacheAppService(dbm db.Manager, serviceID, creatorID string) (*v1.AppService, error) { appService := &v1.AppService{ AppServiceBase: v1.AppServiceBase{ ServiceID: serviceID, - CreaterID: createrID, + CreaterID: creatorID, ExtensionSet: make(map[string]string), }, UpgradePatch: make(map[string][]byte, 2), } - if err := TenantServiceBase(appService, dbmanager); err != nil { + if err := TenantServiceBase(appService, dbm); err != nil { return nil, err } + svc, err := dbm.TenantServiceDao().GetServiceByID(serviceID) + if err != nil { + return nil, err + } + if svc.Kind == "third_party" { + if err := TenantServiceRegist(appService, dbm); err != nil { + return nil, err + } + } + return appService, nil } diff --git a/worker/appm/conversion/conversion_test.go b/worker/appm/conversion/conversion_test.go index e712775e3..407fb3da3 100644 --- a/worker/appm/conversion/conversion_test.go +++ b/worker/appm/conversion/conversion_test.go @@ -19,19 +19,45 @@ package conversion import ( - "fmt" + "github.com/goodrain/rainbond/db" + "github.com/goodrain/rainbond/db/dao" + "github.com/goodrain/rainbond/db/model" + "github.com/goodrain/rainbond/util" + "github.com/goodrain/rainbond/worker/appm/types/v1" + "github.com/rafrombrc/gomock/gomock" "testing" - "time" ) -func TestInitAppService(t *testing.T) { +func TestTenantServiceBase(t *testing.T) { + t.Run("third-party service", func(t *testing.T) { + as := &v1.AppService{} + as.ServiceID = util.NewUUID() + as.TenantID = util.NewUUID() + as.TenantName = "abcdefg" -} + ctrl := gomock.NewController(t) + defer ctrl.Finish() -func TestTemp(t *testing.T) { - createTime, err := time.Parse(time.RFC3339, "2018-10-22T14:14:12Z") - if err != nil { - t.Errorf("error: %v", err) - } - fmt.Println(createTime) + dbm := db.NewMockManager(ctrl) + // TenantServiceDao + tenantServiceDao := dao.NewMockTenantServiceDao(ctrl) + tenantService := &model.TenantServices{ + TenantID: as.TenantID, + ServiceID: as.ServiceID, + Kind: "third_party", + } + tenantServiceDao.EXPECT().GetServiceByID(as.ServiceID).Return(tenantService, nil) + dbm.EXPECT().TenantServiceDao().Return(tenantServiceDao) + // TenantDao + tenantDao := dao.NewMockTenantDao(ctrl) + tenant := &model.Tenants{ + UUID: as.TenantID, + Name: as.TenantName, + } + tenantDao.EXPECT().GetTenantByUUID(as.TenantID).Return(tenant, nil) + dbm.EXPECT().TenantDao().Return(tenantDao) + if err := TenantServiceBase(as, dbm); err != nil { + t.Errorf("Unexpected error: %v", err) + } + }) } diff --git a/worker/appm/conversion/errors.go b/worker/appm/conversion/errors.go index f430a5fda..9f0fdcafb 100644 --- a/worker/appm/conversion/errors.go +++ b/worker/appm/conversion/errors.go @@ -20,5 +20,5 @@ package conversion import "errors" -//ErrorNotFoundService error not found -var ErrorNotFoundService = errors.New("service not found") +//ErrServiceNotFound error not found +var ErrServiceNotFound = errors.New("service not found") diff --git a/worker/appm/conversion/gateway.go b/worker/appm/conversion/gateway.go index 572866662..90b583517 100644 --- a/worker/appm/conversion/gateway.go +++ b/worker/appm/conversion/gateway.go @@ -20,11 +20,12 @@ package conversion import ( "fmt" + "github.com/goodrain/rainbond/worker/appm/thirdparty" "os" "strings" "github.com/goodrain/rainbond/util" - v1 "github.com/goodrain/rainbond/worker/appm/types/v1" + "github.com/goodrain/rainbond/worker/appm/types/v1" "github.com/jinzhu/gorm" "github.com/Sirupsen/logrus" @@ -146,15 +147,25 @@ func (a *AppServiceBuild) Build() ([]*corev1.Service, []*extensions.Ingress, []* var services []*corev1.Service var ingresses []*extensions.Ingress var secrets []*corev1.Secret + var endpoints []*corev1.Endpoints if ports != nil && len(ports) > 0 { for i := range ports { port := ports[i] + var v1eps []*v1.Endpoint + if !port.IsOuterService && !port.IsInnerService { + v1eps, err = thirdparty.ListEndpoints(a.serviceID, a.dbmanager) + return nil, nil, nil, err + } if port.IsInnerService { services = append(services, a.createInnerService(port)) + if a.service.Kind == "third_party" { + // ignore services other than third_party + endpoints = append(endpoints, a.createEndpoints(port, v1eps, true)...) + } } if port.IsOuterService { service := a.createOuterService(port) - + services = append(services, service) ings, secret, err := a.ApplyRules(port, service) if err != nil { return nil, nil, nil, err @@ -163,8 +174,10 @@ func (a *AppServiceBuild) Build() ([]*corev1.Service, []*extensions.Ingress, []* if secret != nil { secrets = append(secrets, secret) } - - services = append(services, service) + if a.service.Kind == "third_party" { + // ignore services other than third_party + endpoints = append(endpoints, a.createEndpoints(port, v1eps, false)...) + } } } } @@ -530,6 +543,49 @@ func (a *AppServiceBuild) createOuterService(port *model.TenantServicesPort) *co return &service } +func (a *AppServiceBuild) createEndpoints(port *model.TenantServicesPort, v1eps []*v1.Endpoint, isInner bool) []*corev1.Endpoints { + var res []*corev1.Endpoints + for _, item := range v1eps { + ep := corev1.Endpoints{} + ep.Namespace = a.tenant.UUID + ep.Name = util.NewUUID() // TODO: consider a better name??? + if isInner { + ep.Labels = a.appService.GetCommonLabels(map[string]string{ + "name": a.service.ServiceAlias + "Service", + }) + } else { + ep.Labels = a.appService.GetCommonLabels(map[string]string{ + "name": a.service.ServiceAlias + "ServiceOut", + }) + } + + epport := func(targetPort int, realPort int) int32 { + if realPort == 0 { + return int32(targetPort) + } + return int32(realPort) + } + subset := corev1.EndpointSubset{ + Ports: []corev1.EndpointPort{ + { + Port: epport(port.ContainerPort, item.Port), + }, + }, + } + for _, ip := range item.IPs { + address := corev1.EndpointAddress{ + IP: ip, + } + subset.Addresses = append(subset.Addresses, address) + } + ep.Subsets = []corev1.EndpointSubset{ + subset, + } + res = append(res, &ep) + } + return res +} + func (a *AppServiceBuild) createStatefulService(ports []*model.TenantServicesPort) *corev1.Service { var service corev1.Service service.Name = a.service.ServiceName diff --git a/worker/appm/conversion/gateway_test.go b/worker/appm/conversion/gateway_test.go index 4e5f32d9e..ae8231d25 100644 --- a/worker/appm/conversion/gateway_test.go +++ b/worker/appm/conversion/gateway_test.go @@ -20,6 +20,7 @@ package conversion import ( "fmt" + "github.com/goodrain/rainbond/gateway/annotations/parser" "github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db/dao" @@ -93,122 +94,170 @@ QZ+yDlTdRpvoEP2mzW2cZA== ) func TestApplyTcpRule(t *testing.T) { - //testCase := map[string]string{ - // "namespace": "e8539a9c33fd123456789e26d2bca431", - // parser.GetAnnotationWithPrefix("l4-enable"): "true", - // parser.GetAnnotationWithPrefix("l4-host"): "127.0.0.1", - // parser.GetAnnotationWithPrefix("l4-port"): "32145", - // "serviceName": "default-svc", - // "containerPort": "10000", - //} - // - //serviceID := "43eaae441859eda35b02075d37d83589" - //containerPort, err := strconv.Atoi(testCase["containerPort"]) - //if err != nil { - // t.Errorf("Can not convert %s(string) to int: %v", testCase["containerPort"], err) - //} - //port := &model.TenantServicesPort{ - // TenantID: testCase["namespace"], - // ServiceID: serviceID, - // ContainerPort: containerPort, - // Protocol: "http", - // PortAlias: "GRD835895000", - // IsInnerService: false, - // IsOuterService: true, - //} - // - //service := &corev1.Service{ - // ObjectMeta: metav1.ObjectMeta{ - // Name: testCase["serviceName"], - // Namespace: testCase["namespace"], - // }, - // Spec: corev1.ServiceSpec{ - // Ports: []corev1.ServicePort{ - // { - // Name: "service-port", - // Port: int32(containerPort), - // TargetPort: intstr.Parse(testCase["containerPort"]), - // }, - // }, - // Selector: map[string]string{ - // "tier": "default", - // }, - // }, - //} - // - //externalPort, err := strconv.Atoi(testCase[parser.GetAnnotationWithPrefix("l4-port")]) - //if err != nil { - // t.Errorf("Can not convert %s(string) to int: %v", - // testCase[parser.GetAnnotationWithPrefix("l4-port")], err) - //} - //tcpRule := &model.TCPRule{ - // UUID: "default", - // ServiceID: serviceID, - // ContainerPort: port.ContainerPort, - // IP: testCase[parser.GetAnnotationWithPrefix("l4-host")], - // Port: externalPort, - //} - // - //ing, err := applyTCPRule(tcpRule, service, testCase["namespace"]) - //if err != nil { - // t.Errorf("Unexpected error occurred while applying stream rule: %v", err) - //} - // - //if ing.Namespace != testCase["namespace"] { - // t.Errorf("Expected %s for namespace but returned %s", testCase["namespace"], ing.Namespace) - //} - //if ing.Annotations[parser.GetAnnotationWithPrefix("l4-enable")] != - // testCase[parser.GetAnnotationWithPrefix("l4-enable")] { - // t.Errorf("Expected %s for annotations[%s] but returned %s", - // testCase[parser.GetAnnotationWithPrefix("l4-enable")], - // parser.GetAnnotationWithPrefix("l4-enable"), - // ing.Annotations[parser.GetAnnotationWithPrefix("l4-enable")]) - //} - //if ing.Annotations[parser.GetAnnotationWithPrefix("l4-host")] != - // testCase[parser.GetAnnotationWithPrefix("l4-host")] { - // t.Errorf("Expected %s for annotations[%s] but returned %s", - // testCase[parser.GetAnnotationWithPrefix("l4-host")], - // parser.GetAnnotationWithPrefix("l4-host"), - // ing.Annotations[parser.GetAnnotationWithPrefix("l4-host")]) - //} - //if ing.Annotations[parser.GetAnnotationWithPrefix("l4-port")] != - // testCase[parser.GetAnnotationWithPrefix("l4-port")] { - // t.Errorf("Expected %s for annotations[%s] but returned %s", - // testCase[parser.GetAnnotationWithPrefix("l4-port")], - // parser.GetAnnotationWithPrefix("l4-port"), - // ing.Annotations[parser.GetAnnotationWithPrefix("l4-port")]) - //} - //if ing.Spec.Backend.ServiceName != testCase["serviceName"] { - // t.Errorf("Expected %s for ServiceName but returned %s", testCase["serviceName"], - // ing.Spec.Backend.ServiceName) - //} - //if ing.Spec.Backend.ServicePort.IntVal != int32(containerPort) { - // t.Errorf("Expected %v for ServicePort but returned %v", containerPort, - // ing.Spec.Backend.ServicePort) - //} - // - //// create k8s resources - //c, err := clientcmd.BuildConfigFromFlags("", "/Users/abe/go/src/github.com/goodrain/rainbond/test/admin.kubeconfig") - //if err != nil { - // t.Fatalf("read kube config file error: %v", err) - //} - //clientSet, err := kubernetes.NewForConfig(c) - //if err != nil { - // t.Fatalf("create kube api client error: %v", err) - //} - //if _, err := clientSet.CoreV1().Namespaces().Create(&corev1.Namespace{ - // ObjectMeta: metav1.ObjectMeta{ - // Name: testCase["namespace"], - // }, - //}); err != nil { - // t.Errorf("Can't create Namespace(%s): %v", testCase["namespace"], err) - //} - //if _, err := clientSet.ExtensionsV1beta1().Ingresses(ing.Namespace).Create(ing); err != nil { - // t.Errorf("Can't create Ingress(%s): %v", ing.Name, err) - //} - //if err := clientSet.CoreV1().Namespaces().Delete(testCase["namespace"], &metav1.DeleteOptions{}); err != nil { - // t.Errorf("Can't delete namespace(%s)", testCase["namespace"]) - //} + testCase := map[string]string{ + "namespace": "e8539a9c33fd123456789e26d2bca431", + parser.GetAnnotationWithPrefix("l4-enable"): "true", + parser.GetAnnotationWithPrefix("l4-host"): "127.0.0.1", + parser.GetAnnotationWithPrefix("l4-port"): "32145", + "serviceName": "default-svc", + "containerPort": "10000", + } + + serviceID := "43eaae441859eda35b02075d37d83589" + containerPort, err := strconv.Atoi(testCase["containerPort"]) + if err != nil { + t.Errorf("Can not convert %s(string) to int: %v", testCase["containerPort"], err) + } + port := &model.TenantServicesPort{ + TenantID: testCase["namespace"], + ServiceID: serviceID, + ContainerPort: containerPort, + Protocol: "http", + PortAlias: "GRD835895000", + IsInnerService: false, + IsOuterService: true, + } + + service := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: testCase["serviceName"], + Namespace: testCase["namespace"], + }, + Spec: corev1.ServiceSpec{ + Ports: []corev1.ServicePort{ + { + Name: "service-port", + Port: int32(containerPort), + TargetPort: intstr.Parse(testCase["containerPort"]), + }, + }, + Selector: map[string]string{ + "tier": "default", + }, + }, + } + + externalPort, err := strconv.Atoi(testCase[parser.GetAnnotationWithPrefix("l4-port")]) + if err != nil { + t.Errorf("Can not convert %s(string) to int: %v", + testCase[parser.GetAnnotationWithPrefix("l4-port")], err) + } + tcpRule := &model.TCPRule{ + UUID: "default", + ServiceID: serviceID, + ContainerPort: port.ContainerPort, + IP: testCase[parser.GetAnnotationWithPrefix("l4-host")], + Port: externalPort, + } + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + dbmanager := db.NewMockManager(ctrl) + + serviceDao := dao.NewMockTenantServiceDao(ctrl) + updateTime, _ := time.Parse(time.RFC3339, "2018-10-22T14:14:12Z") + services := &model.TenantServices{ + TenantID: testCase["namespace"], + ServiceID: serviceID, + ServiceKey: "application", + ServiceAlias: "grd83589", + Comment: "application info", + ContainerCPU: 20, + ContainerMemory: 128, + ExtendMethod: "stateless", + Replicas: 1, + DeployVersion: "20181022200709", + Category: "application", + CurStatus: "undeploy", + Status: 0, + Namespace: "goodrain", + UpdateTime: updateTime, + ServiceOrigin: "assistant", + } + serviceDao.EXPECT().GetServiceByID(serviceID).Return(services, nil) + dbmanager.EXPECT().TenantServiceDao().Return(serviceDao) + + tenantDao := dao.NewMockTenantDao(ctrl) + tenant := &model.Tenants{ + Name: "0enb7gyx", + UUID: testCase["namespace"], + EID: "214ec4d212582eb36a84cc180aad2783", + } + tenantDao.EXPECT().GetTenantByUUID(services.TenantID).Return(tenant, nil) + dbmanager.EXPECT().TenantDao().Return(tenantDao) + + appService := &v1.AppService{} + appService.ServiceID = serviceID + appService.CreaterID = "Rainbond" + appService.TenantID = testCase["namespace"] + + replicationType := v1.TypeDeployment + build, err := AppServiceBuilder(serviceID, string(replicationType), dbmanager, appService) + if err != nil { + t.Errorf("Unexpected occurred while creating AppServiceBuild: %v", err) + } + + ing, err := build.applyTCPRule(tcpRule, service, testCase["namespace"]) + if err != nil { + t.Errorf("Unexpected error occurred while applying stream rule: %v", err) + } + + if ing.Namespace != testCase["namespace"] { + t.Errorf("Expected %s for namespace but returned %s", testCase["namespace"], ing.Namespace) + } + if ing.Annotations[parser.GetAnnotationWithPrefix("l4-enable")] != + testCase[parser.GetAnnotationWithPrefix("l4-enable")] { + t.Errorf("Expected %s for annotations[%s] but returned %s", + testCase[parser.GetAnnotationWithPrefix("l4-enable")], + parser.GetAnnotationWithPrefix("l4-enable"), + ing.Annotations[parser.GetAnnotationWithPrefix("l4-enable")]) + } + if ing.Annotations[parser.GetAnnotationWithPrefix("l4-host")] != + testCase[parser.GetAnnotationWithPrefix("l4-host")] { + t.Errorf("Expected %s for annotations[%s] but returned %s", + testCase[parser.GetAnnotationWithPrefix("l4-host")], + parser.GetAnnotationWithPrefix("l4-host"), + ing.Annotations[parser.GetAnnotationWithPrefix("l4-host")]) + } + if ing.Annotations[parser.GetAnnotationWithPrefix("l4-port")] != + testCase[parser.GetAnnotationWithPrefix("l4-port")] { + t.Errorf("Expected %s for annotations[%s] but returned %s", + testCase[parser.GetAnnotationWithPrefix("l4-port")], + parser.GetAnnotationWithPrefix("l4-port"), + ing.Annotations[parser.GetAnnotationWithPrefix("l4-port")]) + } + if ing.Spec.Backend.ServiceName != testCase["serviceName"] { + t.Errorf("Expected %s for ServiceName but returned %s", testCase["serviceName"], + ing.Spec.Backend.ServiceName) + } + if ing.Spec.Backend.ServicePort.IntVal != int32(containerPort) { + t.Errorf("Expected %v for ServicePort but returned %v", containerPort, + ing.Spec.Backend.ServicePort) + } + + // create k8s resources + c, err := clientcmd.BuildConfigFromFlags("", "/Users/abe/go/src/github.com/goodrain/rainbond/test/admin.kubeconfig") + if err != nil { + t.Fatalf("read kube config file error: %v", err) + } + clientSet, err := kubernetes.NewForConfig(c) + if err != nil { + t.Fatalf("create kube api client error: %v", err) + } + if _, err := clientSet.CoreV1().Namespaces().Create(&corev1.Namespace{ + ObjectMeta: metav1.ObjectMeta{ + Name: testCase["namespace"], + }, + }); err != nil { + t.Errorf("Can't create Namespace(%s): %v", testCase["namespace"], err) + } + if _, err := clientSet.ExtensionsV1beta1().Ingresses(ing.Namespace).Create(ing); err != nil { + t.Errorf("Can't create Ingress(%s): %v", ing.Name, err) + } + if err := clientSet.CoreV1().Namespaces().Delete(testCase["namespace"], &metav1.DeleteOptions{}); err != nil { + t.Errorf("Can't delete namespace(%s)", testCase["namespace"]) + } } func TestAppServiceBuild_ApplyHttpRule(t *testing.T) { diff --git a/worker/appm/conversion/service.go b/worker/appm/conversion/service.go index 5a5c3b93f..0b4a50e49 100644 --- a/worker/appm/conversion/service.go +++ b/worker/appm/conversion/service.go @@ -82,22 +82,15 @@ func TenantServiceBase(as *v1.AppService, dbmanager db.Manager) error { tenantService, err := dbmanager.TenantServiceDao().GetServiceByID(as.ServiceID) if err != nil { if err == gorm.ErrRecordNotFound { - return ErrorNotFoundService + return ErrServiceNotFound } return fmt.Errorf("error getting service base info by serviceID(%s) %s", as.ServiceID, err.Error()) } + as.ServiceKind = tenantService.Kind tenant, err := dbmanager.TenantDao().GetTenantByUUID(tenantService.TenantID) if err != nil { return fmt.Errorf("get tenant info failure %s", err.Error()) } - serviceType, err := dbmanager.TenantServiceLabelDao().GetTenantServiceTypeLabel(as.ServiceID) - if err != nil { - return fmt.Errorf("get service type info failure %s", err.Error()) - } - label, err := dbmanager.TenantServiceLabelDao().GetLabelByNodeSelectorKey(as.ServiceID, "windows") - if label != nil { - as.IsWindowsService = true - } as.TenantID = tenantService.TenantID if as.DeployVersion == "" { as.DeployVersion = tenantService.DeployVersion @@ -114,6 +107,17 @@ func TenantServiceBase(as *v1.AppService, dbmanager db.Manager) error { if err := initTenant(as, tenant); err != nil { return fmt.Errorf("conversion tenant info failure %s", err.Error()) } + if tenantService.Kind == "third_party" { + return nil + } + serviceType, err := dbmanager.TenantServiceLabelDao().GetTenantServiceTypeLabel(as.ServiceID) + if err != nil { + return fmt.Errorf("get service type info failure %s", err.Error()) + } + label, err := dbmanager.TenantServiceLabelDao().GetLabelByNodeSelectorKey(as.ServiceID, "windows") + if label != nil { + as.IsWindowsService = true + } if serviceType == nil || serviceType.LabelValue == util.StatelessServiceType { initBaseDeployment(as, tenantService) return nil @@ -124,6 +128,7 @@ func TenantServiceBase(as *v1.AppService, dbmanager db.Manager) error { } return fmt.Errorf("do not decision build type for service %s", as.ServiceAlias) } + func initTenant(as *v1.AppService, tenant *dbmodel.Tenants) error { if tenant == nil || tenant.UUID == "" { return fmt.Errorf("tenant is invalid") diff --git a/worker/appm/store/store.go b/worker/appm/store/store.go index a70d2b8d8..f5e0d766f 100644 --- a/worker/appm/store/store.go +++ b/worker/appm/store/store.go @@ -31,7 +31,7 @@ import ( "github.com/goodrain/rainbond/db" "github.com/goodrain/rainbond/db/model" "github.com/goodrain/rainbond/worker/appm/conversion" - v1 "github.com/goodrain/rainbond/worker/appm/types/v1" + "github.com/goodrain/rainbond/worker/appm/types/v1" "github.com/jinzhu/gorm" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" @@ -139,6 +139,7 @@ func (a *appRuntimeStore) init() error { return err } } + // init third-party service return a.initStorageclass() } @@ -146,6 +147,9 @@ func (a *appRuntimeStore) Start() error { if err := a.init(); err != nil { return err } + if err := a.initThirdPartyService(); err != nil { + return fmt.Errorf("error initiating third-party services: %v", err) + } stopch := make(chan struct{}) a.informers.Start(stopch) a.stopch = stopch @@ -173,6 +177,7 @@ func (a *appRuntimeStore) checkReplicasetWhetherDelete(app *v1.AppService, rs *a } } } + func (a *appRuntimeStore) OnAdd(obj interface{}) { if deployment, ok := obj.(*appsv1.Deployment); ok { serviceID := deployment.Labels["service_id"] @@ -180,7 +185,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := deployment.Labels["creater_id"] if serviceID != "" && version != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.AppsV1().Deployments(deployment.Namespace).Delete(deployment.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -195,7 +200,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := statefulset.Labels["creater_id"] if serviceID != "" && version != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.AppsV1().StatefulSets(statefulset.Namespace).Delete(statefulset.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -210,7 +215,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := replicaset.Labels["creater_id"] if serviceID != "" && version != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.AppsV1().Deployments(replicaset.Namespace).Delete(replicaset.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -226,7 +231,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := pod.Labels["creater_id"] if serviceID != "" && version != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -242,7 +247,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := secret.Labels["creater_id"] if serviceID != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.CoreV1().Secrets(secret.Namespace).Delete(secret.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -257,7 +262,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := service.Labels["creater_id"] if serviceID != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.CoreV1().Services(service.Namespace).Delete(service.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -272,7 +277,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := ingress.Labels["creater_id"] if serviceID != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.Extensions().Ingresses(ingress.Namespace).Delete(ingress.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -287,7 +292,7 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { createrID := configmap.Labels["creater_id"] if serviceID != "" && createrID != "" { appservice, err := a.getAppService(serviceID, version, createrID, true) - if err == conversion.ErrorNotFoundService { + if err == conversion.ErrServiceNotFound { a.conf.KubeClient.CoreV1().ConfigMaps(configmap.Namespace).Delete(configmap.Name, &metav1.DeleteOptions{}) } if appservice != nil { @@ -296,6 +301,21 @@ func (a *appRuntimeStore) OnAdd(obj interface{}) { } } } + if ep, ok := obj.(*corev1.Endpoints); ok { + serviceID := ep.Labels["service_id"] + version := ep.Labels["version"] + createrID := ep.Labels["creater_id"] + if serviceID != "" && createrID != "" { + appservice, err := a.getAppService(serviceID, version, createrID, true) + if err == conversion.ErrServiceNotFound { + a.conf.KubeClient.CoreV1().Endpoints(ep.Namespace).Delete(ep.Name, &metav1.DeleteOptions{}) + } + if appservice != nil { + appservice.SetEndpoints(ep) + return + } + } + } } //getAppService if creater is true, will create new app service where not found in store @@ -304,7 +324,7 @@ func (a *appRuntimeStore) getAppService(serviceID, version, createrID string, cr appservice = a.GetAppService(serviceID) if appservice == nil && creater { var err error - appservice, err = conversion.InitCacheAppService(a.dbmanager, serviceID, version, createrID) + appservice, err = conversion.InitCacheAppService(a.dbmanager, serviceID, createrID) if err != nil { logrus.Errorf("init cache app service failure:%s", err.Error()) return nil, err diff --git a/worker/appm/store/third_party_service.go b/worker/appm/store/third_party_service.go new file mode 100644 index 000000000..815d59187 --- /dev/null +++ b/worker/appm/store/third_party_service.go @@ -0,0 +1,47 @@ +// RAINBOND, Application Management Platform +// Copyright (C) 2014-2017 Goodrain Co., Ltd. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. For any non-GPL usage of Rainbond, +// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. +// must be obtained first. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package store + +import ( + "github.com/Sirupsen/logrus" + "github.com/goodrain/rainbond/worker/appm/conversion" +) + +func (a *appRuntimeStore) initThirdPartyService() error { + // TODO: list third party services that have open ports directly. + svcs, err := a.dbmanager.TenantServiceDao().ListThirdPartyServices() + if err != nil { + logrus.Errorf("error listing third-party services: %v", err) + return err + } + for _, svc := range svcs { + // ignore service without open port. + if !a.dbmanager.TenantServicesPortDao().HasOpenPort(svc.ServiceID) { + continue + } + + appService, err := conversion.InitCacheAppService(a.dbmanager, svc.ServiceID, "Rainbond") + if err != nil { + logrus.Errorf("error initializing cache app service: %v", err) + return err + } + a.RegistAppService(appService) + } + return nil +} diff --git a/worker/appm/thirdparty/discovery/discovery.go b/worker/appm/thirdparty/discovery/discovery.go new file mode 100644 index 000000000..5dfbac8fb --- /dev/null +++ b/worker/appm/thirdparty/discovery/discovery.go @@ -0,0 +1,41 @@ +// RAINBOND, Application Management Platform +// Copyright (C) 2014-2017 Goodrain Co., Ltd. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. For any non-GPL usage of Rainbond, +// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. +// must be obtained first. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package discovery + +import ( + "github.com/goodrain/rainbond/db/model" + "strings" +) + +// Discoverier is the interface that wraps the required methods to gather +// information about third-party service endpoints. +type Discoverier interface { + Connect() error + Fetch() ([]*model.Endpoint, error) + Close() error +} + +func NewDiscoverier(cfg *model.ThirdPartySvcDiscoveryCfg) Discoverier { + switch strings.ToUpper(cfg.Type) { + case "ETCD": + return NewEtcd(cfg) + } + return nil +} + diff --git a/worker/appm/thirdparty/discovery/etcd.go b/worker/appm/thirdparty/discovery/etcd.go new file mode 100644 index 000000000..e4c7fbb7a --- /dev/null +++ b/worker/appm/thirdparty/discovery/etcd.go @@ -0,0 +1,104 @@ +// RAINBOND, Application Management Platform +// Copyright (C) 2014-2017 Goodrain Co., Ltd. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. For any non-GPL usage of Rainbond, +// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. +// must be obtained first. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package discovery + +import ( + "context" + "encoding/json" + "fmt" + c "github.com/coreos/etcd/clientv3" + "github.com/goodrain/rainbond/db/model" + "strings" + "time" +) + +type etcd struct { + cli *c.Client + + endpoints []string + key string + username string + password string +} + +// NewEtcd creates a new Discorvery which implemeted by etcd. +func NewEtcd(cfg *model.ThirdPartySvcDiscoveryCfg) Discoverier { + // TODO: validate endpoints + return &etcd{ + endpoints: strings.Split(cfg.Servers, ","), + key: cfg.Key, + username: cfg.Username, + password: cfg.Password, + } +} + +// Connect connects a etcdv3 client with a given configuration. +func (e *etcd) Connect() error { + cli, err := c.New(c.Config{ + Endpoints: e.endpoints, + DialTimeout: 5, + Username: e.username, + Password: e.password, + }) + if err != nil { + return fmt.Errorf("error connecting etcd: %v", err) + } + e.cli = cli + return nil +} + +// Fetch fetches data from Etcd. +func (e *etcd) Fetch() ([]*model.Endpoint, error) { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if e.cli == nil { + return nil, fmt.Errorf("can't fetching data from etcd without etcdv3 client") + } + + resp, err := e.cli.Get(ctx, e.key) + if err != nil { + return nil, fmt.Errorf("error fetching endpoints form etcd: %v", err) + } + if resp == nil { + return nil, fmt.Errorf("error fetching endpoints form etcd: empty GetResponse") + } + + type ep struct { + Endpint string `json:"endpoint"` + IsOnline bool `json:"is_online"` + } + var res []*model.Endpoint + for _, kv := range resp.Kvs { + var eps []*model.Endpoint + if err := json.Unmarshal([]byte(kv.Value), &eps); err != nil { + return nil, fmt.Errorf("error getting data from etcd: %v", err) + } + res = append(res, eps...) + } + return res, nil +} + +// Close shuts down the client's etcd connections. +func (e *etcd) Close() error { + if e.cli != nil { + return nil + } + return e.cli.Close() +} diff --git a/worker/appm/thirdparty/endpoints.go b/worker/appm/thirdparty/endpoints.go new file mode 100644 index 000000000..ec336c5e5 --- /dev/null +++ b/worker/appm/thirdparty/endpoints.go @@ -0,0 +1,81 @@ +// RAINBOND, Application Management Platform +// Copyright (C) 2014-2017 Goodrain Co., Ltd. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. For any non-GPL usage of Rainbond, +// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. +// must be obtained first. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package thirdparty + +import ( + "fmt" + "github.com/goodrain/rainbond/db" + "github.com/goodrain/rainbond/db/model" + "github.com/goodrain/rainbond/worker/appm/thirdparty/discovery" + "github.com/goodrain/rainbond/worker/appm/types/v1" +) + +// ListEndpoints lists third-party endpoints. +func ListEndpoints(sid string, dbm db.Manager) ([]*v1.Endpoint, error) { + // dynamic endpoints + cfg, err := dbm.ThirdPartySvcDiscoveryCfgDao().GetByServiceID(sid) + if err != nil { + return nil, err + } + if cfg != nil { + d := discovery.NewDiscoverier(cfg) + if err := d.Connect(); err != nil { + return nil, err + } + defer d.Close() + endpoints, err := d.Fetch() + if err != nil { + return nil, err + } + if endpoints != nil && len(endpoints) > 0 { + return conv(endpoints) + } + } + // static endpoints + endpoints, err := dbm.EndpointsDao().ListIsOnline(sid) + if err != nil { + return nil, err + } + if endpoints == nil || len(endpoints) == 0 { + return nil, fmt.Errorf("error not found endpoints") + } + return conv(endpoints) +} + +func conv(eps []*model.Endpoint) ([]*v1.Endpoint, error) { + var res []*v1.Endpoint + m := make(map[int]*v1.Endpoint) + for _, ep := range eps { + v1ep, ok := m[ep.Port] // the value of port may be 0 + if ok { + v1ep.IPs = append(v1ep.IPs, ep.IP) + continue + } + v1ep = &v1.Endpoint{ + Port: ep.Port, + IPs: []string{ + ep.IP, + }, + } + m[ep.Port] = v1ep + res = append(res, v1ep) + } + // TODO: If the port has three different values, one of them cannot be 0 + return res, nil +} diff --git a/worker/appm/types/v1/endpoint.go b/worker/appm/types/v1/endpoint.go new file mode 100644 index 000000000..a2067758c --- /dev/null +++ b/worker/appm/types/v1/endpoint.go @@ -0,0 +1,25 @@ +// RAINBOND, Application Management Platform +// Copyright (C) 2014-2017 Goodrain Co., Ltd. + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. For any non-GPL usage of Rainbond, +// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd. +// must be obtained first. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +package v1 + +// Endpoint holds information to create corv1.Endpoints(kubernetes object). +type Endpoint struct { + Port int + IPs []string +} diff --git a/worker/appm/types/v1/status.go b/worker/appm/types/v1/status.go index 04c004d22..1b73260da 100644 --- a/worker/appm/types/v1/status.go +++ b/worker/appm/types/v1/status.go @@ -34,6 +34,9 @@ func (a *AppService) IsEmpty() bool { //IsClosed is closed func (a *AppService) IsClosed() bool { + if a.ServiceKind == "third_party" && a.endpoints != nil && len(a.endpoints) > 0 { + return true + } if a.IsEmpty() && a.statefulset == nil && a.deployment == nil { return true } diff --git a/worker/appm/types/v1/v1.go b/worker/appm/types/v1/v1.go index 2ab1750dc..108ff4f60 100644 --- a/worker/appm/types/v1/v1.go +++ b/worker/appm/types/v1/v1.go @@ -59,6 +59,7 @@ type AppServiceBase struct { ServiceID string ServiceAlias string ServiceType AppServiceType + ServiceKind string // inter or third_party DeployVersion string ContainerCPU int ContainerMemory int @@ -80,6 +81,9 @@ type AppService struct { deployment *v1.Deployment replicasets []*v1.ReplicaSet services []*corev1.Service + delServices []*corev1.Service + endpoints []*corev1.Endpoints + delEndpoints []*corev1.Endpoints configMaps []*corev1.ConfigMap ingresses []*extensions.Ingress delIngs []*extensions.Ingress // ingresses which need to be deleted @@ -249,6 +253,11 @@ func (a *AppService) GetServices() []*corev1.Service { return a.services } +//GetDelServices returns services that need to be deleted. +func (a *AppService) GetDelServices() []*corev1.Service { + return a.delServices +} + //DeleteServices delete service func (a *AppService) DeleteServices(service *corev1.Service) { for i, c := range a.services { @@ -259,6 +268,29 @@ func (a *AppService) DeleteServices(service *corev1.Service) { } } +// SetEndpoints sets *corev1.Endpoints for *AppService. +func (a *AppService) SetEndpoints(ep *corev1.Endpoints) { + if len(a.endpoints) > 0 { + for i, e := range a.endpoints { + if e.GetName() == ep.GetName() { + a.endpoints[i] = ep + return + } + } + } + a.endpoints = append(a.endpoints, ep) +} + +// GetEndpoints returns endpoints in AppService +func (a *AppService) GetEndpoints() []*corev1.Endpoints { + return a.endpoints +} + +// GetDelEndpoints returns endpoints that need to be deleted in AppService +func (a *AppService) GetDelEndpoints() []*corev1.Endpoints { + return a.delEndpoints +} + //GetIngress get ingress func (a *AppService) GetIngress() []*extensions.Ingress { return a.ingresses @@ -344,7 +376,7 @@ func (a *AppService) SetAllSecrets(secrets []*corev1.Secret) { a.secrets = secrets } -//DeleteSecrets set srcrets +//DeleteSecrets set secrets func (a *AppService) DeleteSecrets(d *corev1.Secret) { for i, c := range a.secrets { if c.GetName() == d.GetName() { @@ -402,8 +434,11 @@ func (a *AppService) GetTenant() *corev1.Namespace { return a.tenant } -// SetDelIngsSecrets sets delIngs and delSecrets -func (a *AppService) SetDelIngsSecrets(old *AppService) { +// SetDeletedResources sets the resources that need to be deleted +func (a *AppService) SetDeletedResources(old *AppService) { + if old == nil { + return + } for _, o := range old.GetIngress() { del := true for _, n := range a.GetIngress() { @@ -425,7 +460,31 @@ func (a *AppService) SetDelIngsSecrets(old *AppService) { } } if del { - a.secrets = append(a.secrets, o) + a.delSecrets = append(a.delSecrets, o) + } + } + for _, o := range old.GetEndpoints() { + del := true + for _, n := range a.GetEndpoints() { + if o.Name == n.Name { + del = false + break + } + } + if del { + a.delEndpoints = append(a.delEndpoints, o) + } + } + for _, o := range old.GetServices() { + del := true + for _, n := range a.GetServices() { + if o.Name == n.Name { + del = false + break + } + } + if del { + a.delServices = append(a.delServices, o) } } } diff --git a/worker/discover/model/model.go b/worker/discover/model/model.go index 25bd22581..d3717a8a6 100644 --- a/worker/discover/model/model.go +++ b/worker/discover/model/model.go @@ -275,6 +275,7 @@ type ApplyRuleTaskBody struct { ServiceID string `json:"service_id"` DeployVersion string `json:"deploy_version"` EventID string `json:"event_id"` + ServiceKind string `json:"service_kind"` Action string `json:"action"` } diff --git a/worker/handle/manager.go b/worker/handle/manager.go index 1922cc030..953aea924 100644 --- a/worker/handle/manager.go +++ b/worker/handle/manager.go @@ -21,7 +21,9 @@ package handle import ( "context" "fmt" + "github.com/goodrain/rainbond/worker/appm/types/v1" "reflect" + "strings" "github.com/Sirupsen/logrus" "github.com/goodrain/rainbond/cmd/worker/option" @@ -331,13 +333,21 @@ func (m *Manager) applyRuleExec(task *model.Task) error { logrus.Errorf("Can't convert %s to *model.ApplyRuleTaskBody", reflect.TypeOf(task.Body)) return fmt.Errorf("Can't convert %s to *model.ApplyRuleTaskBody", reflect.TypeOf(task.Body)) } + svc, err := db.GetManager().TenantServiceDao().GetServiceByID(body.ServiceID) + if err != nil { + logrus.Errorf("error get TenantServices: %v", err) + return fmt.Errorf("error get TenantServices: %v", err) + } logger := event.GetManager().GetLogger(body.EventID) - oldAppService := m.store.GetAppService(body.ServiceID) - if oldAppService == nil || oldAppService.IsClosed() { - logrus.Debugf("service is closed,no need handle") - logger.Info("service is closed,no need handle", controller.GetLastLoggerOption()) - event.GetManager().ReleaseLogger(logger) - return nil + var oldAppService *v1.AppService + if svc.Kind != "third_party" && !strings.HasPrefix(body.Action, "switch-port") { + oldAppService = m.store.GetAppService(body.ServiceID) + if oldAppService == nil || oldAppService.IsClosed() { + logrus.Debugf("service is closed, no need handle") + logger.Info("service is closed,no need handle", controller.GetLastLoggerOption()) + event.GetManager().ReleaseLogger(logger) + return nil + } } newAppService, err := conversion.InitAppService(m.dbmanager, body.ServiceID) if err != nil { @@ -347,7 +357,7 @@ func (m *Manager) applyRuleExec(task *model.Task) error { return fmt.Errorf("Application init create failure") } newAppService.Logger = logger - newAppService.SetDelIngsSecrets(oldAppService) + newAppService.SetDeletedResources(oldAppService) // update k8s resources err = m.controllerManager.StartController(controller.TypeApplyRuleController, *newAppService) if err != nil {