static endpoints discover

This commit is contained in:
GLYASAI 2021-08-05 11:48:31 +08:00
parent b1fbc171ae
commit 19fec5d0b0
11 changed files with 110 additions and 66 deletions

View File

@ -19,7 +19,7 @@ spec:
status: {}
validation:
openAPIV3Schema:
description: HelmApp -
description: ThirdComponent -
properties:
apiVersion:
description: 'APIVersion defines the versioned schema of this representation
@ -34,29 +34,15 @@ spec:
metadata:
type: object
spec:
description: ThirdComponentSpec -
properties:
endpointSource:
description: endpoint source config
properties:
endpoints:
items:
properties:
address:
description: The address including the port number.
type: string
clientSecret:
description: Specify a private certificate when the protocol
is HTTPS
type: string
protocol:
description: 'Address protocols, including: HTTP, TCP, UDP,
HTTPS'
type: string
required:
- address
type: object
type: array
type: boolean
kubernetesService:
description: KubernetesServiceSource -
properties:
name:
type: string
@ -127,6 +113,7 @@ spec:
- ports
type: object
status:
description: ThirdComponentStatus -
properties:
endpoints:
items:
@ -187,6 +174,7 @@ spec:
type: object
type: array
phase:
description: ComponentPhase -
type: string
reason:
type: string

View File

