HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/comm/phealth.py
#!/usr/bin/env python
# -*- coding: UTF-8 -*-

"""
       _             _          _                _ _   _
      | |           (_)        | |              | | | | |
 _ __ | |_   _  __ _ _ _ __    | |__   ___  __ _| | |_| |__
| '_ \| | | | |/ _` | | '_ \   | '_ \ / _ \/ _` | | __| '_ \
| |_) | | |_| | (_| | | | | |  | | | |  __/ (_| | | |_| | | |
| .__/|_|\__,_|\__, |_|_| |_|  |_| |_|\___|\__,_|_|\__|_| |_|
| |             __/ |
|_|            |___/

组件健康检查管理模块 (phealth)
    PluginHealth -- 组件健康管理单例类
        -- 支持组件 pidfile 记录;
        -- 支持组件健康检查 (a. 常驻进程大小是否超过配置限制; )
"""

import os
import re
import copy
import signal
import threading
import time


class SingletonMeta(type):
    """Metaclass that creates at most one instance per class.

    The first call to a class using this metaclass constructs the instance
    with the given arguments; every later call returns that same instance
    and its arguments are ignored.
    """

    # Shared registry: class object -> its single instance.
    _instances = {}
    # Re-entrant on purpose: the lock is shared by every singleton class, so a
    # singleton whose __init__ builds another singleton for the first time
    # re-acquires it on the same thread. A plain Lock would deadlock there.
    _lock = threading.RLock()

    def __call__(cls, *args, **kwargs):
        """Return the single instance of ``cls``, creating it on first use.

        :param args: positional arguments forwarded to the constructor on the first call only
        :param kwargs: keyword arguments forwarded to the constructor on the first call only
        :return: the shared instance of ``cls``
        """
        # Double-checked locking: the unlocked test keeps the common path cheap,
        # the second test under the lock prevents two threads from both creating.
        if cls not in cls._instances:
            with cls._lock:
                if cls not in cls._instances:
                    cls._instances[cls] = super(SingletonMeta, cls).__call__(*args, **kwargs)
        return cls._instances[cls]


