mirror of
https://gitee.com/rainbond/Rainbond.git
synced 2024-12-02 11:47:36 +08:00
148 lines
4.1 KiB
Go
148 lines
4.1 KiB
Go
// Copyright 2016 The etcd Authors
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package client
|
|
|
|
import (
|
|
"encoding/json"
|
|
"fmt"
|
|
"time"
|
|
|
|
etcd "github.com/coreos/etcd/clientv3"
|
|
|
|
"google.golang.org/grpc/codes"
|
|
"google.golang.org/grpc/naming"
|
|
"google.golang.org/grpc/status"
|
|
|
|
"golang.org/x/net/context"
|
|
)
|
|
|
|
var ErrWatcherClosed = fmt.Errorf("naming: watch closed")
|
|
|
|
// GRPCResolver creates a grpc.Watcher for a target to track its resolution changes.
|
|
type GRPCResolver struct {
|
|
// Client is an initialized etcd client.
|
|
Client *etcd.Client
|
|
}
|
|
|
|
//Update Update
|
|
func (gr *GRPCResolver) Update(ctx context.Context, target string, nm naming.Update, opts ...etcd.OpOption) (err error) {
|
|
switch nm.Op {
|
|
case naming.Add:
|
|
var v []byte
|
|
if v, err = json.Marshal(nm); err != nil {
|
|
return status.Error(codes.InvalidArgument, err.Error())
|
|
}
|
|
_, err = gr.Client.KV.Put(ctx, target+"/"+nm.Addr, string(v), opts...)
|
|
case naming.Delete:
|
|
if gr.Client != nil {
|
|
_, err = gr.Client.Delete(ctx, target+"/"+nm.Addr, opts...)
|
|
}
|
|
default:
|
|
return status.Error(codes.InvalidArgument, "naming: bad naming op")
|
|
}
|
|
return err
|
|
}
|
|
|
|
//Resolve Resolve
|
|
func (gr *GRPCResolver) Resolve(target string) (naming.Watcher, error) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
w := &gRPCWatcher{c: gr.Client, target: target + "/", ctx: ctx, cancel: cancel}
|
|
return w, nil
|
|
}
|
|
|
|
type gRPCWatcher struct {
|
|
c *etcd.Client
|
|
target string
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
watchCancel context.CancelFunc
|
|
wch etcd.WatchChan
|
|
err error
|
|
}
|
|
|
|
// Next gets the next set of updates from the etcd resolver.
|
|
// Calls to Next should be serialized; concurrent calls are not safe since
|
|
// there is no way to reconcile the update ordering.
|
|
func (gw *gRPCWatcher) Next() ([]*naming.Update, error) {
|
|
|
|
if gw.wch == nil {
|
|
// first Next() returns all addresses
|
|
ctx, cancel := context.WithCancel(gw.ctx)
|
|
gw.watchCancel = cancel
|
|
all, err := gw.firstNext(ctx)
|
|
return all, err
|
|
}
|
|
if gw.err != nil {
|
|
return nil, gw.err
|
|
}
|
|
|
|
// process new events on target/*
|
|
timer := time.NewTimer(time.Second * 20)
|
|
defer timer.Stop()
|
|
select {
|
|
case wr, ok := <-gw.wch:
|
|
if !ok {
|
|
gw.err = status.Error(codes.Unavailable, ErrWatcherClosed.Error())
|
|
return nil, gw.err
|
|
}
|
|
if gw.err = wr.Err(); gw.err != nil {
|
|
return nil, gw.err
|
|
}
|
|
updates := make([]*naming.Update, 0, len(wr.Events))
|
|
for _, e := range wr.Events {
|
|
var jupdate naming.Update
|
|
var err error
|
|
switch e.Type {
|
|
case etcd.EventTypePut:
|
|
err = json.Unmarshal(e.Kv.Value, &jupdate)
|
|
jupdate.Op = naming.Add
|
|
case etcd.EventTypeDelete:
|
|
err = json.Unmarshal(e.PrevKv.Value, &jupdate)
|
|
jupdate.Op = naming.Delete
|
|
}
|
|
if err == nil {
|
|
updates = append(updates, &jupdate)
|
|
}
|
|
}
|
|
return updates, nil
|
|
case <-timer.C:
|
|
gw.watchCancel()
|
|
gw.wch = nil
|
|
return gw.Next()
|
|
}
|
|
}
|
|
|
|
func (gw *gRPCWatcher) firstNext(ctx context.Context) ([]*naming.Update, error) {
|
|
// Use serialized request so resolution still works if the target etcd
|
|
// server is partitioned away from the quorum.
|
|
resp, err := gw.c.Get(ctx, gw.target, etcd.WithPrefix(), etcd.WithSerializable())
|
|
if gw.err = err; err != nil {
|
|
return nil, err
|
|
}
|
|
updates := make([]*naming.Update, 0, len(resp.Kvs))
|
|
for _, kv := range resp.Kvs {
|
|
var jupdate naming.Update
|
|
if err := json.Unmarshal(kv.Value, &jupdate); err != nil {
|
|
continue
|
|
}
|
|
updates = append(updates, &jupdate)
|
|
}
|
|
opts := []etcd.OpOption{etcd.WithRev(resp.Header.Revision + 1), etcd.WithPrefix(), etcd.WithPrevKV()}
|
|
gw.wch = gw.c.Watch(ctx, gw.target, opts...)
|
|
return updates, nil
|
|
}
|
|
|
|
func (gw *gRPCWatcher) Close() { gw.cancel() }
|