Rainbond/worker/appm/manager_rc.go

478 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

// Copyright (C) 2014-2018 Goodrain Co., Ltd.
// RAINBOND, Application Management Platform
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version. For any non-GPL usage of Rainbond,
// one or multiple Commercial Licenses authorized by Goodrain Co., Ltd.
// must be obtained first.
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.
package appm
import (
"github.com/goodrain/rainbond/db/model"
"github.com/goodrain/rainbond/event"
"fmt"
"strings"
"time"
"github.com/Sirupsen/logrus"
"github.com/jinzhu/gorm"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/pkg/api/v1"
)
//StartReplicationController 部署StartReplicationController
//返回部署结果
func (m *manager) StartReplicationController(serviceID string, logger event.Logger) (*v1.ReplicationController, error) {
logger.Info("创建ReplicationController资源开始", map[string]string{"step": "worker-appm", "status": "starting"})
builder, err := ReplicationControllerBuilder(serviceID, logger, m.conf.NodeAPI)
if err != nil {
logrus.Error("create ReplicationController builder error.", err.Error())
logger.Error("创建ReplicationController Builder失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
//判断应用镜像名称是否合法,非法镜像名进制启动
deployVersion,err:=m.dbmanager.VersionInfoDao().GetVersionByDeployVersion(builder.service.DeployVersion,serviceID)
imageName:=builder.service.ImageName
if err != nil {
logrus.Warnf("error get version info by deployversion %s,details %s",builder.service.DeployVersion,err.Error())
}else{
if CheckVersionInfo(deployVersion) {
imageName=deployVersion.ImageName
}
}
if !strings.HasPrefix(imageName, "goodrain.me/") {
logger.Error("启动应用失败,镜像名(%s)非法,请重新构建应用", map[string]string{"step": "callback", "status": "error"})
return nil, fmt.Errorf("service image name invoid, it only can with prefix goodrain.me/")
}
rc, err := builder.Build()
if err != nil {
logrus.Error("build ReplicationController error.", err.Error())
logger.Error("创建ReplicationController失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
result, err := m.kubeclient.Core().ReplicationControllers(builder.GetTenant()).Create(rc)
if err != nil {
logrus.Error("deploy ReplicationController to apiserver error.", err.Error())
logger.Error("部署ReplicationController到集群失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
err = m.dbmanager.K8sDeployReplicationDao().AddModel(&model.K8sDeployReplication{
TenantID: builder.GetTenant(),
ServiceID: serviceID,
ReplicationID: rc.Name,
ReplicationType: model.TypeReplicationController,
DeployVersion: builder.service.DeployVersion,
})
if err != nil {
logrus.Error("save ReplicationController info to db error.", err.Error())
logger.Error("存储ReplicationController信息到数据库错误", map[string]string{"step": "worker-appm", "status": "error"})
}
err = m.waitRCReplicasReady(*rc.Spec.Replicas, serviceID, logger, result)
if err != nil {
if err == ErrTimeOut {
return result, err
}
logrus.Error("deploy ReplicationController to apiserver then watch error.", err.Error())
logger.Error("ReplicationController实例启动情况检测失败", map[string]string{"step": "worker-appm", "status": "error"})
return result, err
}
return result, nil
}
//CheckVersionInfo CheckVersionInfo
func CheckVersionInfo(version *model.VersionInfo) bool {
if !strings.Contains(strings.ToLower(version.FinalStatus),"success") {
return false
}
if len(version.ImageName)==0||!strings.Contains(version.ImageName,"goodrain.me/") {
return false
}
return true
}
//StopReplicationController 停止
func (m *manager) StopReplicationController(serviceID string, logger event.Logger) error {
logger.Info("停止删除ReplicationController资源开始", map[string]string{"step": "worker-appm", "status": "starting"})
service, err := m.dbmanager.TenantServiceDao().GetServiceByID(serviceID)
if err != nil {
logrus.Errorf("delete ReplicationController of service(%s) error. find service from db error %v", serviceID, err.Error())
logger.Error("查询应用信息失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
deploys, err := m.dbmanager.K8sDeployReplicationDao().GetK8sDeployReplicationByService(serviceID)
if err != nil {
if err == gorm.ErrRecordNotFound {
logger.Error("应用未部署", map[string]string{"step": "worker-appm", "status": "error"})
return ErrNotDeploy
}
logrus.Error("find service deploy info from db error", err.Error())
logger.Error("查询应用部署信息失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
var deploy *model.K8sDeployReplication
if deploys != nil || len(deploys) > 0 {
for _, d := range deploys {
if !d.IsDelete {
deploy = d
}
}
}
if deploy == nil {
logger.Error("应用未部署", map[string]string{"step": "worker-appm", "status": "error"})
return ErrNotDeploy
}
for _, deploy := range deploys {
//更新rc pod数量为0
rc, err := m.kubeclient.Core().ReplicationControllers(service.TenantID).Patch(deploy.ReplicationID, types.StrategicMergePatchType, Replicas0)
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("patch ReplicationController info error.", err.Error())
logger.Error("更改ReplicationController Pod数量为0失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
logger.Info("集群中ReplicationController已不存在", map[string]string{"step": "worker-appm", "status": "error"})
err = m.dbmanager.K8sDeployReplicationDao().DeleteK8sDeployReplicationByService(serviceID)
if err != nil {
if err != gorm.ErrRecordNotFound {
logrus.Error("delete deploy info from db error.", err.Error())
}
}
return nil
}
//判断pod数量为0
err = m.waitRCReplicas(0, logger, rc)
if err != nil {
if err != ErrTimeOut {
logger.Error("更改RC Pod数量为0结果检测错误", map[string]string{"step": "worker-appm", "status": "error"})
logrus.Error("patch ReplicationController replicas to 0 and watch error.", err.Error())
return err
}
logger.Error("更改RC Pod数量为0结果检测超时,继续删除RC", map[string]string{"step": "worker-appm", "status": "error"})
}
//删除rc
err = m.kubeclient.Core().ReplicationControllers(service.TenantID).Delete(deploy.ReplicationID, &metav1.DeleteOptions{})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("delete ReplicationController error.", err.Error())
logger.Error("从集群中删除ReplicationController失败", map[string]string{"step": "worker-appm", "status": "error"})
return err
}
}
err = m.dbmanager.K8sDeployReplicationDao().DeleteK8sDeployReplicationByService(serviceID)
if err != nil {
if err != gorm.ErrRecordNotFound {
logrus.Error("delete deploy info from db error.", err.Error())
}
}
}
//删除未移除成功的pod
logger.Info("开始移除残留的Pod实例", map[string]string{"step": "worker-appm", "status": "starting"})
pods, err := m.dbmanager.K8sPodDao().GetPodByService(serviceID)
if err != nil {
logrus.Error("get more than need by deleted pod from db error.", err.Error())
logger.Error("查询更过需要被移除的Pod失败", map[string]string{"step": "worker-appm", "status": "error"})
}
if pods != nil && len(pods) > 0 {
for i := range pods {
pod := pods[i]
err = m.kubeclient.CoreV1().Pods(service.TenantID).Delete(pod.PodName, &metav1.DeleteOptions{})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Errorf("delete pod (%s) from k8s api error %s", pod.PodName, err.Error())
}
} else {
logger.Info(fmt.Sprintf("实例(%s)已停止并移除", pod.PodName), map[string]string{"step": "worker-appm"})
}
}
err = m.dbmanager.K8sPodDao().DeleteK8sPod(serviceID)
if err != nil {
if err != gorm.ErrRecordNotFound {
logrus.Error("delete pods by service id error.", err.Error())
}
}
}
logger.Info("移除残留的Pod实例完成", map[string]string{"step": "worker-appm", "status": "starting"})
return nil
}
func (m *manager) waitReplicationController(mode, serviceID string, n int32, logger event.Logger, rc *v1.ReplicationController) error {
if mode == "up" {
logger.Info("扩容结果监听开始", map[string]string{"step": "worker-appm", "status": "starting"})
return m.waitRCReplicasReady(n, serviceID, logger, rc)
}
if mode == "down" {
logger.Info("缩容结果监听开始", map[string]string{"step": "worker-appm", "status": "starting"})
return m.waitRCReplicas(n, logger, rc)
}
return nil
}
//RollingUpgradeReplicationController 滚动升级RC
func (m *manager) RollingUpgradeReplicationController(serviceID string, stopChan chan struct{}, logger event.Logger) (*v1.ReplicationController, error) {
deploys, err := m.dbmanager.K8sDeployReplicationDao().GetK8sDeployReplicationByService(serviceID)
if err != nil {
if err == gorm.ErrRecordNotFound {
logger.Info("应用未部署,开始启动应用", map[string]string{"step": "worker-appm", "status": "success"})
return m.StartReplicationController(serviceID, logger)
}
logrus.Error("get old deploy info error.", err.Error())
logger.Error("获取当前应用部署信息失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
var deploy *model.K8sDeployReplication
if deploys != nil || len(deploys) > 0 {
for _, d := range deploys {
if !d.IsDelete {
deploy = d
}
}
}
if deploy == nil {
logger.Info("应用未部署,开始启动应用", map[string]string{"step": "worker-appm", "status": "success"})
return m.StartReplicationController(serviceID, logger)
}
logger.Info("创建ReplicationController资源开始", map[string]string{"step": "worker-appm", "status": "starting"})
builder, err := ReplicationControllerBuilder(serviceID, logger, m.conf.NodeAPI)
if err != nil {
logrus.Error("create ReplicationController builder error.", err.Error())
logger.Error("创建ReplicationController Builder失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
rc, err := builder.Build()
if err != nil {
logrus.Error("build ReplicationController error.", err.Error())
logger.Error("创建ReplicationController失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
var replicas = rc.Spec.Replicas
rc.Spec.Replicas = int32Ptr(0)
result, err := m.kubeclient.Core().ReplicationControllers(builder.GetTenant()).Create(rc)
if err != nil {
logrus.Error("deploy ReplicationController to apiserver error.", err.Error())
logger.Error("部署ReplicationController到集群失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
var oldRCName = deploy.ReplicationID
newDeploy := &model.K8sDeployReplication{
ReplicationID: rc.Name,
DeployVersion: rc.Labels["version"],
ReplicationType: model.TypeReplicationController,
TenantID: deploy.TenantID,
ServiceID: serviceID,
}
err = m.dbmanager.K8sDeployReplicationDao().AddModel(newDeploy)
if err != nil {
m.kubeclient.Core().ReplicationControllers(builder.GetTenant()).Delete(rc.Name, &metav1.DeleteOptions{})
logrus.Error("save new deploy info to db error.", err.Error())
logger.Error("添加部署信息失败", map[string]string{"step": "worker-appm", "status": "error"})
return nil, err
}
//保证删除旧RC
defer func() {
//step3 delete old rc
//注入status模块忽略本RC的感知
m.statusManager.IgnoreDelete(oldRCName)
err = m.kubeclient.Core().ReplicationControllers(builder.GetTenant()).Delete(oldRCName, &metav1.DeleteOptions{})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Error("delete ReplicationController error.", err.Error())
logger.Error("从集群中删除ReplicationController失败", map[string]string{"step": "worker-appm", "status": "error"})
}
}
m.dbmanager.K8sDeployReplicationDao().DeleteK8sDeployReplicationByServiceAndVersion(deploy.ServiceID, deploy.DeployVersion)
}()
//step2 sync pod number
logger.Info("开始滚动替换实例", map[string]string{"step": "worker-appm", "status": "starting"})
var i int32
for i = 1; i <= *replicas; i++ {
if err := m.rollingUpgrade(builder.GetTenant(), serviceID, oldRCName, result.Name, (*replicas)-i, i, logger); err != nil {
return nil, err
}
select {
case <-stopChan:
logger.Info("应用滚动升级停止。", map[string]string{"step": "worker-appm"})
return nil, nil
default:
//延时1s
//TODO: 外层传入时间间隔
if i < *replicas {
time.Sleep(time.Second * 1)
}
}
}
//删除未移除成功的pod
logger.Info("开始移除残留的Pod实例", map[string]string{"step": "worker-appm", "status": "starting"})
pods, err := m.dbmanager.K8sPodDao().GetPodByReplicationID(oldRCName)
if err != nil {
logrus.Error("get more than need by deleted pod from db error.", err.Error())
logger.Error("查询更过需要被移除的Pod失败", map[string]string{"step": "worker-appm", "status": "error"})
}
if pods != nil && len(pods) > 0 {
for i := range pods {
pod := pods[i]
err = m.kubeclient.CoreV1().Pods(builder.GetTenant()).Delete(pod.PodName, &metav1.DeleteOptions{})
if err != nil {
if err = checkNotFoundError(err); err != nil {
logrus.Errorf("delete pod (%s) from k8s api error %s", pod.PodName, err.Error())
}
} else {
logger.Info(fmt.Sprintf("实例(%s)已停止并移除", pod.PodName), map[string]string{"step": "worker-appm"})
}
}
}
logger.Info("移除残留的Pod实例完成", map[string]string{"step": "worker-appm", "status": "success"})
return rc, nil
}
func (m *manager) rollingUpgrade(namespace, serviceID, oldRC, newRC string, oldReplicase, newReplicas int32, logger event.Logger) error {
logger.Info(fmt.Sprintf("新版实例数:%d,旧版实例数:%d,替换开始", newReplicas, oldReplicase), map[string]string{"step": "worker-appm", "status": "starting"})
rc, err := m.kubeclient.Core().ReplicationControllers(namespace).Patch(newRC, types.StrategicMergePatchType, Replicas(int(newReplicas)))
if err != nil {
logrus.Error("patch ReplicationController info error.", err.Error())
logger.Error(fmt.Sprintf("更改ReplicationController Pod数量为%d失败", newReplicas), map[string]string{"step": "worker-appm", "status": "error"})
return err
}
err = m.waitReplicationController("up", serviceID, newReplicas, logger, rc)
if err != nil && err.Error() != ErrTimeOut.Error() {
logrus.Errorf("patch ReplicationController replicas to %d and watch error.%v", newReplicas, err.Error())
logger.Error(fmt.Sprintf("更改ReplicationController Pod数量为%d结果检测失败", newReplicas), map[string]string{"step": "worker-appm", "status": "error"})
return err
}
rc, err = m.kubeclient.Core().ReplicationControllers(namespace).Patch(oldRC, types.StrategicMergePatchType, Replicas(int(oldReplicase)))
if err != nil {
logrus.Error("patch ReplicationController info error.", err.Error())
logger.Error(fmt.Sprintf("更改ReplicationController Pod数量为%d失败", oldReplicase), map[string]string{"step": "worker-appm", "status": "error"})
return err
}
err = m.waitReplicationController("down", serviceID, oldReplicase, logger, rc)
if err != nil && err.Error() != ErrTimeOut.Error() {
logrus.Errorf("patch ReplicationController replicas to %d and watch error.%v", oldReplicase, err.Error())
logger.Error(fmt.Sprintf("更改ReplicationController Pod数量为%d结果检测失败", oldReplicase), map[string]string{"step": "worker-appm", "status": "error"})
return err
}
logger.Info(fmt.Sprintf("新版实例数:%d,旧版实例数:%d,替换完成", newReplicas, oldReplicase), map[string]string{"step": "worker-appm", "status": "starting"})
return nil
}
//移除实例检测
func (m *manager) waitRCReplicas(n int32, logger event.Logger, rc *v1.ReplicationController) error {
if rc.Status.Replicas <= n {
return nil
}
second := int32(40)
var deleteCount int32
if rc.Status.Replicas-n > 0 {
deleteCount = rc.Status.Replicas - n
second = second * deleteCount
}
logger.Info(fmt.Sprintf("实例开始关闭,需要关闭实例数 %d, 超时时间:%d秒 ", rc.Status.Replicas-n, second), map[string]string{"step": "worker-appm"})
timeout := time.Tick(time.Duration(second) * time.Second)
watch, err := m.kubeclient.Core().ReplicationControllers(rc.Namespace).Watch(metav1.ListOptions{
LabelSelector: fmt.Sprintf("name=%s,version=%s", rc.Labels["name"], rc.Labels["version"]),
ResourceVersion: rc.ResourceVersion,
})
if err != nil {
return err
}
defer watch.Stop()
podWatch, err := m.kubeclient.CoreV1().Pods(rc.Namespace).Watch(metav1.ListOptions{
LabelSelector: fmt.Sprintf("name=%s,version=%s", rc.Labels["name"], rc.Labels["version"]),
})
if err != nil {
return err
}
defer podWatch.Stop()
for {
select {
case <-timeout:
logger.Error("实例关闭超时,请重试!", map[string]string{"step": "worker-appm", "status": "error"})
return ErrTimeOut
case event := <-watch.ResultChan():
state := event.Object.(*v1.ReplicationController)
logger.Info(fmt.Sprintf("实例正在关闭,当前应用实例数 %d", state.Status.Replicas), map[string]string{"step": "worker-appm"})
case event := <-podWatch.ResultChan():
if event.Type == "DELETED" {
deleteCount--
pod := event.Object.(*v1.Pod)
m.statusCache.RemovePod(pod.Name)
logger.Info(fmt.Sprintf("实例(%s)已停止并移除", pod.Name), map[string]string{"step": "worker-appm"})
if deleteCount <= 0 {
return nil
}
}
}
}
}
//增加实例检测
func (m *manager) waitRCReplicasReady(n int32, serviceID string, logger event.Logger, rc *v1.ReplicationController) error {
if rc.Status.Replicas >= n {
logger.Info(fmt.Sprintf("启动实例数 %d,已完成", rc.Status.Replicas), map[string]string{"step": "worker-appm"})
return nil
}
second := int32(60)
if rc.Spec.Template != nil && len(rc.Spec.Template.Spec.Containers) > 0 {
for _, c := range rc.Spec.Template.Spec.Containers {
if c.ReadinessProbe != nil {
second += c.ReadinessProbe.InitialDelaySeconds + c.ReadinessProbe.SuccessThreshold*c.ReadinessProbe.PeriodSeconds
}
}
}
if n > 0 {
second = second * n
}
logger.Info(fmt.Sprintf("实例开始启动,需要启动实例数 %d, 超时时间:%d秒 ", n, second), map[string]string{"step": "worker-appm"})
timeout := time.Tick(time.Duration(second) * time.Second)
watch, err := m.kubeclient.Core().ReplicationControllers(rc.Namespace).Watch(metav1.ListOptions{
LabelSelector: fmt.Sprintf("name=%s,version=%s", rc.Labels["name"], rc.Labels["version"]),
ResourceVersion: rc.ResourceVersion,
})
if err != nil {
return err
}
defer watch.Stop()
podWatch, err := m.kubeclient.CoreV1().Pods(rc.Namespace).Watch(metav1.ListOptions{
LabelSelector: fmt.Sprintf("name=%s,version=%s", rc.Labels["name"], rc.Labels["version"]),
})
if err != nil {
return err
}
defer podWatch.Stop()
var readyPodCount int32
for {
select {
case <-timeout:
logger.Error("实例启动超时,置于后台启动,请留意应用状态", map[string]string{"step": "worker-appm", "status": "error"})
return ErrTimeOut
case event := <-watch.ResultChan():
state := event.Object.(*v1.ReplicationController)
logger.Info(fmt.Sprintf("实例正在启动,当前启动实例数 %d,就绪实例数 %d, 未启动实例数 %d ", state.Status.Replicas, state.Status.ReadyReplicas, n-state.Status.Replicas), map[string]string{"step": "worker-appm"})
case event := <-podWatch.ResultChan():
if event.Type == "ADDED" || event.Type == "MODIFIED" {
pod := event.Object.(*v1.Pod)
status := m.statusCache.AddPod(pod.Name, logger)
if ok, err := status.AddStatus(pod.Status); ok {
readyPodCount++
if readyPodCount >= n {
return nil
}
} else if err != nil {
return err
}
}
}
}
}