[REV] worker support HA

master node auto elect and switch (#61)
This commit is contained in:
barnett 2018-04-11 16:49:37 +08:00
parent d32f6dae07
commit 9655f9979b
9 changed files with 415 additions and 100 deletions

View File

@ -61,15 +61,12 @@ func Run(s *option.Worker) error {
}
defer event.CloseManager()
//step 2 : create status watching
if s.RunMode == "sync" {
ars := appruntimesync.CreateAppRuntimeSync(s.Config)
if err := ars.Start(); err != nil {
return err
}
defer ars.Stop()
go ars.SyncStatus()
}
//step 2 : create and start app runtime module
errchan := make(chan error, 2)
ars := appruntimesync.CreateAppRuntimeSync(s.Config)
ars.Start(errchan)
defer ars.Stop()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statusClient, err := client.NewClient(ctx, client.AppRuntimeSyncClientConf{
@ -119,6 +116,8 @@ func Run(s *option.Worker) error {
select {
case <-term:
logrus.Warn("Received SIGTERM, exiting gracefully...")
case <-errchan:
logrus.Warnf("Received Master worker election error:%s", err.Error())
case err := <-errChan:
logrus.Errorf("Received a error %s, exiting gracefully...", err.Error())
}

View File

@ -1105,53 +1105,6 @@
}
}
},
"/v2/opentsdb/query": {
"post": {
"description": "query opentsdb",
"produces": [
"application/json",
"application/xml"
],
"tags": [
"v2"
],
"summary": "监控数据查询",
"operationId": "oentsdbquery",
"parameters": [
{
"name": "Body",
"in": "body",
"schema": {
"type": "object",
"required": [
"start",
"queries"
],
"properties": {
"queries": {
"description": "in: body",
"type": "string",
"x-go-name": "Queries"
},
"start": {
"description": "in: body",
"type": "string",
"x-go-name": "Start"
}
}
}
}
],
"responses": {
"default": {
"description": "统一返回格式",
"schema": {
"$ref": "#/responses/commandResponse"
}
}
}
}
},
"/v2/resources/services": {
"post": {
"description": "get service resources",
@ -1437,7 +1390,7 @@
}
},
"/v2/tenants/{tenant_name}/chargesverify": {
"post": {
"get": {
"description": "service charges verify",
"consumes": [
"application/json",
@ -1454,7 +1407,7 @@
"operationId": "chargesverify",
"responses": {
"default": {
"description": "统一返回格式",
"description": "状态码非200表示验证过程发生错误。状态码200msg代表实际状态success, illegal_quantity, missing_tenant, owned_fee, region_unauthorized, lack_of_memory",
"schema": {
"$ref": "#/responses/commandResponse"
}
@ -3319,6 +3272,31 @@
}
},
"/v2/tenants/{tenant_name}/services/{service_alias}/env": {
"put": {
"description": "update env var",
"consumes": [
"application/json",
"application/x-protobuf"
],
"produces": [
"application/json",
"application/xml"
],
"tags": [
"v2",
"update"
],
"summary": "修改环境变量",
"operationId": "Env",
"responses": {
"default": {
"description": "统一返回格式",
"schema": {
"$ref": "#/responses/commandResponse"
}
}
}
},
"post": {
"description": "add env var",
"consumes": [

View File

@ -23,12 +23,15 @@ import (
"fmt"
"net"
"github.com/goodrain/rainbond/pkg/util"
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/pkg/appruntimesync/pb"
"github.com/goodrain/rainbond/pkg/appruntimesync/server"
discover "github.com/goodrain/rainbond/pkg/discover.v2"
"github.com/goodrain/rainbond/pkg/util/etcd/etcdlock"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
@ -43,19 +46,69 @@ type AppRuntimeSync struct {
cancel context.CancelFunc
srss *server.AppRuntimeSyncServer
keepalive *discover.KeepAlive
master etcdlock.MasterInterface
hostIP string
}
//Start start if have master right
//start grpc server
func (a *AppRuntimeSync) Start() error {
a.srss.Start()
go a.startAppRuntimeSync()
return a.registServer()
func (a *AppRuntimeSync) Start(errchan chan error) {
if a.hostIP == "" {
ip, err := util.LocalIP()
if err != nil {
logrus.Errorf("get ip failed,details %s", err.Error())
errchan <- err
}
a.hostIP = ip.String()
}
go a.start(errchan)
}
func (a *AppRuntimeSync) start(errchan chan error) {
for {
master, err := etcdlock.CreateMasterLock(a.conf.EtcdEndPoints, "/rainbond/workermaster", fmt.Sprintf("%s:%d", a.conf.HostIP, 6535), 10)
if err != nil {
errchan <- err
return
}
a.master = master
master.Start()
loop:
for {
select {
case event := <-master.EventsChan():
if event.Type == etcdlock.MasterAdded {
a.srss.Start()
go a.startAppRuntimeSync()
a.registServer()
}
if event.Type == etcdlock.MasterDeleted {
errchan <- fmt.Errorf("worker node %s exit ", fmt.Sprintf("%s:%d", a.conf.HostIP, 6535))
return
}
if event.Type == etcdlock.MasterError {
a.master.Stop()
if a.keepalive != nil {
a.keepalive.Stop()
a.keepalive = nil
}
break loop
}
}
}
select {
case <-a.ctx.Done():
return
default:
}
}
}
//Stop stop app runtime sync server
func (a *AppRuntimeSync) Stop() error {
a.srss.Stop()
if a.master != nil {
a.master.Stop()
}
if a.keepalive != nil {
a.keepalive.Stop()
}
@ -81,6 +134,7 @@ func CreateAppRuntimeSync(conf option.Config) *AppRuntimeSync {
conf: conf,
server: grpc.NewServer(),
srss: server.NewAppRuntimeSyncServer(conf),
hostIP: conf.HostIP,
}
pb.RegisterAppRuntimeSyncServer(ars.server, ars.srss)
// Register reflection service on gRPC server.
@ -97,8 +151,3 @@ func (a *AppRuntimeSync) startAppRuntimeSync() error {
}
return a.server.Serve(lis)
}
//SyncStatus sync status
func (a *AppRuntimeSync) SyncStatus() {
a.srss.StatusManager.SyncStatus()
}

View File

@ -86,6 +86,7 @@ func (s *Manager) Start() error {
s.cacheAllAPPStatus()
go s.checkStatus()
go s.handleUpdate()
go s.SyncStatus()
logrus.Info("status manager started")
return nil
}

View File

@ -20,7 +20,6 @@ package discover
import (
"context"
"fmt"
"os"
"time"
@ -39,23 +38,21 @@ const WTOPIC string = "builder"
//TaskManager task
type TaskManager struct {
ctx context.Context
cancel context.CancelFunc
config option.Config
stopChan chan struct{}
client pb.TaskQueueClient
exec exector.Manager
ctx context.Context
cancel context.CancelFunc
config option.Config
client pb.TaskQueueClient
exec exector.Manager
}
//NewTaskManager return *TaskManager
func NewTaskManager(c option.Config, exec exector.Manager) *TaskManager {
ctx, cancel := context.WithCancel(context.Background())
return &TaskManager{
ctx: ctx,
cancel: cancel,
config: c,
stopChan: make(chan struct{}),
exec: exec,
ctx: ctx,
cancel: cancel,
config: c,
exec: exec,
}
}
@ -115,13 +112,4 @@ func (t *TaskManager) Do() {
func (t *TaskManager) Stop() error {
logrus.Info("discover manager is stoping.")
t.cancel()
tick := time.NewTicker(time.Second * 3)
select {
case <-t.stopChan:
return nil
case <-tick.C:
logrus.Error("task queue listen closed time out")
return fmt.Errorf("task queue listen closed time out")
}
}

View File

@ -0,0 +1,163 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
//Package etcdlock Master election using etcd.
package etcdlock
import (
"context"
"errors"
"fmt"
"github.com/Sirupsen/logrus"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
)
//MasterEventType Various event types for the events channel.
type MasterEventType int
const (
//MasterAdded this node has the lock.
MasterAdded MasterEventType = iota
//MasterDeleted MasterDeleted
MasterDeleted
//MasterModified MasterModified
MasterModified
//MasterError MasterError
MasterError
)
// MasterEvent represents a single event sent on the events channel.
type MasterEvent struct {
Type MasterEventType // event type
Master string // identity of the lock holder
}
//MasterInterface Interface used by the etcd master lock clients.
type MasterInterface interface {
// Start the election and attempt to acquire the lock. If acquired, the
// lock is refreshed periodically based on the ttl.
Start()
// Stops watching the lock. Closes the events channel.
Stop()
// Returns the event channel used by the etcd lock.
EventsChan() <-chan MasterEvent
// Method to get the current lockholder. Returns "" if free.
GetHolder() string
}
type masterLock struct {
ctx context.Context
cancel context.CancelFunc
client *clientv3.Client
electionname string
prop string
etcdEndpoints []string
election *concurrency.Election
session *concurrency.Session
eventchan chan MasterEvent
ttl int64
leaseID clientv3.LeaseID
}
//CreateMasterLock create master lock
func CreateMasterLock(etcdEndpoints []string, election string, prop string, ttl int64) (MasterInterface, error) {
if etcdEndpoints == nil || len(etcdEndpoints) == 0 {
etcdEndpoints = []string{"http://127.0.0.1:2379"}
}
ctx, cancel := context.WithCancel(context.Background())
client, err := clientv3.New(clientv3.Config{
Endpoints: etcdEndpoints,
Context: ctx,
})
if err != nil {
cancel()
return nil, fmt.Errorf("create etcd client error,%s", err.Error())
}
lease, err := client.Lease.Grant(ctx, ttl)
if err != nil {
cancel()
return nil, fmt.Errorf("create etcd lease error,%s", err.Error())
}
s, err := concurrency.NewSession(client, concurrency.WithContext(ctx), concurrency.WithLease(lease.ID))
if err != nil {
cancel()
return nil, fmt.Errorf("new election session error,%s", err.Error())
}
e := concurrency.NewElection(s, election)
ml := &masterLock{
ctx: ctx,
cancel: cancel,
client: client,
electionname: election,
prop: prop,
etcdEndpoints: etcdEndpoints,
election: e,
session: s,
eventchan: make(chan MasterEvent, 2),
leaseID: lease.ID,
}
return ml, nil
}
// Campaign puts a value as eligible for the election. It blocks until
// it is elected, an error occurs, or the context is cancelled.
func (m *masterLock) campaign() error {
logrus.Infof("start campaign master")
if err := m.election.Campaign(m.ctx, m.prop); err != nil {
return err
}
//elected
logrus.Infof("current node is be elected master")
select {
case res := <-m.election.Observe(m.ctx):
m.eventchan <- MasterEvent{Type: MasterAdded, Master: string(res.Kvs[0].Value)}
case <-m.ctx.Done():
return m.resign()
case <-m.session.Done():
m.eventchan <- MasterEvent{Type: MasterError, Master: ""}
return errors.New("elect: session expired")
}
return nil
}
func (m *masterLock) resign() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
return m.election.Resign(ctx)
}
func (m *masterLock) Start() {
go m.campaign()
}
func (m *masterLock) Stop() {
m.cancel()
m.resign()
}
func (m *masterLock) EventsChan() <-chan MasterEvent {
return m.eventchan
}
func (m *masterLock) GetHolder() string {
return ""
}

View File

@ -0,0 +1,59 @@
// RAINBOND, Application Management Platform
// Copyright (C) 2014-2017 Goodrain Co., Ltd.
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package etcdlock
import (
"testing"
"time"
"github.com/Sirupsen/logrus"
)
func TestMasterLock(t *testing.T) {
master1, err := CreateMasterLock(nil, "/rainbond/appruntimesyncmaster", "127.0.0.1:1", 10)
if err != nil {
t.Fatal(err)
}
master1.Start()
defer master1.Stop()
master2, err := CreateMasterLock(nil, "/rainbond/appruntimesyncmaster", "127.0.0.1:2", 10)
if err != nil {
t.Fatal(err)
}
master2.Start()
defer master2.Stop()
logrus.Info("start receive event")
for {
select {
case event := <-master1.EventsChan():
logrus.Info(event, time.Now())
master1.Stop()
if event.Type == MasterDeleted {
logrus.Info("delete")
return
}
case event := <-master2.EventsChan():
logrus.Info(event, time.Now())
if event.Type == MasterDeleted {
logrus.Info("delete")
return
}
}
}
}

View File

@ -20,7 +20,6 @@ package discover
import (
"context"
"fmt"
"os"
"time"
@ -45,7 +44,6 @@ type TaskManager struct {
ctx context.Context
cancel context.CancelFunc
config option.Config
stopChan chan struct{}
handleManager *handle.Manager
client pb.TaskQueueClient
}
@ -58,7 +56,6 @@ func NewTaskManager(c option.Config, executor executor.Manager, statusManager *s
ctx: ctx,
cancel: cancel,
config: c,
stopChan: make(chan struct{}),
handleManager: handleManager,
}
}
@ -133,13 +130,5 @@ func (t *TaskManager) Do() {
func (t *TaskManager) Stop() error {
logrus.Info("discover manager is stoping.")
t.cancel()
tick := time.NewTicker(time.Second * 3)
select {
case <-t.stopChan:
return nil
case <-tick.C:
logrus.Error("task queue listen closed time out")
return fmt.Errorf("task queue listen closed time out")
}
return nil
}

View File

@ -0,0 +1,89 @@
// Copyright 2017 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package concurrency_test
import (
"context"
"sync"
"testing"
"time"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/clientv3/concurrency"
)
func TestCampaign(t *testing.T) {
cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}})
if err != nil {
t.Fatal(err)
}
defer cli.Close()
// create two separate sessions for election competition
s1, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}
defer s1.Close()
e1 := concurrency.NewElection(s1, "/my-election/")
s2, err := concurrency.NewSession(cli)
if err != nil {
t.Fatal(err)
}
defer s2.Close()
e2 := concurrency.NewElection(s2, "/my-election/")
// create competing candidates, with e1 initially losing to e2
var wg sync.WaitGroup
wg.Add(2)
electc := make(chan *concurrency.Election, 2)
go func() {
defer wg.Done()
// delay candidacy so e2 wins first
time.Sleep(3 * time.Second)
if err := e1.Campaign(context.Background(), "e1"); err != nil {
t.Fatal(err)
}
electc <- e1
}()
go func() {
defer wg.Done()
if err := e2.Campaign(context.Background(), "e2"); err != nil {
t.Fatal(err)
}
electc <- e2
}()
cctx, cancel := context.WithCancel(context.TODO())
defer cancel()
e := <-electc
t.Log("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
// resign so next candidate can be elected
if err := e.Resign(context.TODO()); err != nil {
t.Fatal(err)
}
e = <-electc
t.Log("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
wg.Wait()
// Output:
// completed first election with e2
// completed second election with e1
}