# The metaclass is applied through a throw-away base class because the
# Python 2 ``__metaclass__`` attribute is silently ignored by Python 3, which
# would leave the class a non-singleton there. This form works on both.
class PluginHealth(SingletonMeta("_PluginHealthBase", (object,), {})):
    """Singleton health manager for one plugin (component).

    Responsibilities:
        - record the plugin's process ids in a pidfile;
        - report resource usage of those processes (VmRSS, CPU, thread count);
        - run a periodic health check (currently: total VmRSS against a limit);
        - kill the plugin's processes on request.
    """

    def __init__(self, plugin_name="barad_agent", plugin_process=["barad_agent"]):
        """Initialise the manager.

        :param plugin_name: str; plugin name; pidfile is "/var/run/.{plugin_name}.pid"
        :param plugin_process: list; names of the processes that belong to the plugin
        """
        self.plugin_name = plugin_name
        # Copy list-like input so neither the shared default list nor a
        # caller-owned list is aliased by this instance.
        if isinstance(plugin_process, (list, tuple)):
            plugin_process = list(plugin_process)
        self.plugin_process = plugin_process
        # Cached pid list; refreshed from the pidfile by get_pids().
        self.plugin_process_pids = []
        self.plugin_pidfile = "/var/run/.%s.pid" % plugin_name
        self.plugin_conf = self.init_plugin_conf()
        self.rss_match_rule = re.compile(r"VmRSS:(.*)kB", re.MULTILINE)
        self.proc_name_match_rule = re.compile(r"Name:\s+(\S+)", re.MULTILINE)
        # mtime of the pidfile at the last refresh (None once it was seen missing).
        self.pidfile_update_time = 0
        # State carried between two get_cpu_percent() calls.
        self.cache_data = {"cpu_usage": {"clk_tck": None, "last": 0, "timestamp": time.time()}}

    def init_plugin_conf(self):
        """Build the plugin resource configuration.

        :return: dict with
            "hc_conf": health-check schedule ("interval" in seconds and the
                "timestamp" of the last executed check);
            "rss_limit_size": limit for the summed VmRSS of all processes,
                in kB (1024 * 1024 kB = 1G).
        :todo: move all resource limits into a configuration file
        """
        plugin_conf = {"hc_conf": {"interval": 10, "timestamp": time.time()},
                       "rss_limit_size": 1024 * 1024}
        return plugin_conf

    def get_pidfile_is_update(self):
        """Tell whether the pidfile changed since the previous call.

        The remembered state is updated as a side effect, so two consecutive
        calls without a change in between return True then False.

        :return: bool; True when the pidfile's mtime (or its existence) changed
        """
        try:
            update_time = os.path.getmtime(self.plugin_pidfile)
        except OSError:
            # A missing pidfile is a valid state (read_pidfile() yields []);
            # track it as None so its disappearance/reappearance is noticed.
            update_time = None
        if self.pidfile_update_time == update_time:
            return False
        self.pidfile_update_time = update_time
        return True

    def get_pids(self):
        """Return the plugin pid list, re-reading the pidfile only when it changed.

        :return: list of int
        """
        if self.get_pidfile_is_update():
            self.plugin_process_pids = self.read_pidfile()
        return self.plugin_process_pids

    def read_pidfile(self):
        """Read the process ids recorded in the plugin pidfile.

        Lines that are not purely digits are ignored.

        :return: list of int; empty when the pidfile does not exist
        :raises IOError/OSError: when the pidfile exists but cannot be read
        """
        try:
            with open(self.plugin_pidfile, "r") as fd:
                stripped = (line.strip() for line in fd)
                return [int(text) for text in stripped if text.isdigit()]
        except (IOError, OSError):
            # Only "file is not there" means "no recorded processes"; any
            # other failure (e.g. permissions) is reported to the caller.
            if os.path.exists(self.plugin_pidfile):
                raise
            return []

    def write_pidfile(self, pids):
        """Overwrite the pidfile with the given process ids, one per line.

        :param pids: iterable of pids; None or empty truncates the pidfile
        :return: None
        """
        # Normalise first: None must not truncate the file and then fail, and
        # the cache must not alias the caller's list.
        pids = list(pids or [])
        with open(self.plugin_pidfile, "w") as fd:
            fd.writelines(str(pid) + "\n" for pid in pids)
        self.plugin_process_pids = pids

    def get_plugin_pids(self, process_name):
        """Look up the pids of all processes with the given command name.

        Runs ``ps --no-headers -o pid -C <process_name>``.

        :param process_name: process (command) name
        :return: list of int, e.g. [108403, 108404, 108405, 108406];
            empty when nothing matches or ``ps`` cannot be run
        """
        # Function-scope import: the module's import block does not provide it.
        import subprocess

        try:
            # Argument list instead of a shell string: the name is passed to
            # ps verbatim and can never be interpreted by a shell.
            proc = subprocess.Popen(
                ["ps", "--no-headers", "-o", "pid", "-C", str(process_name)],
                stdout=subprocess.PIPE)
            output, _ = proc.communicate()
        except (OSError, ValueError):
            # Best effort: an unavailable ps yields an empty result.
            return []
        plugin_pids = []
        for line in output.decode("utf-8", "replace").splitlines():
            text = line.strip()
            if text.isdigit():
                plugin_pids.append(int(text))
        return plugin_pids

    def get_rss_memory(self):
        """Sum the resident set size (VmRSS) of all plugin processes.

        Roughly ``ps axo rss,comm | grep <name> | awk '{sum+=$1} END {print sum}'``.
        Processes that no longer exist are skipped.

        :return: int; total VmRSS in kB
        :raises Exception: when a pid from the pidfile belongs to a process
            whose name is not the plugin name (stale pidfile)
        """
        total_rss = 0
        for pid in self.get_pids():
            try:
                with open("/proc/%s/status" % pid, "r") as fd:
                    status_text = fd.read()
            except (IOError, OSError):
                # The process exited after the pidfile was read; it uses no memory.
                continue
            name_match = self.proc_name_match_rule.search(status_text)
            # NOTE(review): the name is compared with plugin_name, not with the
            # plugin_process list used by exit_plugin_process() - confirm intent.
            if name_match is None or name_match.group(1).strip() != self.plugin_name:
                raise Exception("the pidfile and %s process names are different!!!"% self.plugin_name)
            rss_match = self.rss_match_rule.search(status_text)
            # Tolerate a status file without a VmRSS line.
            if rss_match is not None:
                total_rss += int(rss_match.group(1).strip())
        return total_rss

    def get_cpu_percent(self):
        """Compute the CPU usage of all plugin processes since the previous call.

        usage = diff(sum(utime + stime)) / elapsed seconds / SC_CLK_TCK, as a
        percentage rounded to 3 decimals. State is kept in
        ``self.cache_data["cpu_usage"]`` ("clk_tck", "last", "timestamp").

        :return: -1 on the first collection (cached "last" is 0), which marks
            the moment the component (re)started; 0.0 when the data cannot
            be read or no time elapsed; otherwise the usage percentage
        """
        cpu_usage_percent = 0.0
        try:
            timestamp = time.time()
            cache_data = self.cache_data.get("cpu_usage")
            if cache_data.get("clk_tck", None) is None:
                clock_ticks = os.sysconf(os.sysconf_names["SC_CLK_TCK"])
                # Fall back to the common default when the value is unusable.
                usable = isinstance(clock_ticks, int) and clock_ticks > 0
                cache_data["clk_tck"] = clock_ticks if usable else 100

            total_time = 0
            for pid in self.get_pids():
                with open("/proc/%s/stat" % pid, "r") as fd:
                    stat_text = fd.read()
                # The 2nd field "(comm)" may itself contain spaces, so split
                # only what follows the last ")". There, index 0 is field 3
                # (state), hence utime (14) and stime (15) are at 11 and 12.
                fields = stat_text[stat_text.rindex(")") + 1:].split()
                total_time += int(fields[11]) + int(fields[12])
            # First collection
            if cache_data["last"] == 0:
                cpu_usage_percent = -1
            # Follow-up collection
            else:
                delta_time = total_time - cache_data["last"]
                interval = timestamp - cache_data["timestamp"]
                if interval > 0:
                    cpu_usage = delta_time / interval / cache_data["clk_tck"]
                    cpu_usage_percent = round(cpu_usage * 100, 3)
            cache_data["last"] = total_time
            cache_data["timestamp"] = timestamp
        except (IOError, OSError, ValueError, IndexError, KeyError):
            # Best-effort metric: unreadable or malformed /proc data (e.g. a
            # process that just exited) reports 0.0 and leaves the cache as is.
            pass
        return cpu_usage_percent

    def get_thread_num(self):
        """Count the threads of all plugin processes.

        :return: int; number of entries under /proc/<pid>/task summed over
            all pids; processes that no longer exist contribute nothing
        """
        try:
            pids = self.get_pids()
        except (IOError, OSError):
            # Best effort: an unreadable pidfile means nothing can be counted.
            return 0
        thread_num = 0
        for pid in pids:
            try:
                thread_num += len(os.listdir("/proc/%s/task/" % pid))
            except OSError:
                # Skip only the vanished process instead of dropping the rest.
                continue
        return thread_num

    def heart_beat(self):
        """Heartbeat file of the component (not implemented yet).

        :todo: write the uptime and other runtime data into a heartbeat file
        """
        pass

    def health_check(self):
        """Run the plugin health checks, at most once per configured interval.

        :return: tuple(bool, str);
                    check result: True: plugin healthy (also returned when the
                        check is not due yet or could not be carried out);
                        False: plugin unhealthy;
                    check msg: description message when unhealthy;
        """
        def is_check():
            # True when at least "interval" seconds passed since the last
            # executed check; the stored timestamp is advanced in that case.
            cur_timestamp = time.time()
            hc_conf = self.plugin_conf.get("hc_conf", {})
            if cur_timestamp - hc_conf.get("timestamp") >= hc_conf.get("interval"):
                hc_conf["timestamp"] = cur_timestamp
                return True
            return False

        check_flag = True
        check_msg = ""
        try:
            if not is_check():
                return check_flag, check_msg
            # 1. VmRSS check
            rss_curr_size = self.get_rss_memory()
            rss_limit_size = self.plugin_conf.get("rss_limit_size", 1024 * 1024)
            if rss_curr_size > rss_limit_size:
                check_msg = "VmRSS size exceeds limit: %d > %d KB" % (rss_curr_size, rss_limit_size)
                check_flag = False
            # 2. Insufficient process count
        except Exception as e:
            # A check that cannot be carried out is reported on stdout and
            # treated as "healthy" rather than as a failure.
            print(e)
        return check_flag, check_msg

    def exit_plugin_process(self):
        """Kill every process of the plugin.

        :note: sends SIGKILL to the cached pidfile processes; when the caller
            maintains its own Process objects, prefer Process.terminate().
            Nothing is killed when the cached pid list and the pidfile
            disagree. A pid is only signalled when its current command name is
            listed in ``self.plugin_process``.
        """
        def process_name(pid):
            with open("/proc/%d/comm" % pid, "r") as fd:
                return fd.read().strip()

        def adjust_sequence():
            # Move the calling process to the end so it is killed last and
            # gets the chance to signal all the others first. A caller that is
            # not one of the plugin processes leaves the list untouched.
            self_pid = os.getpid()
            kill_pids = [pid for pid in self.plugin_process_pids if pid != self_pid]
            if len(kill_pids) != len(self.plugin_process_pids):
                kill_pids.append(self_pid)
            return kill_pids

        try:
            pidfile_pids = self.read_pidfile()
        except (IOError, OSError):
            # Without a readable pidfile the cache cannot be validated.
            return
        cache_pids = adjust_sequence()
        if set(pidfile_pids) != set(cache_pids):
            return
        for pid in cache_pids:
            try:
                # Re-check the name right before the kill to avoid hitting an
                # unrelated process that reused the pid.
                if process_name(pid) in self.plugin_process:
                    os.kill(pid, signal.SIGKILL)
            except (IOError, OSError):
                # Already gone or not permitted: carry on with the other pids.
                continue


def main():
    """Demo loop: record the running barad_agent pids, then report forever.

    Writes the pids found for "barad_agent" into the pidfile and afterwards
    prints memory, CPU, thread, pid and health-check information every
    10 seconds. Never returns.
    """
    name = "barad_agent"
    health = PluginHealth(name, ["barad_agent"])
    health.write_pidfile(health.get_plugin_pids(name))
    while True:
        # Collect in a fixed order: the collectors share cached state.
        rss_kb = health.get_rss_memory()
        cpu = health.get_cpu_percent()
        threads = health.get_thread_num()
        recorded_pids = health.read_pidfile()
        verdict = health.health_check()
        plugin_info = {
            "rss_memory": "%sMb" % (rss_kb / 1024.0),
            "cpu_percent": cpu,
            "thread_num": threads,
            "plugin_pids": recorded_pids,
            "health_check": verdict
        }
        print(plugin_info)
        time.sleep(10)


# Run the demo reporting loop only when executed as a script, not on import.
if __name__ == "__main__":
    main()