File: //usr/local/qcloud/monitor/barad/comm/phealth.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-
"""
_ _ _ _ _ _
| | (_) | | | | | | |
_ __ | |_ _ __ _ _ _ __ | |__ ___ __ _| | |_| |__
| '_ \| | | | |/ _` | | '_ \ | '_ \ / _ \/ _` | | __| '_ \
| |_) | | |_| | (_| | | | | | | | | | __/ (_| | | |_| | | |
| .__/|_|\__,_|\__, |_|_| |_| |_| |_|\___|\__,_|_|\__|_| |_|
| | __/ |
|_| |___/
组件健康检查管理模块 (phealth)
PluginHealth -- 组件健康管理单例类
-- 支持组件 pidfile 记录;
-- 支持组件健康检查 (a. 常驻进程大小是否超过配置限制; )
"""
import copy
import errno
import os
import re
import signal
import subprocess
import threading
import time
class SingletonMeta(type):
    """Metaclass giving every class that uses it a single shared instance.

    The first call constructs the instance while holding a class-wide lock.
    Every later call returns that cached instance and ignores its arguments.
    """

    _instances = {}
    _lock = threading.Lock()

    def __call__(cls, *args, **kwargs):
        # Fast path: once the instance exists, no locking is needed.
        if cls in cls._instances:
            return cls._instances[cls]
        with cls._lock:
            # Re-check under the lock: another thread may have created the
            # instance between the unlocked test above and acquiring the lock.
            if cls not in cls._instances:
                created = super(SingletonMeta, cls).__call__(*args, **kwargs)
                cls._instances[cls] = created
        return cls._instances[cls]
