diff --git a/api/handler/third_party_service_handler.go b/api/handler/third_party_service_handler.go index 5b8d9777d..9b9ccb7d3 100644 --- a/api/handler/third_party_service_handler.go +++ b/api/handler/third_party_service_handler.go @@ -135,7 +135,7 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR } m := make(map[string]*model.EndpointResp) for _, item := range endpoints { - m[item.UUID] = &model.EndpointResp{ + ep := &model.EndpointResp{ EpID: item.UUID, Address: func(ip string, p int) string { if p != 0 { @@ -147,8 +147,8 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR IsOnline: false, IsStatic: true, } + m[ep.Address] = ep } - thirdPartyEndpoints, err := t.statusCli.ListThirdPartyEndpoints(sid) if err != nil { logrus.Warningf("ServiceID: %s; grpc; error listing third-party endpoints: %v", sid, err) @@ -156,26 +156,25 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR } if thirdPartyEndpoints != nil && thirdPartyEndpoints.Obj != nil { for _, item := range thirdPartyEndpoints.Obj { - ep := m[item.Uuid] + ep := m[fmt.Sprintf("%s:%d", item.Ip, item.Port)] if ep != nil { ep.IsOnline = true ep.Status = item.Status continue } - m[item.Uuid] = &model.EndpointResp{ + rep := &model.EndpointResp{ EpID: item.Uuid, Address: item.Ip, Status: item.Status, IsOnline: true, IsStatic: false, } + m[rep.Address] = rep } } - var res []*model.EndpointResp for _, item := range m { res = append(res, item) } - return res, nil } diff --git a/gateway/controller/config/config.go b/gateway/controller/config/config.go index f0f8771f4..901970fd2 100644 --- a/gateway/controller/config/config.go +++ b/gateway/controller/config/config.go @@ -83,7 +83,7 @@ func NewDefault() Configuration { cfg := Configuration{ Backend: defaults.Backend{ ProxyBodySize: bodySize, - ProxyConnectTimeout: 75, + ProxyConnectTimeout: 60, ProxyReadTimeout: 60, ProxySendTimeout: 60, ProxyBuffersNumber: 4, diff --git a/gateway/store/store.go b/gateway/store/store.go index 59767dcbb..784b26e40 100644 --- a/gateway/store/store.go +++ b/gateway/store/store.go @@ -402,15 +402,15 @@ func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) { pool.UpstreamHashBy = backend.hashBy l7Pools[backend.name] = pool } + var notReadyAddress *corev1.EndpointAddress + var notReadyPort *corev1.EndpointPort for _, ss := range ep.Subsets { - for _, port := range ss.Ports { - var addresses []corev1.EndpointAddress - if ss.Addresses != nil && len(ss.Addresses) > 0 { - addresses = append(addresses, ss.Addresses...) - } else if len(ss.NotReadyAddresses) > 0 { - addresses = append(addresses, ss.NotReadyAddresses[0]) + for i, port := range ss.Ports { + if (ss.Addresses == nil || len(ss.Addresses) == 0) && len(ss.NotReadyAddresses) > 0 { + notReadyAddress = &ss.NotReadyAddresses[0] + notReadyPort = &ss.Ports[i] } - for _, address := range addresses { + for _, address := range ss.Addresses { if _, ok := l7PoolMap[epn]; ok { // l7 pool.Nodes = append(pool.Nodes, &v1.Node{ Host: address.IP, @@ -421,6 +421,14 @@ func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) { } } } + // If you have an address, make sure you have at least one node, regardless of its health + if len(pool.Nodes) == 0 && notReadyAddress != nil && notReadyPort != nil { + pool.Nodes = append(pool.Nodes, &v1.Node{ + Host: notReadyAddress.IP, + Port: notReadyPort.Port, + Weight: backend.weight, + }) + } } // l4 backends = l4PoolBackendMap[ep.ObjectMeta.Name] diff --git a/node/nodem/envoy/conver/eds_conver.go b/node/nodem/envoy/conver/eds_conver.go index b0c6d14d2..3336793cb 100644 --- a/node/nodem/envoy/conver/eds_conver.go +++ b/node/nodem/envoy/conver/eds_conver.go @@ -49,8 +49,11 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c logrus.Debugf("select endpoints %d for service %s", len(selectEndpoint), service.Name) var lendpoints []endpoint.LocalityLbEndpoints // localityLbEndpoints just support only one content for _, en := range selectEndpoint { + var notReadyAddress *corev1.EndpointAddress + var notReadyPort *corev1.EndpointPort + var notreadyToPort int for _, subset := range en.Subsets { - for _, port := range subset.Ports { + for i, port := range subset.Ports { toport := int(port.Port) if serviceAlias == destServiceAlias { //use real port @@ -62,35 +65,47 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c } } protocol := string(port.Protocol) - addressList := subset.Addresses - var notready bool - if len(addressList) == 0 && len(subset.NotReadyAddresses) > 0 { - notready = true - addressList = subset.NotReadyAddresses[:1] + if len(subset.Addresses) == 0 && len(subset.NotReadyAddresses) > 0 { + notReadyAddress = &subset.NotReadyAddresses[0] + notreadyToPort = toport + notReadyPort = &subset.Ports[i] } getHealty := func() *endpoint.Endpoint_HealthCheckConfig { - if notready { - return nil - } return &endpoint.Endpoint_HealthCheckConfig{ PortValue: uint32(toport), } } - var lbe []endpoint.LbEndpoint // just support one content - for _, address := range addressList { - envoyAddress := envoyv2.CreateSocketAddress(protocol, address.IP, uint32(toport)) - lbe = append(lbe, endpoint.LbEndpoint{ - HostIdentifier: &endpoint.LbEndpoint_Endpoint{ - Endpoint: &endpoint.Endpoint{ - Address: &envoyAddress, - HealthCheckConfig: getHealty(), + if len(subset.Addresses) > 0 { + var lbe []endpoint.LbEndpoint + for _, address := range subset.Addresses { + envoyAddress := envoyv2.CreateSocketAddress(protocol, address.IP, uint32(toport)) + lbe = append(lbe, endpoint.LbEndpoint{ + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &envoyAddress, + HealthCheckConfig: getHealty(), + }, }, - }, - }) + }) + } + if len(lbe) > 0 { + lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe}) + } } - lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe}) } } + if len(lendpoints) == 0 && notReadyAddress != nil && notReadyPort != nil { + var lbe []endpoint.LbEndpoint + envoyAddress := envoyv2.CreateSocketAddress(string(notReadyPort.Protocol), notReadyAddress.IP, uint32(notreadyToPort)) + lbe = append(lbe, endpoint.LbEndpoint{ + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &envoyAddress, + }, + }, + }) + lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe}) + } } cla := &v2.ClusterLoadAssignment{ ClusterName: clusterName,