@ -18,7 +18,9 @@
package model
import "fmt"
import (
"fmt"
)
// Endpoint is a persistent object for table 3rd_party_svc_endpoints.
type Endpoint struct {
@ -36,8 +38,8 @@ func (Endpoint) TableName() string {
return "tenant_service_3rd_party_endpoints"
}
// Address -
func (e *Endpoint) Address() string {
// GetAddress -
func (e *Endpoint) GetAddress() string {
if e.Port == 0 {
return e.IP
}

View File

@ -46,6 +46,11 @@ type ThirdComponent struct {
Status ThirdComponentStatus `json:"status,omitempty"`
}
// GetComponentID -
func (in *ThirdComponent) GetComponentID() string {
return in.Name
}
// +kubebuilder:object:root=true
// ThirdComponentList contains a list of ThirdComponent
@ -68,8 +73,8 @@ type ThirdComponentSpec struct {
// ThirdComponentEndpointSource -
type ThirdComponentEndpointSource struct {
StaticEndpoints []*ThirdComponentEndpoint `json:"endpoints,omitempty"`
KubernetesService *KubernetesServiceSource `json:"kubernetesService,omitempty"`
StaticEndpoints *bool `json:"endpoints,omitempty"`
KubernetesService *KubernetesServiceSource `json:"kubernetesService,omitempty"`
//other source
// NacosSource
// EurekaSource

View File

@ -460,14 +460,8 @@ func (in *ThirdComponentEndpointSource) DeepCopyInto(out *ThirdComponentEndpoint
*out = *in
if in.StaticEndpoints != nil {
in, out := &in.StaticEndpoints, &out.StaticEndpoints
*out = make([]*ThirdComponentEndpoint, len(*in))
for i := range *in {
if (*in)[i] != nil {
in, out := &(*in)[i], &(*out)[i]
*out = new(ThirdComponentEndpoint)
**out = **in
}
}
*out = new(bool)
**out = **in
}
if in.KubernetesService != nil {
in, out := &in.KubernetesService, &out.KubernetesService

View File

@ -21,7 +21,7 @@ package componentdefinition
//ThirdComponentProperties third component properties
type ThirdComponentProperties struct {
Kubernetes *ThirdComponentKubernetes `json:"kubernetes,omitempty"`
Endpoints []*ThirdComponentEndpoint `json:"endpoints,omitempty"`
Endpoints *bool `json:"endpoints,omitempty"`
Port []*ThirdComponentPort `json:"port"`
}
@ -38,10 +38,3 @@ type ThirdComponentKubernetes struct {
Name string `json:"name"`
Namespace string `json:"namespace"`
}
// ThirdComponentEndpoint -
type ThirdComponentEndpoint struct {
Address string `json:"address"`
Protocol string `json:"protocol,omitempty"`
ClientSecret string `json:"clientSecret,omitempty"`
}

View File

@ -28,13 +28,16 @@ import (
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1"
rainbondversioned "github.com/goodrain/rainbond/pkg/generated/clientset/versioned"
"github.com/goodrain/rainbond/util/commonutil"
v1 "github.com/goodrain/rainbond/worker/appm/types/v1"
"github.com/sirupsen/logrus"
k8sErrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
// ErrNotSupport -
var ErrNotSupport = fmt.Errorf("not support component definition")
// ErrOnlyCUESupport -
var ErrOnlyCUESupport = fmt.Errorf("component definition only support cue template")
@ -124,11 +127,11 @@ func (c *ComponentDefinitionBuilder) GetComponentProperties(as *v1.AppService, d
}
// static endpoints
endpoints, err := c.listStaticEndpoints(as.ServiceID)
containsEndpoints, err := c.containsEndpoints(as.ServiceID)
if err != nil {
c.logger.Errorf("component id: %s; list static endpoints: %v", as.ServiceID, err)
}
properties.Endpoints = endpoints
properties.Endpoints = commonutil.Bool(containsEndpoints)
ports, err := dbm.TenantServicesPortDao().GetPortsByServiceID(as.ServiceID)
if err != nil {
@ -152,19 +155,12 @@ func (c *ComponentDefinitionBuilder) GetComponentProperties(as *v1.AppService, d
}
}
func (c *ComponentDefinitionBuilder) listStaticEndpoints(componentID string) ([]*ThirdComponentEndpoint, error) {
func (c *ComponentDefinitionBuilder) containsEndpoints(componentID string) (bool, error) {
endpoints, err := db.GetManager().EndpointsDao().List(componentID)
if err != nil {
return nil, err
return false, err
}
var res []*ThirdComponentEndpoint
for _, ep := range endpoints {
res = append(res, &ThirdComponentEndpoint{
Address: ep.Address(),
})
}
return res, nil
return len(endpoints) > 0, nil
}
// BuildWorkloadResource -
@ -196,7 +192,7 @@ func (c *ComponentDefinitionBuilder) InitCoreComponentDefinition(rainbondClient
for _, ccd := range coreComponentDefinition {
if c.GetComponentDefinition(ccd.Name) == nil {
logrus.Infof("create core componentdefinition %s", ccd.Name)
if _, err := rainbondClient.RainbondV1alpha1().ComponentDefinitions(c.namespace).Create(context.Background(), ccd, metav1.CreateOptions{}); err != nil {
if _, err := rainbondClient.RainbondV1alpha1().ComponentDefinitions(c.namespace).Create(context.Background(), ccd, metav1.CreateOptions{}); err != nil && !k8sErrors.IsNotFound(err) {
logrus.Errorf("create core componentdefinition %s failire %s", ccd.Name, err.Error())
}
}

View File

@ -55,11 +55,7 @@ parameter: {
namespace?: string
name: string
}
endpoints?: [...{
address: string
protocol?: string
clientSecret?: string
}]
endpoints?: bool
port?: [...{
name: string
port: >0 & <=65533

View File

@ -24,7 +24,10 @@ import (
"time"
"github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1"
rainbondlistersv1alpha1 "github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1"
dis "github.com/goodrain/rainbond/worker/master/controller/thirdcomponent/discover"
"github.com/oam-dev/kubevela/pkg/utils/apply"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
@ -34,8 +37,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/retry"
ctrl "sigs.k8s.io/controller-runtime"
runtimecache "sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
@ -52,6 +57,9 @@ type Reconciler struct {
applyer apply.Applicator
discoverPool *DiscoverPool
discoverNum prometheus.Gauge
informer runtimecache.Informer
lister rainbondlistersv1alpha1.ThirdComponentLister
}
// Reconcile is the main logic of appDeployment controller
@ -82,7 +90,7 @@ func (r *Reconciler) Reconcile(req ctrl.Request) (res reconcile.Result, retErr e
return ctrl.Result{}, nil
}
logrus.Debugf("start to reconcile component %s/%s", component.Namespace, component.Name)
discover, err := NewDiscover(component, r.restConfig)
discover, err := dis.NewDiscover(component, r.restConfig, r.lister)
if err != nil {
component.Status.Phase = v1alpha1.ComponentFailed
component.Status.Reason = err.Error()
@ -283,17 +291,23 @@ func (r *Reconciler) Collect(ch chan<- prometheus.Metric) {
// Setup adds a controller that reconciles AppDeployment.
func Setup(ctx context.Context, mgr ctrl.Manager) (*Reconciler, error) {
applyer := apply.NewAPIApplicator(mgr.GetClient())
informer, err := mgr.GetCache().GetInformerForKind(ctx, v1alpha1.SchemeGroupVersion.WithKind("ThirdComponent"))
if err != nil {
return nil, errors.WithMessage(err, "get informer for thirdcomponent")
}
lister := rainbondlistersv1alpha1.NewThirdComponentLister(informer.(cache.SharedIndexInformer).GetIndexer())
r := &Reconciler{
Client: mgr.GetClient(),
restConfig: mgr.GetConfig(),
Scheme: mgr.GetScheme(),
applyer: applyer,
applyer: apply.NewAPIApplicator(mgr.GetClient()),
discoverNum: prometheus.NewGauge(prometheus.GaugeOpts{
Namespace: "controller",
Name: "third_component_discover_number",
Help: "Number of running endpoint discover worker of third component.",
}),
lister: lister,
}
dp := NewDiscoverPool(ctx, r)
r.discoverPool = dp

View File

@ -16,7 +16,7 @@
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package thirdcomponent
package discover
import (
"context"
@ -24,6 +24,8 @@ import (
"time"
"github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1"
rainbondlistersv1alpha1 "github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1"
"github.com/goodrain/rainbond/util/commonutil"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -38,7 +40,7 @@ type Discover interface {
Discover(ctx context.Context, update chan *v1alpha1.ThirdComponent) ([]*v1alpha1.ThirdComponentEndpointStatus, error)
}
func NewDiscover(component *v1alpha1.ThirdComponent, restConfig *rest.Config) (Discover, error) {
func NewDiscover(component *v1alpha1.ThirdComponent, restConfig *rest.Config, lister rainbondlistersv1alpha1.ThirdComponentLister) (Discover, error) {
if component.Spec.EndpointSource.KubernetesService != nil {
clientset, err := kubernetes.NewForConfig(restConfig)
if err != nil {
@ -50,6 +52,12 @@ func NewDiscover(component *v1alpha1.ThirdComponent, restConfig *rest.Config) (D
client: clientset,
}, nil
}
if commonutil.BoolValue(component.Spec.EndpointSource.StaticEndpoints) {
return &staticEndpoint{
component: component,
lister: lister,
}, nil
}
return nil, fmt.Errorf("not support source type")
}
@ -86,6 +94,7 @@ func (k *kubernetesDiscover) Discover(ctx context.Context, update chan *v1alpha1
case <-ctx.Done():
return nil, nil
case <-re.ResultChan():
logrus.Infof("endpoint(%s/%s) changed", namespace, service.Name)
func() {
ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel()

View File

@ -0,0 +1,46 @@
package discover
import (
"context"
"github.com/goodrain/rainbond/db"
"github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1"
rainbondlistersv1alpha1 "github.com/goodrain/rainbond/pkg/generated/listers/rainbond/v1alpha1"
"github.com/pkg/errors"
)
type staticEndpoint struct {
lister rainbondlistersv1alpha1.ThirdComponentLister
component *v1alpha1.ThirdComponent
}
func (s *staticEndpoint) GetComponent() *v1alpha1.ThirdComponent {
return s.component
}
func (s *staticEndpoint) Discover(ctx context.Context, update chan *v1alpha1.ThirdComponent) ([]*v1alpha1.ThirdComponentEndpointStatus, error) {
return nil, nil
}
func (s *staticEndpoint) DiscoverOne(ctx context.Context) ([]*v1alpha1.ThirdComponentEndpointStatus, error) {
// Optimization: reduce the press of database if necessary.
endpoints, err := db.GetManager().EndpointsDao().ListIsOnline(s.component.GetComponentID())
if err != nil {
return nil, errors.WithMessage(err, "list online static endpoints")
}
var res []*v1alpha1.ThirdComponentEndpointStatus
for _, ep := range endpoints {
ed := v1alpha1.NewEndpointAddress(ep.IP, ep.Port)
if ed == nil {
continue
}
res = append(res, &v1alpha1.ThirdComponentEndpointStatus{
ServicePort: ep.Port,
Address: v1alpha1.EndpointAddress(ep.GetAddress()),
Status: v1alpha1.EndpointReady,
})
}
return res, nil
}

View File

@ -25,6 +25,7 @@ import (
"time"
"github.com/goodrain/rainbond/pkg/apis/rainbond/v1alpha1"
dis "github.com/goodrain/rainbond/worker/master/controller/thirdcomponent/discover"
"github.com/sirupsen/logrus"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
@ -86,7 +87,7 @@ func (d *DiscoverPool) Start() {
}
type Worker struct {
discover Discover
discover dis.Discover
cancel context.CancelFunc
ctx context.Context
updateChan chan *v1alpha1.ThirdComponent
@ -110,7 +111,7 @@ func (w *Worker) Start() {
}
}
func (w *Worker) UpdateDiscover(discover Discover) {
func (w *Worker) UpdateDiscover(discover dis.Discover) {
w.discover = discover
}
@ -122,7 +123,7 @@ func (w *Worker) IsStop() bool {
return w.stoped
}
func (d *DiscoverPool) newWorker(dis Discover) *Worker {
func (d *DiscoverPool) newWorker(dis dis.Discover) *Worker {
ctx, cancel := context.WithCancel(d.ctx)
return &Worker{
ctx: ctx,
@ -132,7 +133,7 @@ func (d *DiscoverPool) newWorker(dis Discover) *Worker {
}
}
func (d *DiscoverPool) AddDiscover(dis Discover) {
func (d *DiscoverPool) AddDiscover(dis dis.Discover) {
d.lock.Lock()
defer d.lock.Unlock()
component := dis.GetComponent()