class PluginHealth(object):
    """Process-health manager for one plugin (singleton via SingletonMeta).

    Responsibilities:
      * Persist the plugin's PIDs in a pidfile ("/var/run/.<plugin_name>.pid",
        one PID per line) and cache them. The file is re-read only when its
        mtime changes.
      * Report resource usage of those PIDs: VmRSS, CPU percent, thread count.
      * Run a rate-limited health check (total VmRSS against a limit).

    NOTE(review): ``__metaclass__`` is a Python 2 mechanism. Python 3 ignores
    it, so under Python 3 this class is NOT a singleton.
    """
    __metaclass__ = SingletonMeta

    def __init__(self, plugin_name="barad_agent", plugin_process=None):
        """Initialise paths, regexes and caches.

        :param plugin_name: str; plugin name. Also used to build the pidfile
            path "/var/run/.{plugin_name}.pid".
        :param plugin_process: list of str; names of the plugin's processes.
            Defaults to ["barad_agent"]. None is the signature default so that
            no mutable list is shared between calls.
        """
        if plugin_process is None:
            plugin_process = ["barad_agent"]
        self.plugin_name = plugin_name
        self.plugin_process = plugin_process
        self.plugin_process_pids = []
        self.plugin_pidfile = "/var/run/.%s.pid" % plugin_name
        self.plugin_conf = self.init_plugin_conf()
        self.rss_match_rule = re.compile(r"VmRSS:(.*)kB", re.MULTILINE)
        self.proc_name_match_rule = re.compile(r"Name:\s+(\S+)", re.MULTILINE)
        # Pidfile mtime observed at the last read; 0 forces the first read.
        self.pidfile_update_time = 0
        # CPU sampling state:
        #   clk_tck   - clock ticks per second (resolved lazily);
        #   last      - summed utime+stime ticks at the previous sample
        #               (0 = no sample yet);
        #   timestamp - wall-clock time of that sample.
        self.cache_data = {"cpu_usage": {"clk_tck": None, "last": 0, "timestamp": time.time()}}

    def init_plugin_conf(self):
        """Build the plugin's resource-limit configuration.

        :return: dict with
            - "hc_conf": health-check scheduling. "interval" is the minimum
              number of seconds between checks; "timestamp" is the time of
              the last check.
            - "rss_limit_size": limit for the summed VmRSS of all plugin
              processes, in kB (1024 * 1024 kB = 1 GiB).
        :todo: move these limits into a configuration file.
        """
        plugin_conf = {"hc_conf": {"interval": 10, "timestamp": time.time()},
                       "rss_limit_size": 1024 * 1024}
        return plugin_conf

    def get_pidfile_is_update(self):
        """Return True if the pidfile changed since the last call.

        A missing or inaccessible pidfile is given the pseudo-mtime None. Its
        disappearance therefore counts as one change, so callers re-read it
        (and get []) instead of crashing.
        """
        try:
            update_time = os.path.getmtime(self.plugin_pidfile)
        except OSError:
            update_time = None
        if self.pidfile_update_time == update_time:
            return False
        self.pidfile_update_time = update_time
        return True

    def get_pids(self):
        """Return the plugin PID list, re-reading the pidfile only if it changed."""
        if self.get_pidfile_is_update():
            self.plugin_process_pids = self.read_pidfile()
        return self.plugin_process_pids

    def read_pidfile(self):
        """Read the plugin's PIDs from its pidfile.

        Lines that are not pure decimal digits are ignored.

        :return: list of int; empty if the pidfile does not exist.
        :raises IOError/OSError: for errors other than "file not found".
        """
        pids = []
        try:
            fd = open(self.plugin_pidfile, "r")
        except (IOError, OSError) as err:
            # EAFP instead of os.path.exists(): avoids the check-then-open race.
            if err.errno == errno.ENOENT:
                return pids
            raise
        with fd:
            for line in fd:
                pid = line.strip()
                if pid.isdigit():
                    pids.append(int(pid))
        return pids

    def write_pidfile(self, pids):
        """Overwrite the pidfile with *pids* (one per line) and cache them.

        :param pids: iterable of PIDs; None or empty writes an empty pidfile.
        :return: None
        """
        pids = list(pids) if pids else []
        with open(self.plugin_pidfile, "w") as fd:
            fd.write("".join(str(pid) + "\n" for pid in pids))
        self.plugin_process_pids = pids

    def get_plugin_pids(self, process_name):
        """List the PIDs of running processes with the given command name.

        Equivalent to ``ps --no-headers -o pid -C <process_name>``. The
        command is run without a shell, so *process_name* is never
        shell-interpreted.

        :param process_name: process (command) name.
        :return: list of int, e.g. [108403, 108404, 108405, 108406]; empty if
            ``ps`` cannot be run or nothing matches.
        """
        plugin_pids = []
        try:
            proc = subprocess.Popen(
                ["ps", "--no-headers", "-o", "pid", "-C", process_name],
                stdout=subprocess.PIPE)
            # communicate() reads all output, waits for ps and closes the pipe.
            output, _ = proc.communicate()
        except (OSError, ValueError, TypeError):
            return plugin_pids
        for token in output.split():
            # Skip any non-numeric token instead of discarding every PID.
            if token.isdigit():
                plugin_pids.append(int(token))
        return plugin_pids

    def get_rss_memory(self):
        """Sum the resident set size (VmRSS) of all plugin processes.

        Equivalent to ``ps axo rss,comm | grep <name> | awk '{sum+=$1} END {print sum}'``.
        PIDs whose /proc entry has disappeared are skipped. Processes without
        a VmRSS line contribute 0.

        :return: int; total VmRSS in kB.
        :raises Exception: if a pidfile PID belongs to a process whose name is
            neither plugin_name nor in plugin_process (stale pidfile / reused PID).
        NOTE(review): the kernel truncates the /proc "Name:" field to 15
        characters, so longer process names will never match.
        """
        total_rss = 0
        for pid in self.get_pids():
            try:
                with open("/proc/%s/status" % pid, "r") as fd:
                    tmp_read = fd.read()
            except (IOError, OSError) as err:
                if err.errno == errno.ENOENT:
                    continue  # process exited after the pidfile was read
                raise
            name_match = self.proc_name_match_rule.search(tmp_read)
            proc_name = name_match.group(1).strip() if name_match else None
            if proc_name is None or (proc_name != self.plugin_name
                                     and proc_name not in self.plugin_process):
                raise Exception("the pidfile and %s process names are different!!!" % self.plugin_name)
            match_obj = self.rss_match_rule.search(tmp_read)
            if match_obj is not None:
                total_rss += int(match_obj.group(1).strip())
        return total_rss

    def get_cpu_percent(self):
        """Return the CPU usage (percent) of all plugin processes since the last call.

        cpu_usage_percent = diff(sum(utime + stime)) / interval / SC_CLK_TCK * 100

        :return: float rounded to 3 decimals.
            * -1 on the first sample, which marks a (re)start of the plugin.
            * 0.0 if sampling fails (best effort) or no time has elapsed.
        """
        cpu_usage_percent = 0.0
        try:
            timestamp = time.time()
            cache_data = self.cache_data.get("cpu_usage")
            if cache_data.get("clk_tck", None) is None:
                clock_ticks = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
                cache_data["clk_tck"] = clock_ticks if isinstance(clock_ticks, int) else 100
            total_time = 0
            for pid in self.get_pids():
                with open("/proc/%s/stat" % pid, "r") as fd:
                    stat = fd.read()
                # Field 2 (comm) is parenthesised and may contain spaces or
                # ')'. Parse only what follows the LAST ')': the remaining
                # fields start at field 3 (state), so utime (field 14) and
                # stime (field 15) are at indexes 11 and 12.
                fields = stat[stat.rfind(")") + 1:].split()
                total_time += int(fields[11]) + int(fields[12])
            if cache_data["last"] == 0:
                # First collection.
                cpu_usage_percent = -1
            else:
                # Follow-up collection.
                delta_time = total_time - cache_data["last"]
                interval = timestamp - cache_data["timestamp"]
                if interval > 0:
                    cpu_usage = delta_time / interval / cache_data["clk_tck"]
                    cpu_usage_percent = round(cpu_usage * 100, 3)
            cache_data["last"] = total_time
            cache_data["timestamp"] = timestamp
        except Exception:
            # Best effort: metrics collection must never break the caller.
            pass
        return cpu_usage_percent

    def get_thread_num(self):
        """Return the total thread count of all plugin processes.

        Counts entries in /proc/<pid>/task/. Returns the partial sum collected
        so far (possibly 0) if any /proc read fails.
        """
        thread_num = 0
        try:
            for pid in self.get_pids():
                thread_num += len(os.listdir("/proc/%s/task/" % pid))
        except Exception:
            pass
        return thread_num

    def heart_beat(self):
        """Plugin heartbeat file (not implemented yet).

        :todo: write uptime and other runtime data to a heartbeat file.
        """
        pass

    def health_check(self):
        """Run the plugin health checks, at most once per configured interval.

        Between checks (see plugin_conf["hc_conf"]["interval"]) this returns
        (True, "") without checking anything.

        :return: tuple(bool, str):
            - check result: True = healthy (or check skipped), False = unhealthy;
            - check msg: description of the problem when unhealthy, else "".
        :note: any exception raised while checking is printed, and the plugin
            is then reported healthy.
        """
        def is_check():
            """Return True, and restart the interval, if a check is due."""
            hc_flag = False
            cur_timestamp = time.time()
            hc_conf = self.plugin_conf.get("hc_conf", {})
            if cur_timestamp - hc_conf.get("timestamp") >= hc_conf.get("interval"):
                hc_conf["timestamp"] = cur_timestamp
                hc_flag = True
            return hc_flag

        check_flag = True
        check_msg = ""
        try:
            if not is_check():
                return check_flag, check_msg
            # 1. VmRSS check (sizes in kB).
            rss_curr_size = self.get_rss_memory()
            rss_limit_size = self.plugin_conf.get("rss_limit_size", 1024 * 1024)
            if rss_curr_size > rss_limit_size:
                check_msg = "VmRSS size exceeds limit: %d > %d KB" % (rss_curr_size, rss_limit_size)
                check_flag = False
            # 2. TODO: check for an insufficient number of plugin processes.
        except Exception as e:
            print(e)
        return check_flag, check_msg

    def exit_plugin_process(self):
        """Forcefully terminate all processes of this plugin (best effort).

        Sends SIGKILL to the cached pidfile PIDs whose /proc comm name is in
        plugin_process. The calling process is killed last.

        Nothing is killed when:
          * the calling process is not one of the cached plugin PIDs; or
          * the pidfile cannot be read; or
          * the pidfile and the in-memory cache disagree (the cache may be
            stale, and its PIDs could have been reused).

        :note: if the caller maintains its own Process objects, prefer
            Process.terminate() over this method.
        """
        def process_name(pid):
            """Return the command name of *pid* from /proc/<pid>/comm."""
            with open("/proc/%d/comm" % pid, "r") as fd:
                return fd.read().strip()

        self_pid = os.getpid()
        if self_pid not in self.plugin_process_pids:
            return
        # Kill every other process first and ourselves last, so that our own
        # death cannot cut the loop short.
        kill_pids = [pid for pid in self.plugin_process_pids if pid != self_pid]
        kill_pids.append(self_pid)

        try:
            pidfile_pids = self.read_pidfile()
        except (IOError, OSError):
            return
        if set(pidfile_pids) != set(kill_pids):
            return
        for pid in kill_pids:
            try:
                if process_name(pid) in self.plugin_process:
                    os.kill(pid, signal.SIGKILL)
            except (IOError, OSError):
                # This process may already be gone; keep killing the rest.
                continue
def main():
    """Debug entry point for barad_agent.

    Registers barad_agent's current PIDs in its pidfile, then prints the
    plugin's metrics and health-check result every 10 seconds, forever.
    """
    name = "barad_agent"
    health = PluginHealth(name, ["barad_agent"])
    health.write_pidfile(health.get_plugin_pids(name))
    poll_seconds = 10
    while True:
        # Sample the metrics one by one, in a fixed order.
        rss_mb = health.get_rss_memory() / 1024.0
        cpu = health.get_cpu_percent()
        threads = health.get_thread_num()
        pids = health.read_pidfile()
        verdict = health.health_check()
        print({
            "rss_memory": "%sMb" % rss_mb,
            "cpu_percent": cpu,
            "thread_num": threads,
            "plugin_pids": pids,
            "health_check": verdict,
        })
        time.sleep(poll_seconds)


if __name__ == "__main__":
    main()