list third endpoints

This commit is contained in:
GLYASAI 2021-08-15 14:41:33 +08:00
parent b0067ea50b
commit 345af7621b
12 changed files with 2122 additions and 3514 deletions

View File

@ -137,7 +137,7 @@ func (t *ThirdPartyServiceController) listEndpoints(w http.ResponseWriter, r *ht
return
}
if len(res) == 0 {
httputil.ReturnSuccess(r, w, []*model.EndpointResp{})
httputil.ReturnSuccess(r, w, []*model.ThirdEndpoint{})
return
}
httputil.ReturnSuccess(r, w, res)

View File

@ -29,11 +29,13 @@ import (
dbmodel "github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/util"
"github.com/goodrain/rainbond/worker/client"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
// ThirdPartyServiceHanlder handles business logic for all third-party services
type ThirdPartyServiceHanlder struct {
logger *logrus.Entry
dbmanager db.Manager
statusCli *client.AppRuntimeSyncClient
}
@ -41,6 +43,7 @@ type ThirdPartyServiceHanlder struct {
// Create3rdPartySvcHandler creates a new *ThirdPartyServiceHanlder.
func Create3rdPartySvcHandler(dbmanager db.Manager, statusCli *client.AppRuntimeSyncClient) *ThirdPartyServiceHanlder {
return &ThirdPartyServiceHanlder{
logger: logrus.WithField("WHO", "ThirdPartyServiceHanlder"),
dbmanager: dbmanager,
statusCli: statusCli,
}
@ -130,51 +133,78 @@ func (t *ThirdPartyServiceHanlder) DelEndpoints(epid, sid string) error {
}
// ListEndpoints lists third-party service endpoints.
func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointResp, error) {
endpoints, err := t.dbmanager.EndpointsDao().List(sid)
func (t *ThirdPartyServiceHanlder) ListEndpoints(componentID string) ([]*model.ThirdEndpoint, error) {
logger := t.logger.WithField("Method", "ListEndpoints").
WithField("ComponentID", componentID)
runtimeEndpoints, err := t.listRuntimeEndpoints(componentID)
if err != nil {
logrus.Warningf("ServiceID: %s; error listing endpoints from db; %v", sid, err)
logger.Warning(err.Error())
}
m := make(map[string]*model.EndpointResp)
for _, item := range endpoints {
ep := &model.EndpointResp{
EpID: item.UUID,
Address: func(ip string, p int) string {
if p != 0 {
return fmt.Sprintf("%s:%d", ip, p)
}
return ip
}(item.IP, item.Port),
staticEndpoints, err := t.listStaticEndpoints(componentID)
if err != nil {
staticEndpoints = map[string]*model.ThirdEndpoint{}
logger.Warning(err.Error())
}
// Merge runtimeEndpoints with staticEndpoints
for _, ep := range runtimeEndpoints {
sep, ok := staticEndpoints[ep.EpID]
if !ok {
continue
}
ep.IsStatic = sep.IsStatic
ep.Address = sep.Address
delete(staticEndpoints, ep.EpID)
}
// Add offline static endpoints
for _, ep := range staticEndpoints {
runtimeEndpoints = append(runtimeEndpoints, ep)
}
sort.Sort(model.ThirdEndpoints(runtimeEndpoints))
return runtimeEndpoints, nil
}
func (t *ThirdPartyServiceHanlder) listRuntimeEndpoints(componentID string) ([]*model.ThirdEndpoint, error) {
runtimeEndpoints, err := t.statusCli.ListThirdPartyEndpoints(componentID)
if err != nil {
return nil, errors.Wrap(err, "list runtime third endpoints")
}
var endpoints []*model.ThirdEndpoint
for _, item := range runtimeEndpoints.Items {
endpoints = append(endpoints, &model.ThirdEndpoint{
EpID: item.Name,
Address: item.Address,
Status: item.Status,
})
}
return endpoints, nil
}
func (t *ThirdPartyServiceHanlder) listStaticEndpoints(componentID string) (map[string]*model.ThirdEndpoint, error) {
staticEndpoints, err := t.dbmanager.EndpointsDao().List(componentID)
if err != nil {
return nil, errors.Wrap(err, "list static endpoints")
}
endpoints := make(map[string]*model.ThirdEndpoint)
for _, item := range staticEndpoints {
address := func(ip string, p int) string {
if p != 0 {
return fmt.Sprintf("%s:%d", ip, p)
}
return ip
}(item.IP, item.Port)
endpoints[item.UUID] = &model.ThirdEndpoint{
EpID: item.UUID,
Address: address,
Status: "-",
IsStatic: true,
}
m[ep.Address] = ep
}
thirdPartyEndpoints, err := t.statusCli.ListThirdPartyEndpoints(sid)
if err != nil {
logrus.Warningf("ServiceID: %s; grpc; error listing third-party endpoints: %v", sid, err)
return nil, err
}
if thirdPartyEndpoints != nil && thirdPartyEndpoints.Obj != nil {
for _, item := range thirdPartyEndpoints.Obj {
ep := m[fmt.Sprintf("%s:%d", item.Ip, item.Port)]
if ep != nil {
ep.Status = item.Status
continue
}
rep := &model.EndpointResp{
EpID: item.Uuid,
Address: item.Ip,
Status: item.Status,
IsStatic: false,
}
m[rep.Address] = rep
}
}
var res []*model.EndpointResp
for _, item := range m {
res = append(res, item)
}
sort.Sort(model.EndpointResps(res))
return res, nil
return endpoints, nil
}

View File

@ -34,30 +34,30 @@ type DelEndpiontsReq struct {
EpID string `json:"ep_id" validate:"required|len:32"`
}
// EndpointResp is one of the Endpoints list in the response to list, add,
// ThirdEndpoint is one of the Endpoints list in the response to list, add,
// update or delete the endpints.
type EndpointResp struct {
type ThirdEndpoint struct {
EpID string `json:"ep_id"`
Address string `json:"address"`
Status string `json:"status"`
IsStatic bool `json:"is_static"`
}
// EndpointResps -
type EndpointResps []*EndpointResp
// ThirdEndpoints -
type ThirdEndpoints []*ThirdEndpoint
// Len is part of sort.Interface.
func (e EndpointResps) Len() int {
func (e ThirdEndpoints) Len() int {
return len(e)
}
// Swap is part of sort.Interface.
func (e EndpointResps) Swap(i, j int) {
func (e ThirdEndpoints) Swap(i, j int) {
e[i], e[j] = e[j], e[i]
}
// Less is part of sort.Interface. It is implemented by calling the "by" closure in the sorter.
func (e EndpointResps) Less(i, j int) bool {
func (e ThirdEndpoints) Less(i, j int) bool {
return e[i].Address < e[j].Address
}

View File

@ -50,6 +50,9 @@ spec:
description: Specify a private certificate when the protocol
is HTTPS
type: string
name:
description: Then Name of the Endpoint.
type: string
protocol:
description: 'Address protocols, including: HTTP, TCP, UDP,
HTTPS'
@ -160,6 +163,9 @@ spec:
address:
description: The address including the port number.
type: string
name:
description: Then Name of the Endpoint.
type: string
reason:
description: Reason probe not passed reason
type: string

View File

@ -112,6 +112,9 @@ type ThirdComponentEndpointSource struct {
type ThirdComponentEndpoint struct {
// The address including the port number.
Address string `json:"address"`
// Then Name of the Endpoint.
// +optional
Name string `json:"name"`
// Address protocols, including: HTTP, TCP, UDP, HTTPS
// +optional
Protocol string `json:"protocol,omitempty"`
@ -362,6 +365,10 @@ func (e EndpointAddress) GetPort() int {
// EnsureScheme -
func (e EndpointAddress) EnsureScheme() string {
address := string(e)
return ensureScheme(address)
}
func ensureScheme(address string) string {
if strings.HasPrefix(address, "http://") || strings.HasPrefix(address, "https://") {
return address
}
@ -382,12 +389,11 @@ func NewEndpointAddress(host string, port int) *EndpointAddress {
return &ea
}
u, err := url.Parse(host)
_, err := url.Parse(ensureScheme(host))
if err != nil {
return nil
}
u.Path = ""
ea := EndpointAddress(u.String())
ea := EndpointAddress(host)
return &ea
}
@ -395,6 +401,9 @@ func NewEndpointAddress(host string, port int) *EndpointAddress {
type ThirdComponentEndpointStatus struct {
// The address including the port number.
Address EndpointAddress `json:"address"`
// Then Name of the Endpoint.
// +optional
Name string `json:"name"`
// Reference to object providing the endpoint.
// +optional
TargetRef *v1.ObjectReference `json:"targetRef,omitempty" protobuf:"bytes,2,opt,name=targetRef"`

View File

@ -171,6 +171,7 @@ func (c *Builder) listStaticEndpoints(componentID string) ([]*v1alpha1.ThirdComp
for _, ep := range endpoints {
res = append(res, &v1alpha1.ThirdComponentEndpoint{
Address: ep.GetAddress(),
Name: ep.UUID,
})
}
return res, nil

View File

@ -59,8 +59,9 @@ parameter: {
name: string
}
endpoints?: [...{
address: string
protocol?: string
address: string
name?: string
protocol?: string
clientSecret?: string
}]
port?: [...{

View File

@ -189,7 +189,8 @@ func (r *Reconciler) applyEndpointService(ctx context.Context, log *logrus.Entry
var old corev1.Endpoints
if err := r.Client.Get(ctx, types.NamespacedName{Namespace: ep.Namespace, Name: ep.Name}, &old); err == nil {
// no change not apply
if reflect.DeepEqual(old.Subsets, ep.Subsets) {
if reflect.DeepEqual(old.Subsets, ep.Subsets) &&
reflect.DeepEqual(old.Annotations, ep.Annotations) {
return
}
}

View File

@ -36,46 +36,44 @@ func (s *staticEndpoint) Discover(ctx context.Context, update chan *v1alpha1.Thi
func (s *staticEndpoint) DiscoverOne(ctx context.Context) ([]*v1alpha1.ThirdComponentEndpointStatus, error) {
component := s.component
var res []*v1alpha1.ThirdComponentEndpointStatus
var endpoints []*v1alpha1.ThirdComponentEndpointStatus
for _, ep := range component.Spec.EndpointSource.StaticEndpoints {
var addresses []*v1alpha1.EndpointAddress
if ep.GetPort() != 0 {
address := v1alpha1.NewEndpointAddress(ep.GetIP(), ep.GetPort())
if address != nil {
addresses = append(addresses, address)
endpoints = append(endpoints, &v1alpha1.ThirdComponentEndpointStatus{
Address: *address,
Name: ep.Name,
})
}
} else {
for _, port := range component.Spec.Ports {
address := v1alpha1.NewEndpointAddress(ep.Address, port.Port)
if address != nil {
addresses = append(addresses, address)
endpoints = append(endpoints, &v1alpha1.ThirdComponentEndpointStatus{
Address: *address,
Name: ep.Name,
})
}
}
}
if len(addresses) == 0 {
if len(endpoints) == 0 {
continue
}
for _, address := range addresses {
address := address
es := &v1alpha1.ThirdComponentEndpointStatus{
Address: *address,
Status: v1alpha1.EndpointReady,
}
res = append(res, es)
for _, ep := range endpoints {
// Make ready as the default status
es.Status = v1alpha1.EndpointReady
ep.Status = v1alpha1.EndpointReady
if s.proberManager != nil {
result, found := s.proberManager.GetResult(s.component.GetEndpointID(es))
result, found := s.proberManager.GetResult(s.component.GetEndpointID(ep))
if found && result != results.Success {
es.Status = v1alpha1.EndpointNotReady
ep.Status = v1alpha1.EndpointNotReady
}
}
}
}
return res, nil
return endpoints, nil
}
func (s *staticEndpoint) SetProberManager(proberManager prober.Manager) {

File diff suppressed because it is too large Load Diff

View File

@ -133,16 +133,14 @@ message DelThirdPartyEndpointsReq {
}
message ThirdPartyEndpoint {
string uuid = 1;
string sid = 2;
string ip = 3;
int32 port = 4;
string status = 5;
bool is_online = 6;
string name = 1;
string componentID = 2;
string address = 3;
string status = 4;
}
message ThirdPartyEndpoints {
repeated ThirdPartyEndpoint obj = 1;
repeated ThirdPartyEndpoint items = 1;
}
message ListPodsBySIDReq {

View File

@ -55,8 +55,11 @@ import (
//RuntimeServer app runtime grpc server
type RuntimeServer struct {
ctx context.Context
cancel context.CancelFunc
ctx context.Context
cancel context.CancelFunc
logger *logrus.Entry
store store.Storer
conf option.Config
server *grpc.Server
@ -76,6 +79,7 @@ func CreaterRuntimeServer(conf option.Config,
conf: conf,
ctx: ctx,
cancel: cancel,
logger: logrus.WithField("WHO", "RuntimeServer"),
server: grpc.NewServer(),
hostIP: conf.HostIP,
store: store,
@ -480,65 +484,47 @@ func (r *RuntimeServer) ListThirdPartyEndpoints(ctx context.Context, re *pb.Serv
if as == nil {
return new(pb.ThirdPartyEndpoints), nil
}
var pbeps []*pb.ThirdPartyEndpoint
// The same IP may correspond to two endpoints, which are internal and external endpoints.
// So it is need to filter the same IP.
exists := make(map[string]bool)
addEndpoint := func(tpe *pb.ThirdPartyEndpoint) {
if !exists[fmt.Sprintf("%s:%d", tpe.Ip, tpe.Port)] {
pbeps = append(pbeps, tpe)
exists[fmt.Sprintf("%s:%d", tpe.Ip, tpe.Port)] = true
}
}
for _, ep := range as.GetEndpoints(false) {
if ep.Subsets == nil || len(ep.Subsets) == 0 {
logrus.Debugf("Key: %s; empty subsets", fmt.Sprintf("%s/%s", ep.Namespace, ep.Name))
continue
}
for _, subset := range ep.Subsets {
for _, port := range subset.Ports {
for _, address := range subset.Addresses {
ip := address.IP
if ip == "1.1.1.1" {
if len(as.GetServices(false)) > 0 {
ip = as.GetServices(false)[0].Annotations["domain"]
}
}
addEndpoint(&pb.ThirdPartyEndpoint{
Uuid: port.Name,
Sid: ep.GetLabels()["service_id"],
Ip: ip,
Port: port.Port,
Status: func() string {
return "healthy"
}(),
})
endpoints := r.listThirdEndpoints(as)
var items []*pb.ThirdPartyEndpoint
for _, ep := range endpoints {
items = append(items, &pb.ThirdPartyEndpoint{
Name: ep.Name,
ComponentID: as.ServiceID,
Address: string(ep.Address),
Status: func() string {
if ep.Status == v1alpha1.EndpointReady {
return "healthy"
}
for _, address := range subset.NotReadyAddresses {
ip := address.IP
if ip == "1.1.1.1" {
if len(as.GetServices(false)) > 0 {
ip = as.GetServices(false)[0].Annotations["domain"]
}
}
addEndpoint(&pb.ThirdPartyEndpoint{
Uuid: port.Name,
Sid: ep.GetLabels()["service_id"],
Ip: ip,
Port: port.Port,
Status: func() string {
return "unhealthy"
}(),
})
}
}
}
return "unhealthy"
}(),
})
}
return &pb.ThirdPartyEndpoints{
Obj: pbeps,
Items: items,
}, nil
}
func (r *RuntimeServer) listThirdEndpoints(as *v1.AppService) []*v1alpha1.ThirdComponentEndpointStatus {
logger := r.logger.WithField("Method", "listThirdComponentEndpoints").
WithField("ComponentID", as.ServiceID)
workload := as.GetWorkload()
if workload == nil {
// workload not found
return nil
}
component, ok := workload.(*v1alpha1.ThirdComponent)
if !ok {
logger.Warningf("expect thirdcomponents.rainbond.io, but got %s", workload.GetObjectKind())
return nil
}
return component.Status.Endpoints
}
// AddThirdPartyEndpoint creates a create event.
func (r *RuntimeServer) AddThirdPartyEndpoint(ctx context.Context, re *pb.AddThirdPartyEndpointsReq) (*pb.Empty, error) {
as := r.store.GetAppService(re.Sid)