fix: polaris discouvery relation (#2603)

This commit is contained in:
houseme 2023-04-26 19:31:28 +08:00 committed by GitHub
parent ed63617aa0
commit 0126eb5470
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 49 additions and 84 deletions

View File

@ -4,7 +4,7 @@ go 1.15
require (
github.com/gogf/gf/v2 v2.4.0
github.com/polarismesh/polaris-go v1.3.0
github.com/polarismesh/polaris-go v1.4.1
)
replace github.com/gogf/gf/v2 => ../../../

View File

@ -35,10 +35,10 @@ import (
)
func main() {
conf := config.NewDefaultConfiguration([]string{"192.168.100.222:8091"})
conf := config.NewDefaultConfiguration([]string{"183.47.111.80:8091"})
// TTL egt 2*time.Second
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(100)))
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(10)))
s := g.Server(`hello.svc`)
s.BindHandler("/", func(r *ghttp.Request) {
@ -68,9 +68,9 @@ import (
)
func main() {
conf := config.NewDefaultConfiguration([]string{"192.168.100.222:8091"})
conf := config.NewDefaultConfiguration([]string{"183.47.111.80:8091"})
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(100)))
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(10)))
for i := 0; i < 100; i++ {
res, err := g.Client().Get(gctx.New(), `http://hello.svc/`)

View File

@ -41,10 +41,10 @@ import (
)
func main() {
conf := config.NewDefaultConfiguration([]string{"192.168.100.222:8091"})
conf := config.NewDefaultConfiguration([]string{"183.47.111.80:8091"})
// TTL egt 2*time.Second
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(100)))
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(10)))
s := g.Server(`hello.svc`)
s.BindHandler("/", func(r *ghttp.Request) {
@ -75,9 +75,9 @@ import (
)
func main() {
conf := config.NewDefaultConfiguration([]string{"192.168.100.222:8091"})
conf := config.NewDefaultConfiguration([]string{"183.47.111.80:8091"})
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(100)))
gsvc.SetRegistry(polaris.NewRegistryWithConfig(conf, polaris.WithTTL(10)))
for i := 0; i < 100; i++ {
res, err := g.Client().Get(gctx.New(), `http://hello.svc/`)

View File

@ -47,12 +47,6 @@ type options struct {
// To show service is healthy or not. Default value is True.
Healthy bool
// Deprecated: Use RegisterInstance instead.
// Service registration is performed synchronously,
// and heartbeat reporting is automatically performed
// Heartbeat enable .Not in polaris. Default value is True.
Heartbeat bool
// To show service is isolate or not. Default value is False.
Isolate bool
@ -78,7 +72,6 @@ type Registry struct {
opt options
provider polaris.ProviderAPI
consumer polaris.ConsumerAPI
c chan struct{}
}
// WithNamespace with the Namespace option.
@ -126,12 +119,6 @@ func WithRetryCount(retryCount int) Option {
return func(o *options) { o.RetryCount = retryCount }
}
// WithHeartbeat with the Heartbeat option.
// Deprecated remove in v2.4.0
func WithHeartbeat(heartbeat bool) Option {
return func(o *options) { o.Heartbeat = heartbeat }
}
// WithLogger with the Logger option.
func WithLogger(logger glog.ILogger) Option {
return func(o *options) { o.Logger = logger }
@ -146,7 +133,6 @@ func New(provider polaris.ProviderAPI, consumer polaris.ConsumerAPI, opts ...Opt
Weight: 0,
Priority: 0,
Healthy: true,
Heartbeat: true,
Isolate: false,
TTL: 0,
Timeout: 0,
@ -160,7 +146,6 @@ func New(provider polaris.ProviderAPI, consumer polaris.ConsumerAPI, opts ...Opt
opt: op,
provider: provider,
consumer: consumer,
c: make(chan struct{}),
}
}

View File

@ -7,6 +7,7 @@
package polaris
import (
"bytes"
"context"
"fmt"
"strings"
@ -28,6 +29,7 @@ func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]gsvc.Serv
}
in.Prefix = service.GetPrefix()
}
in.Prefix = trimAndReplace(in.Prefix)
// get all instances
instancesResponse, err := r.consumer.GetAllInstances(&polaris.GetAllInstancesRequest{
GetAllInstancesRequest: model.GetAllInstancesRequest{
@ -44,7 +46,7 @@ func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]gsvc.Serv
// Service filter.
filteredServices := make([]gsvc.Service, 0)
for _, service := range serviceInstances {
if in.Prefix != "" && !gstr.HasPrefix(service.GetKey(), in.Prefix) {
if in.Prefix != "" && !gstr.HasPrefix(trimAndReplace(service.GetKey()), in.Prefix) {
continue
}
if in.Name != "" && service.GetName() != in.Name {
@ -67,8 +69,8 @@ func (r *Registry) Search(ctx context.Context, in gsvc.SearchInput) ([]gsvc.Serv
}
// Watch creates a watcher according to the service name.
func (r *Registry) Watch(ctx context.Context, serviceName string) (gsvc.Watcher, error) {
return newWatcher(ctx, r.opt.Namespace, serviceName, r.consumer)
func (r *Registry) Watch(ctx context.Context, key string) (gsvc.Watcher, error) {
return newWatcher(ctx, r.opt.Namespace, trimAndReplace(key), r.consumer)
}
func instancesToServiceInstances(instances []model.Instance) []gsvc.Service {
@ -89,11 +91,18 @@ func instanceToServiceInstance(instance model.Instance) gsvc.Service {
endpoints = gsvc.NewEndpoints(fmt.Sprintf("%s:%d", instance.GetHost(), instance.GetPort()))
)
if names != nil && len(names) > 4 {
var name bytes.Buffer
for i := 3; i < len(names)-1; i++ {
name.WriteString(names[i])
if i < len(names)-2 {
name.WriteString(instanceIDSeparator)
}
}
s = &gsvc.LocalService{
Head: names[0],
Deployment: names[1],
Namespace: names[2],
Name: names[3],
Name: name.String(),
Version: metadata[metadataKeyVersion],
Metadata: gconv.Map(metadata),
Endpoints: endpoints,
@ -111,3 +120,10 @@ func instanceToServiceInstance(instance model.Instance) gsvc.Service {
Service: s,
}
}
// trimAndReplace trims the prefix and suffix separator and replaces the separator in the middle.
func trimAndReplace(key string) string {
key = gstr.Trim(key, gsvc.DefaultSeparator)
key = gstr.Replace(key, gsvc.DefaultSeparator, instanceIDSeparator)
return key
}

View File

@ -8,7 +8,6 @@ package polaris
import (
"context"
"time"
"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"
@ -25,17 +24,18 @@ func (r *Registry) Register(ctx context.Context, service gsvc.Service) (gsvc.Ser
Service: service,
}
// Register logic.
var (
ids = make([]string, 0, len(service.GetEndpoints()))
serviceVersion = service.GetVersion()
)
var ids = make([]string, 0, len(service.GetEndpoints()))
for _, endpoint := range service.GetEndpoints() {
// medata
var rmd map[string]interface{}
var (
rmd map[string]interface{}
serviceName = service.GetPrefix()
serviceVersion = service.GetVersion()
)
if service.GetMetadata().IsEmpty() {
rmd = map[string]interface{}{
metadataKeyKind: gsvc.DefaultProtocol,
metadataKeyVersion: service.GetVersion(),
metadataKeyVersion: serviceVersion,
}
} else {
rmd = make(map[string]interface{}, len(service.GetMetadata())+2)
@ -53,7 +53,7 @@ func (r *Registry) Register(ctx context.Context, service gsvc.Service) (gsvc.Ser
registeredService, err := r.provider.RegisterInstance(
&polaris.InstanceRegisterRequest{
InstanceRegisterRequest: model.InstanceRegisterRequest{
Service: service.GetPrefix(),
Service: serviceName,
ServiceToken: r.opt.ServiceToken,
Namespace: r.opt.Namespace,
Host: endpoint.Host(),
@ -73,9 +73,6 @@ func (r *Registry) Register(ctx context.Context, service gsvc.Service) (gsvc.Ser
if err != nil {
return nil, err
}
if r.opt.Heartbeat {
// r.doHeartBeat(ctx, registeredService.InstanceID, service, endpoint)
}
ids = append(ids, registeredService.InstanceID)
}
// need to set InstanceID for Deregister
@ -85,7 +82,6 @@ func (r *Registry) Register(ctx context.Context, service gsvc.Service) (gsvc.Ser
// Deregister the registration.
func (r *Registry) Deregister(ctx context.Context, service gsvc.Service) error {
// r.c <- struct{}{}
var (
err error
split = gstr.Split(service.(*Service).ID, instanceIDSeparator)
@ -112,37 +108,3 @@ func (r *Registry) Deregister(ctx context.Context, service gsvc.Service) error {
}
return nil
}
// Deprecated .
func (r *Registry) doHeartBeat(ctx context.Context, instanceID string, service gsvc.Service, endpoint gsvc.Endpoint) {
go func() {
ticker := time.NewTicker(time.Second * time.Duration(r.opt.TTL))
defer ticker.Stop()
for {
select {
case <-ticker.C:
err := r.provider.Heartbeat(&polaris.InstanceHeartbeatRequest{
InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{
Service: service.GetPrefix(),
Namespace: r.opt.Namespace,
Host: endpoint.Host(),
Port: endpoint.Port(),
ServiceToken: r.opt.ServiceToken,
InstanceID: instanceID,
Timeout: &r.opt.Timeout,
RetryCount: &r.opt.RetryCount,
},
})
if err != nil {
r.opt.Logger.Error(ctx, err.Error())
continue
}
r.opt.Logger.Debug(ctx, "heartbeat success")
case <-r.c:
r.opt.Logger.Debug(ctx, "stop heartbeat")
return
}
}
}()
}

View File

@ -9,9 +9,10 @@ package polaris
import (
"context"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/polarismesh/polaris-go"
"github.com/polarismesh/polaris-go/pkg/model"
"github.com/gogf/gf/v2/net/gsvc"
)
// Watcher is a service watcher.
@ -24,12 +25,12 @@ type Watcher struct {
ServiceInstances []gsvc.Service
}
func newWatcher(ctx context.Context, namespace string, serviceName string, consumer polaris.ConsumerAPI) (*Watcher, error) {
func newWatcher(ctx context.Context, namespace string, key string, consumer polaris.ConsumerAPI) (*Watcher, error) {
watchServiceResponse, err := consumer.WatchService(&polaris.WatchServiceRequest{
WatchServiceRequest: model.WatchServiceRequest{
Key: model.ServiceKey{
Namespace: namespace,
Service: serviceName,
Service: key,
},
},
})
@ -39,7 +40,7 @@ func newWatcher(ctx context.Context, namespace string, serviceName string, consu
w := &Watcher{
Namespace: namespace,
ServiceName: serviceName,
ServiceName: key,
Channel: watchServiceResponse.EventChannel,
ServiceInstances: instancesToServiceInstances(watchServiceResponse.GetAllInstancesResp.GetInstances()),
}

View File

@ -1,11 +1,11 @@
global:
serverConnector:
addresses:
- 127.0.0.1:8091
- 183.47.111.80:8091
config:
configConnector:
addresses:
- 127.0.0.1:8093
- 183.47.111.80:8093
consumer:
localCache:
persistDir: "/tmp/polaris/backup"

View File

@ -15,7 +15,7 @@ import (
)
func main() {
conf := config.NewDefaultConfiguration([]string{"192.168.100.222:8091"})
conf := config.NewDefaultConfiguration([]string{"183.47.111.80:8091"})
conf.Consumer.LocalCache.SetPersistDir("/tmp/polaris/backup")
if err := api.SetLoggersDir("/tmp/polaris/log"); err != nil {
g.Log().Fatal(context.Background(), err)

View File

@ -3,16 +3,17 @@ package main
import (
"context"
"github.com/polarismesh/polaris-go/api"
"github.com/polarismesh/polaris-go/pkg/config"
"github.com/gogf/gf/contrib/registry/polaris/v2"
"github.com/gogf/gf/v2/frame/g"
"github.com/gogf/gf/v2/net/ghttp"
"github.com/gogf/gf/v2/net/gsvc"
"github.com/polarismesh/polaris-go/api"
"github.com/polarismesh/polaris-go/pkg/config"
)
func main() {
conf := config.NewDefaultConfiguration([]string{"192.168.100.222:8091"})
conf := config.NewDefaultConfiguration([]string{"183.47.111.80:8091"})
conf.Consumer.LocalCache.SetPersistDir("/tmp/polaris/backup")
if err := api.SetLoggersDir("/tmp/polaris/log"); err != nil {
g.Log().Fatal(context.Background(), err)