[FIX] fix the bug that inconsistent name of envoy listener

This commit is contained in:
barnettZQG 2018-07-10 11:32:23 +08:00
parent 2c3e1e8805
commit ff1ff248c3
4 changed files with 20 additions and 15 deletions

View File

@ -79,6 +79,7 @@ func Run(c *option.Conf) error {
sharedInformers.Core().V1().Services().Informer() sharedInformers.Core().V1().Services().Informer()
sharedInformers.Core().V1().Endpoints().Informer() sharedInformers.Core().V1().Endpoints().Informer()
sharedInformers.Start(stop) sharedInformers.Start(stop)
defer close(stop)
s, err := nodeserver.NewNodeServer(c) //todo 配置文件 done s, err := nodeserver.NewNodeServer(c) //todo 配置文件 done
if err != nil { if err != nil {

View File

@ -823,7 +823,7 @@ func CreateHTTPCommonListener(name string, vh ...*VirtualHost) *Listener {
//CreateTCPCommonListener create tcp simple common listener //CreateTCPCommonListener create tcp simple common listener
//listen the specified port //listen the specified port
//associate the specified cluster. //associate the specified cluster.
func CreateTCPCommonListener(clusterName string, address string) *Listener { func CreateTCPCommonListener(listenerName, clusterName string, address string) *Listener {
ptr := &TCPRoute{ ptr := &TCPRoute{
Cluster: clusterName, Cluster: clusterName,
} }
@ -831,7 +831,7 @@ func CreateTCPCommonListener(clusterName string, address string) *Listener {
Routes: []*TCPRoute{ptr}, Routes: []*TCPRoute{ptr},
} }
lcg := &TCPProxyFilterConfig{ lcg := &TCPProxyFilterConfig{
StatPrefix: clusterName, StatPrefix: listenerName,
RouteConfig: lrs, RouteConfig: lrs,
} }
lfs := &NetworkFilter{ lfs := &NetworkFilter{
@ -839,7 +839,7 @@ func CreateTCPCommonListener(clusterName string, address string) *Listener {
Config: lcg, Config: lcg,
} }
plds := &Listener{ plds := &Listener{
Name: clusterName, Name: listenerName,
Address: address, Address: address,
Filters: []*NetworkFilter{lfs}, Filters: []*NetworkFilter{lfs},
BindToPort: true, BindToPort: true,

View File

@ -43,18 +43,28 @@ func TestGetPodsByNodeName(t *testing.T) {
func TestSharedInformerFactory(t *testing.T) { func TestSharedInformerFactory(t *testing.T) {
sharedInformers := informers.NewSharedInformerFactory(K8S, time.Hour*10) sharedInformers := informers.NewSharedInformerFactory(K8S, time.Hour*10)
sharedInformers.Core().V1().Nodes().Informer() sharedInformers.Core().V1().Nodes().Informer()
sharedInformers.Core().V1().Services().Informer()
stop := make(chan struct{}) stop := make(chan struct{})
sharedInformers.Start(stop) sharedInformers.Start(stop)
time.Sleep(time.Second * 30)
selector, err := labels.Parse("") selector, err := labels.Parse("")
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
for i := 0; i < 10; i++ { for i := 0; i < 2; i++ {
nodes, err := sharedInformers.Core().V1().Nodes().Lister().List(selector) nodes, err := sharedInformers.Core().V1().Nodes().Lister().List(selector)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
t.Log(nodes) t.Log(nodes)
}
for i := 0; i < 2; i++ {
selector, _ := labels.Parse("name=gr87b487Service")
nodes, err := sharedInformers.Core().V1().Services().Lister().Services("824b2e9dcc4d461a852ddea20369d377").List(selector)
if err != nil {
t.Fatal(err)
}
t.Log(nodes)
time.Sleep(time.Second * 5) time.Sleep(time.Second * 5)
} }

View File

@ -322,25 +322,19 @@ func (d *DiscoverAction) upstreamListener(serviceAlias, namespace string, depend
continue continue
} }
port := service.Spec.Ports[0].Port port := service.Spec.Ports[0].Port
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port)
// Unique by listen port // Unique by listen port
if index, ok := portMap[port]; !ok { if _, ok := portMap[port]; !ok {
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port) listenerName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
plds := envoyv1.CreateTCPCommonListener(clusterName, fmt.Sprintf("tcp://127.0.0.1:%d", port)) plds := envoyv1.CreateTCPCommonListener(listenerName, clusterName, fmt.Sprintf("tcp://127.0.0.1:%d", port))
ldsL = append(ldsL, plds) ldsL = append(ldsL, plds)
portMap[port] = len(ldsL) - 1 portMap[port] = len(ldsL) - 1
} else if index != -1 {
clusterName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
plds := envoyv1.CreateTCPCommonListener(clusterName, fmt.Sprintf("tcp://127.0.0.1:%d", port))
ldsL[index] = plds
//only create one cluster for same port
portMap[port] = -1
} }
portProtocol, ok := service.Labels["port_protocol"] portProtocol, ok := service.Labels["port_protocol"]
if !ok { if !ok {
portProtocol = destService.Protocol portProtocol = destService.Protocol
} }
if portProtocol != "" { if portProtocol != "" {
clusterName := fmt.Sprintf("%s_%s_%s_%d", namespace, serviceAlias, destServiceAlias, port)
//TODO: support more protocol //TODO: support more protocol
switch portProtocol { switch portProtocol {
case "http", "https": case "http", "https":
@ -393,7 +387,7 @@ func (d *DiscoverAction) downstreamListener(serviceAlias, namespace string, port
port := int32(p.Port) port := int32(p.Port)
clusterName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port) clusterName := fmt.Sprintf("%s_%s_%d", namespace, serviceAlias, port)
if _, ok := portMap[port]; !ok { if _, ok := portMap[port]; !ok {
plds := envoyv1.CreateTCPCommonListener(clusterName, fmt.Sprintf("tcp://0.0.0.0:%d", p.ListenPort)) plds := envoyv1.CreateTCPCommonListener(clusterName, clusterName, fmt.Sprintf("tcp://0.0.0.0:%d", p.ListenPort))
ldsL = append(ldsL, plds) ldsL = append(ldsL, plds)
portMap[port] = 1 portMap[port] = 1
} }