DolphinScheduler/monitor_server.py

88 lines
3.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:qiaozhanwei
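'''
Restart DolphinScheduler (escheduler) master/worker servers that have
dropped out of ZooKeeper.

Install pip with yum:
    yum -y install python-pip
Install kazoo (either command works):
    pip install kazoo
    conda install -c conda-forge kazoo
Run the script:
    nohup python -u monitor_server.py > nohup.out 2>&1 &
'''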
import os
import sched
import socket
import subprocess
import time
from datetime import datetime

from kazoo.client import KazooClient
# Shared event scheduler: wall-clock time as the clock, blocking sleep between events.
schedule = sched.scheduler(timefunc=time.time, delayfunc=time.sleep)
class ZkClient:
    """Restart escheduler masters/workers that are missing from ZooKeeper.

    Every host listed under ``masters`` / ``workers`` in the run config is
    resolved to an IP and compared with the IPs registered as child znodes of
    ``/escheduler/masters`` and ``/escheduler/workers``. Each configured IP
    with no registration gets the daemon script started on it over ssh.
    """

    # Run config that lists the master and worker hosts (key=value lines).
    CONFIG_PATH = '/data1_1T/escheduler/conf/config/run_config.conf'
    # Daemon control script, at the same path on every remote host.
    DAEMON_SCRIPT = '/data1_1T/escheduler/bin/escheduler-daemon.sh'

    def __init__(self, hosts='ark0:2181,ark1:2181,ark2:2181'):
        """Connect to ZooKeeper.

        :param hosts: comma-separated ``host:port`` list of the ZooKeeper
            ensemble; defaults to the original hard-coded cluster.
        """
        self.zk = KazooClient(hosts=hosts)
        self.zk.start()

    def read_file(self, path):
        """Parse a ``key=value`` config file into a dict.

        Lines without ``=`` are ignored. Only the first ``=`` separates the
        key from the value, so values may themselves contain ``=``. Whitespace
        around keys and values is stripped.

        :param path: path of the config file.
        :return: dict mapping each key to its string value.
        """
        config = {}
        with open(path, 'r') as f:
            for line in f:
                key, sep, value = line.strip().partition('=')
                if sep:
                    config[key.strip()] = value.strip()
        return config

    def get_ip_by_hostname(self, hostname):
        """Resolve *hostname* to an IPv4 address string via socket.gethostbyname."""
        return socket.gethostbyname(hostname)

    def _resolve_hosts(self, value):
        """Split a comma-separated host list and resolve each entry to an IP.

        Blank entries (missing key, trailing comma, stray spaces) are skipped.
        """
        hosts = [host.strip() for host in (value or '').split(',')]
        return [self.get_ip_by_hostname(host) for host in hosts if host]

    def _registered_ips(self, zk_path):
        """Return the set of IPs registered under *zk_path*, or None if absent.

        The part of each child znode name before the first ``_`` is taken as
        the IP (it is compared against resolved IPs below).
        """
        if not self.zk.exists(zk_path):
            return None
        return set(node.split('_')[0] for node in self.zk.get_children(zk_path))

    def _restart_missing(self, role, zk_path, expected_ips):
        """Start ``<role>-server`` on every expected IP not registered at *zk_path*.

        Nothing is done when *zk_path* itself does not exist.
        """
        registered = self._registered_ips(zk_path)
        if registered is None:
            return
        for ip in sorted(set(expected_ips) - registered):
            print(role + " " + ip + " 服务已经掉了")
            # Argument list instead of a shell string: no local shell parsing.
            rc = subprocess.call(
                ['ssh', ip, 'sh', self.DAEMON_SCRIPT, 'start', role + '-server'])
            if rc != 0:
                print("restart of " + role + " on " + ip +
                      " exited with code " + str(rc))

    def restart_server(self, inc):
        """Run one check of masters and workers, then schedule the next one.

        Errors raised during the check (config, DNS, ZooKeeper) are printed
        rather than propagated, so the next check is always scheduled and a
        single failure no longer kills the monitor.

        :param inc: seconds to wait before the next check.
        """
        try:
            config_dict = self.read_file(self.CONFIG_PATH)
            self._restart_missing('master', '/escheduler/masters',
                                  self._resolve_hosts(config_dict.get('masters')))
            self._restart_missing('worker', '/escheduler/workers',
                                  self._resolve_hosts(config_dict.get('workers')))
        except Exception as err:
            print("monitor check failed: %r" % (err,))
        print(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
        schedule.enter(inc, 0, self.restart_server, (inc,))

    def main(self, inc=60):
        """Run the first check immediately, then every *inc* seconds.

        :param inc: check interval in seconds (default 60).
        """
        # enter(delay, priority, action, argument): priority only orders events
        # that fall due at the same time; argument is the tuple passed to action.
        schedule.enter(0, 0, self.restart_server, (inc,))
        schedule.run()
if __name__ == '__main__':
    # Connect to ZooKeeper, then run the check loop with a 300-second interval.
    monitor = ZkClient()
    monitor.main(300)