Fixed the issues of too many notready endpoints

This commit is contained in:
barnettZQG 2020-02-05 22:24:17 +08:00
parent b53cb49952
commit 16d57beaa9
4 changed files with 56 additions and 34 deletions

View File

@ -135,7 +135,7 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
}
m := make(map[string]*model.EndpointResp)
for _, item := range endpoints {
m[item.UUID] = &model.EndpointResp{
ep := &model.EndpointResp{
EpID: item.UUID,
Address: func(ip string, p int) string {
if p != 0 {
@ -147,8 +147,8 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
IsOnline: false,
IsStatic: true,
}
m[ep.Address] = ep
}
thirdPartyEndpoints, err := t.statusCli.ListThirdPartyEndpoints(sid)
if err != nil {
logrus.Warningf("ServiceID: %s; grpc; error listing third-party endpoints: %v", sid, err)
@ -156,26 +156,25 @@ func (t *ThirdPartyServiceHanlder) ListEndpoints(sid string) ([]*model.EndpointR
}
if thirdPartyEndpoints != nil && thirdPartyEndpoints.Obj != nil {
for _, item := range thirdPartyEndpoints.Obj {
ep := m[item.Uuid]
ep := m[fmt.Sprintf("%s:%d", item.Ip, item.Port)]
if ep != nil {
ep.IsOnline = true
ep.Status = item.Status
continue
}
m[item.Uuid] = &model.EndpointResp{
rep := &model.EndpointResp{
EpID: item.Uuid,
Address: item.Ip,
Status: item.Status,
IsOnline: true,
IsStatic: false,
}
m[rep.Address] = rep
}
}
var res []*model.EndpointResp
for _, item := range m {
res = append(res, item)
}
return res, nil
}

View File

@ -83,7 +83,7 @@ func NewDefault() Configuration {
cfg := Configuration{
Backend: defaults.Backend{
ProxyBodySize: bodySize,
ProxyConnectTimeout: 75,
ProxyConnectTimeout: 60,
ProxyReadTimeout: 60,
ProxySendTimeout: 60,
ProxyBuffersNumber: 4,

View File

@ -402,15 +402,15 @@ func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) {
pool.UpstreamHashBy = backend.hashBy
l7Pools[backend.name] = pool
}
var notReadyAddress *corev1.EndpointAddress
var notReadyPort *corev1.EndpointPort
for _, ss := range ep.Subsets {
for _, port := range ss.Ports {
var addresses []corev1.EndpointAddress
if ss.Addresses != nil && len(ss.Addresses) > 0 {
addresses = append(addresses, ss.Addresses...)
} else if len(ss.NotReadyAddresses) > 0 {
addresses = append(addresses, ss.NotReadyAddresses[0])
for i, port := range ss.Ports {
if (ss.Addresses == nil || len(ss.Addresses) == 0) && len(ss.NotReadyAddresses) > 0 {
notReadyAddress = &ss.NotReadyAddresses[0]
notReadyPort = &ss.Ports[i]
}
for _, address := range addresses {
for _, address := range ss.Addresses {
if _, ok := l7PoolMap[epn]; ok { // l7
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: address.IP,
@ -421,6 +421,14 @@ func (s *k8sStore) ListPool() ([]*v1.Pool, []*v1.Pool) {
}
}
}
// If you have an address, make sure you have at least one node, regardless of its health
if len(pool.Nodes) == 0 && notReadyAddress != nil && notReadyPort != nil {
pool.Nodes = append(pool.Nodes, &v1.Node{
Host: notReadyAddress.IP,
Port: notReadyPort.Port,
Weight: backend.weight,
})
}
}
// l4
backends = l4PoolBackendMap[ep.ObjectMeta.Name]

View File

@ -49,8 +49,11 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
logrus.Debugf("select endpoints %d for service %s", len(selectEndpoint), service.Name)
var lendpoints []endpoint.LocalityLbEndpoints // localityLbEndpoints just support only one content
for _, en := range selectEndpoint {
var notReadyAddress *corev1.EndpointAddress
var notReadyPort *corev1.EndpointPort
var notreadyToPort int
for _, subset := range en.Subsets {
for _, port := range subset.Ports {
for i, port := range subset.Ports {
toport := int(port.Port)
if serviceAlias == destServiceAlias {
//use real port
@ -62,35 +65,47 @@ func OneNodeClusterLoadAssignment(serviceAlias, namespace string, endpoints []*c
}
}
protocol := string(port.Protocol)
addressList := subset.Addresses
var notready bool
if len(addressList) == 0 && len(subset.NotReadyAddresses) > 0 {
notready = true
addressList = subset.NotReadyAddresses[:1]
if len(subset.Addresses) == 0 && len(subset.NotReadyAddresses) > 0 {
notReadyAddress = &subset.NotReadyAddresses[0]
notreadyToPort = toport
notReadyPort = &subset.Ports[i]
}
getHealty := func() *endpoint.Endpoint_HealthCheckConfig {
if notready {
return nil
}
return &endpoint.Endpoint_HealthCheckConfig{
PortValue: uint32(toport),
}
}
var lbe []endpoint.LbEndpoint // just support one content
for _, address := range addressList {
envoyAddress := envoyv2.CreateSocketAddress(protocol, address.IP, uint32(toport))
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &envoyAddress,
HealthCheckConfig: getHealty(),
if len(subset.Addresses) > 0 {
var lbe []endpoint.LbEndpoint
for _, address := range subset.Addresses {
envoyAddress := envoyv2.CreateSocketAddress(protocol, address.IP, uint32(toport))
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &envoyAddress,
HealthCheckConfig: getHealty(),
},
},
},
})
})
}
if len(lbe) > 0 {
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
}
}
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
}
}
if len(lendpoints) == 0 && notReadyAddress != nil && notReadyPort != nil {
var lbe []endpoint.LbEndpoint
envoyAddress := envoyv2.CreateSocketAddress(string(notReadyPort.Protocol), notReadyAddress.IP, uint32(notreadyToPort))
lbe = append(lbe, endpoint.LbEndpoint{
HostIdentifier: &endpoint.LbEndpoint_Endpoint{
Endpoint: &endpoint.Endpoint{
Address: &envoyAddress,
},
},
})
lendpoints = append(lendpoints, endpoint.LocalityLbEndpoints{LbEndpoints: lbe})
}
}
cla := &v2.ClusterLoadAssignment{
ClusterName: clusterName,