diff --git a/cmd/worker/server/server.go b/cmd/worker/server/server.go
index f415fa382..8a7362be5 100644
--- a/cmd/worker/server/server.go
+++ b/cmd/worker/server/server.go
@@ -61,15 +61,12 @@ func Run(s *option.Worker) error {
}
defer event.CloseManager()
- //step 2 : create status watching
- if s.RunMode == "sync" {
- ars := appruntimesync.CreateAppRuntimeSync(s.Config)
- if err := ars.Start(); err != nil {
- return err
- }
- defer ars.Stop()
- go ars.SyncStatus()
- }
+ //step 2 : create and start app runtime module
+ errchan := make(chan error, 2)
+ ars := appruntimesync.CreateAppRuntimeSync(s.Config)
+ ars.Start(errchan)
+ defer ars.Stop()
+
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
statusClient, err := client.NewClient(ctx, client.AppRuntimeSyncClientConf{
@@ -119,6 +116,8 @@ func Run(s *option.Worker) error {
select {
case <-term:
logrus.Warn("Received SIGTERM, exiting gracefully...")
+ case <-errchan:
+ logrus.Warnf("Received Master worker election error:%s", err.Error())
case err := <-errChan:
logrus.Errorf("Received a error %s, exiting gracefully...", err.Error())
}
diff --git a/hack/contrib/docker/api/html/swagger.json b/hack/contrib/docker/api/html/swagger.json
index b4e30b849..595923b9b 100644
--- a/hack/contrib/docker/api/html/swagger.json
+++ b/hack/contrib/docker/api/html/swagger.json
@@ -1105,53 +1105,6 @@
}
}
},
- "/v2/opentsdb/query": {
- "post": {
- "description": "query opentsdb",
- "produces": [
- "application/json",
- "application/xml"
- ],
- "tags": [
- "v2"
- ],
- "summary": "监控数据查询",
- "operationId": "oentsdbquery",
- "parameters": [
- {
- "name": "Body",
- "in": "body",
- "schema": {
- "type": "object",
- "required": [
- "start",
- "queries"
- ],
- "properties": {
- "queries": {
- "description": "in: body",
- "type": "string",
- "x-go-name": "Queries"
- },
- "start": {
- "description": "in: body",
- "type": "string",
- "x-go-name": "Start"
- }
- }
- }
- }
- ],
- "responses": {
- "default": {
- "description": "统一返回格式",
- "schema": {
- "$ref": "#/responses/commandResponse"
- }
- }
- }
- }
- },
"/v2/resources/services": {
"post": {
"description": "get service resources",
@@ -1437,7 +1390,7 @@
}
},
"/v2/tenants/{tenant_name}/chargesverify": {
- "post": {
+ "get": {
"description": "service charges verify",
"consumes": [
"application/json",
@@ -1454,7 +1407,7 @@
"operationId": "chargesverify",
"responses": {
"default": {
- "description": "统一返回格式",
+ "description": "状态码非200,表示验证过程发生错误。状态码200,msg代表实际状态:success, illegal_quantity, missing_tenant, owned_fee, region_unauthorized, lack_of_memory",
"schema": {
"$ref": "#/responses/commandResponse"
}
@@ -3319,6 +3272,31 @@
}
},
"/v2/tenants/{tenant_name}/services/{service_alias}/env": {
+ "put": {
+ "description": "update env var",
+ "consumes": [
+ "application/json",
+ "application/x-protobuf"
+ ],
+ "produces": [
+ "application/json",
+ "application/xml"
+ ],
+ "tags": [
+ "v2",
+ "update"
+ ],
+ "summary": "修改环境变量",
+ "operationId": "Env",
+ "responses": {
+ "default": {
+ "description": "统一返回格式",
+ "schema": {
+ "$ref": "#/responses/commandResponse"
+ }
+ }
+ }
+ },
"post": {
"description": "add env var",
"consumes": [
diff --git a/pkg/appruntimesync/appruntimesync.go b/pkg/appruntimesync/appruntimesync.go
index b19c264f2..d8f14d0f7 100644
--- a/pkg/appruntimesync/appruntimesync.go
+++ b/pkg/appruntimesync/appruntimesync.go
@@ -23,12 +23,15 @@ import (
"fmt"
"net"
+ "github.com/goodrain/rainbond/pkg/util"
+
"github.com/Sirupsen/logrus"
client "github.com/coreos/etcd/clientv3"
"github.com/goodrain/rainbond/cmd/worker/option"
"github.com/goodrain/rainbond/pkg/appruntimesync/pb"
"github.com/goodrain/rainbond/pkg/appruntimesync/server"
discover "github.com/goodrain/rainbond/pkg/discover.v2"
+ "github.com/goodrain/rainbond/pkg/util/etcd/etcdlock"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
@@ -43,19 +46,69 @@ type AppRuntimeSync struct {
cancel context.CancelFunc
srss *server.AppRuntimeSyncServer
keepalive *discover.KeepAlive
+ master etcdlock.MasterInterface
+ hostIP string
}
//Start start if have master right
//start grpc server
-func (a *AppRuntimeSync) Start() error {
- a.srss.Start()
- go a.startAppRuntimeSync()
- return a.registServer()
+func (a *AppRuntimeSync) Start(errchan chan error) {
+ if a.hostIP == "" {
+ ip, err := util.LocalIP()
+ if err != nil {
+ logrus.Errorf("get ip failed,details %s", err.Error())
+ errchan <- err
+ }
+ a.hostIP = ip.String()
+ }
+ go a.start(errchan)
+}
+func (a *AppRuntimeSync) start(errchan chan error) {
+ for {
+ master, err := etcdlock.CreateMasterLock(a.conf.EtcdEndPoints, "/rainbond/workermaster", fmt.Sprintf("%s:%d", a.conf.HostIP, 6535), 10)
+ if err != nil {
+ errchan <- err
+ return
+ }
+ a.master = master
+ master.Start()
+ loop:
+ for {
+ select {
+ case event := <-master.EventsChan():
+ if event.Type == etcdlock.MasterAdded {
+ a.srss.Start()
+ go a.startAppRuntimeSync()
+ a.registServer()
+ }
+ if event.Type == etcdlock.MasterDeleted {
+ errchan <- fmt.Errorf("worker node %s exit ", fmt.Sprintf("%s:%d", a.conf.HostIP, 6535))
+ return
+ }
+ if event.Type == etcdlock.MasterError {
+ a.master.Stop()
+ if a.keepalive != nil {
+ a.keepalive.Stop()
+ a.keepalive = nil
+ }
+ break loop
+ }
+ }
+ }
+ select {
+ case <-a.ctx.Done():
+ return
+ default:
+ }
+ }
}
//Stop stop app runtime sync server
func (a *AppRuntimeSync) Stop() error {
a.srss.Stop()
+ if a.master != nil {
+ a.master.Stop()
+ }
if a.keepalive != nil {
a.keepalive.Stop()
}
@@ -81,6 +134,7 @@ func CreateAppRuntimeSync(conf option.Config) *AppRuntimeSync {
conf: conf,
server: grpc.NewServer(),
srss: server.NewAppRuntimeSyncServer(conf),
+ hostIP: conf.HostIP,
}
pb.RegisterAppRuntimeSyncServer(ars.server, ars.srss)
// Register reflection service on gRPC server.
@@ -97,8 +151,3 @@ func (a *AppRuntimeSync) startAppRuntimeSync() error {
}
return a.server.Serve(lis)
}
-
-//SyncStatus sync status
-func (a *AppRuntimeSync) SyncStatus() {
- a.srss.StatusManager.SyncStatus()
-}
diff --git a/pkg/appruntimesync/status/status.go b/pkg/appruntimesync/status/status.go
index 7a01031cf..ce28c42df 100644
--- a/pkg/appruntimesync/status/status.go
+++ b/pkg/appruntimesync/status/status.go
@@ -86,6 +86,7 @@ func (s *Manager) Start() error {
s.cacheAllAPPStatus()
go s.checkStatus()
go s.handleUpdate()
+ go s.SyncStatus()
logrus.Info("status manager started")
return nil
}
diff --git a/pkg/builder/discover/discover.go b/pkg/builder/discover/discover.go
index 03c78a7c8..f1313fd08 100644
--- a/pkg/builder/discover/discover.go
+++ b/pkg/builder/discover/discover.go
@@ -20,7 +20,6 @@ package discover
import (
"context"
- "fmt"
"os"
"time"
@@ -39,23 +38,21 @@ const WTOPIC string = "builder"
//TaskManager task
type TaskManager struct {
- ctx context.Context
- cancel context.CancelFunc
- config option.Config
- stopChan chan struct{}
- client pb.TaskQueueClient
- exec exector.Manager
+ ctx context.Context
+ cancel context.CancelFunc
+ config option.Config
+ client pb.TaskQueueClient
+ exec exector.Manager
}
//NewTaskManager return *TaskManager
func NewTaskManager(c option.Config, exec exector.Manager) *TaskManager {
ctx, cancel := context.WithCancel(context.Background())
return &TaskManager{
- ctx: ctx,
- cancel: cancel,
- config: c,
- stopChan: make(chan struct{}),
- exec: exec,
+ ctx: ctx,
+ cancel: cancel,
+ config: c,
+ exec: exec,
}
}
@@ -115,13 +112,4 @@ func (t *TaskManager) Do() {
func (t *TaskManager) Stop() error {
logrus.Info("discover manager is stoping.")
t.cancel()
- tick := time.NewTicker(time.Second * 3)
- select {
- case <-t.stopChan:
- return nil
- case <-tick.C:
- logrus.Error("task queue listen closed time out")
- return fmt.Errorf("task queue listen closed time out")
- }
-
}
diff --git a/pkg/util/etcd/etcdlock/master.go b/pkg/util/etcd/etcdlock/master.go
new file mode 100644
index 000000000..3f0bc7261
--- /dev/null
+++ b/pkg/util/etcd/etcdlock/master.go
@@ -0,0 +1,163 @@
+// RAINBOND, Application Management Platform
+// Copyright (C) 2014-2017 Goodrain Co., Ltd.
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version. For any non-GPL usage of Rainbond,
+// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
+// must be obtained first.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+//Package etcdlock Master election using etcd.
+package etcdlock
+
+import (
+ "context"
+ "errors"
+ "fmt"
+
+ "github.com/Sirupsen/logrus"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/clientv3/concurrency"
+)
+
+//MasterEventType Various event types for the events channel.
+type MasterEventType int
+
+const (
+ //MasterAdded this node has the lock.
+ MasterAdded MasterEventType = iota
+ //MasterDeleted MasterDeleted
+ MasterDeleted
+ //MasterModified MasterModified
+ MasterModified
+ //MasterError MasterError
+ MasterError
+)
+
+// MasterEvent represents a single event sent on the events channel.
+type MasterEvent struct {
+ Type MasterEventType // event type
+ Master string // identity of the lock holder
+}
+
+//MasterInterface Interface used by the etcd master lock clients.
+type MasterInterface interface {
+ // Start the election and attempt to acquire the lock. If acquired, the
+ // lock is refreshed periodically based on the ttl.
+ Start()
+
+ // Stops watching the lock. Closes the events channel.
+ Stop()
+
+ // Returns the event channel used by the etcd lock.
+ EventsChan() <-chan MasterEvent
+
+ // Method to get the current lockholder. Returns "" if free.
+ GetHolder() string
+}
+
+type masterLock struct {
+ ctx context.Context
+ cancel context.CancelFunc
+ client *clientv3.Client
+ electionname string
+ prop string
+ etcdEndpoints []string
+ election *concurrency.Election
+ session *concurrency.Session
+ eventchan chan MasterEvent
+ ttl int64
+ leaseID clientv3.LeaseID
+}
+
+//CreateMasterLock create master lock
+func CreateMasterLock(etcdEndpoints []string, election string, prop string, ttl int64) (MasterInterface, error) {
+ if etcdEndpoints == nil || len(etcdEndpoints) == 0 {
+ etcdEndpoints = []string{"http://127.0.0.1:2379"}
+ }
+ ctx, cancel := context.WithCancel(context.Background())
+ client, err := clientv3.New(clientv3.Config{
+ Endpoints: etcdEndpoints,
+ Context: ctx,
+ })
+ if err != nil {
+ cancel()
+ return nil, fmt.Errorf("create etcd client error,%s", err.Error())
+ }
+ lease, err := client.Lease.Grant(ctx, ttl)
+ if err != nil {
+ cancel()
+ return nil, fmt.Errorf("create etcd lease error,%s", err.Error())
+ }
+ s, err := concurrency.NewSession(client, concurrency.WithContext(ctx), concurrency.WithLease(lease.ID))
+ if err != nil {
+ cancel()
+ return nil, fmt.Errorf("new election session error,%s", err.Error())
+ }
+ e := concurrency.NewElection(s, election)
+ ml := &masterLock{
+ ctx: ctx,
+ cancel: cancel,
+ client: client,
+ electionname: election,
+ prop: prop,
+ etcdEndpoints: etcdEndpoints,
+ election: e,
+ session: s,
+ eventchan: make(chan MasterEvent, 2),
+ leaseID: lease.ID,
+ }
+ return ml, nil
+}
+
+// Campaign puts a value as eligible for the election. It blocks until
+// it is elected, an error occurs, or the context is cancelled.
+func (m *masterLock) campaign() error {
+ logrus.Infof("start campaign master")
+ if err := m.election.Campaign(m.ctx, m.prop); err != nil {
+ return err
+ }
+ //elected
+ logrus.Infof("current node is be elected master")
+ select {
+ case res := <-m.election.Observe(m.ctx):
+ m.eventchan <- MasterEvent{Type: MasterAdded, Master: string(res.Kvs[0].Value)}
+ case <-m.ctx.Done():
+ return m.resign()
+ case <-m.session.Done():
+ m.eventchan <- MasterEvent{Type: MasterError, Master: ""}
+ return errors.New("elect: session expired")
+ }
+ return nil
+}
+func (m *masterLock) resign() error {
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ return m.election.Resign(ctx)
+}
+func (m *masterLock) Start() {
+ go m.campaign()
+}
+
+func (m *masterLock) Stop() {
+ m.cancel()
+ m.resign()
+}
+
+func (m *masterLock) EventsChan() <-chan MasterEvent {
+ return m.eventchan
+}
+
+func (m *masterLock) GetHolder() string {
+ return ""
+}
diff --git a/pkg/util/etcd/etcdlock/master_test.go b/pkg/util/etcd/etcdlock/master_test.go
new file mode 100644
index 000000000..1c4cdf560
--- /dev/null
+++ b/pkg/util/etcd/etcdlock/master_test.go
@@ -0,0 +1,59 @@
+// RAINBOND, Application Management Platform
+// Copyright (C) 2014-2017 Goodrain Co., Ltd.
+
+// This program is free software: you can redistribute it and/or modify
+// it under the terms of the GNU General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version. For any non-GPL usage of Rainbond,
+// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
+// must be obtained first.
+
+// This program is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU General Public License for more details.
+
+// You should have received a copy of the GNU General Public License
+// along with this program. If not, see .
+
+package etcdlock
+
+import (
+ "testing"
+ "time"
+
+ "github.com/Sirupsen/logrus"
+)
+
+func TestMasterLock(t *testing.T) {
+ master1, err := CreateMasterLock(nil, "/rainbond/appruntimesyncmaster", "127.0.0.1:1", 10)
+ if err != nil {
+ t.Fatal(err)
+ }
+ master1.Start()
+ defer master1.Stop()
+ master2, err := CreateMasterLock(nil, "/rainbond/appruntimesyncmaster", "127.0.0.1:2", 10)
+ if err != nil {
+ t.Fatal(err)
+ }
+ master2.Start()
+ defer master2.Stop()
+ logrus.Info("start receive event")
+ for {
+ select {
+ case event := <-master1.EventsChan():
+ logrus.Info(event, time.Now())
+ master1.Stop()
+ if event.Type == MasterDeleted {
+ logrus.Info("delete")
+ return
+ }
+ case event := <-master2.EventsChan():
+ logrus.Info(event, time.Now())
+ if event.Type == MasterDeleted {
+ logrus.Info("delete")
+ return
+ }
+ }
+ }
+}
diff --git a/pkg/worker/discover/manager.go b/pkg/worker/discover/manager.go
index 0a4564afa..2e309cced 100644
--- a/pkg/worker/discover/manager.go
+++ b/pkg/worker/discover/manager.go
@@ -20,7 +20,6 @@ package discover
import (
"context"
- "fmt"
"os"
"time"
@@ -45,7 +44,6 @@ type TaskManager struct {
ctx context.Context
cancel context.CancelFunc
config option.Config
- stopChan chan struct{}
handleManager *handle.Manager
client pb.TaskQueueClient
}
@@ -58,7 +56,6 @@ func NewTaskManager(c option.Config, executor executor.Manager, statusManager *s
ctx: ctx,
cancel: cancel,
config: c,
- stopChan: make(chan struct{}),
handleManager: handleManager,
}
}
@@ -133,13 +130,5 @@ func (t *TaskManager) Do() {
func (t *TaskManager) Stop() error {
logrus.Info("discover manager is stoping.")
t.cancel()
- tick := time.NewTicker(time.Second * 3)
- select {
- case <-t.stopChan:
- return nil
- case <-tick.C:
- logrus.Error("task queue listen closed time out")
- return fmt.Errorf("task queue listen closed time out")
- }
-
+ return nil
}
diff --git a/vendor/github.com/coreos/etcd/clientv3/concurrency/election_test.go b/vendor/github.com/coreos/etcd/clientv3/concurrency/election_test.go
new file mode 100644
index 000000000..239ff1e11
--- /dev/null
+++ b/vendor/github.com/coreos/etcd/clientv3/concurrency/election_test.go
@@ -0,0 +1,89 @@
+// Copyright 2017 The etcd Authors
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package concurrency_test
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/coreos/etcd/clientv3"
+ "github.com/coreos/etcd/clientv3/concurrency"
+)
+
+func TestCampaign(t *testing.T) {
+ cli, err := clientv3.New(clientv3.Config{Endpoints: []string{"http://127.0.0.1:2379"}})
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer cli.Close()
+
+ // create two separate sessions for election competition
+ s1, err := concurrency.NewSession(cli)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer s1.Close()
+ e1 := concurrency.NewElection(s1, "/my-election/")
+
+ s2, err := concurrency.NewSession(cli)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer s2.Close()
+ e2 := concurrency.NewElection(s2, "/my-election/")
+
+ // create competing candidates, with e1 initially losing to e2
+ var wg sync.WaitGroup
+ wg.Add(2)
+ electc := make(chan *concurrency.Election, 2)
+ go func() {
+ defer wg.Done()
+ // delay candidacy so e2 wins first
+ time.Sleep(3 * time.Second)
+ if err := e1.Campaign(context.Background(), "e1"); err != nil {
+ t.Fatal(err)
+ }
+ electc <- e1
+ }()
+ go func() {
+ defer wg.Done()
+ if err := e2.Campaign(context.Background(), "e2"); err != nil {
+ t.Fatal(err)
+ }
+ electc <- e2
+ }()
+
+ cctx, cancel := context.WithCancel(context.TODO())
+ defer cancel()
+
+ e := <-electc
+ t.Log("completed first election with", string((<-e.Observe(cctx)).Kvs[0].Value))
+
+ // resign so next candidate can be elected
+ if err := e.Resign(context.TODO()); err != nil {
+ t.Fatal(err)
+ }
+
+ e = <-electc
+ t.Log("completed second election with", string((<-e.Observe(cctx)).Kvs[0].Value))
+
+ wg.Wait()
+
+ // Output:
+ // completed first election with e2
+ // completed second election with e1
+}