HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/plugin/collector/vm/cfs_long_frequency_collector.py
#!/usr/local/qcloud/monitor/python26/bin/python
# -*- coding: utf-8 -*-

import sys
import os
import threading
import re
import time
from collections import defaultdict
import urllib2
import socket
from contextlib import closing

sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../../../comm/')
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../../base/')
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../utils/')
import constant
from plugin_base import VmBaseCollector
from utils.metric_handler import MetricHandler
from cutils import CommUtils, generate_config


class CfsLongFrequencyCollector(VmBaseCollector):
    """
    Barad collector that runs four CFS probes each cycle -- stat of NFS/lustre
    mount points, lustre obd device count, lustre kernel-log error counts and
    CFS VIP TCP reachability -- and forwards their payloads through put_data.
    """

    # metadata server base URL, read from the "MetaDataServer" plugin config section
    META_URL = generate_config(constant.PLUGIN_CONFIG_PATH, "MetaDataServer")["metadata_url"]

    # class-wide caches filled lazily from the metadata server; '' means unknown
    VM_APPID = ''
    APPID_MUTEX = threading.Lock()

    VM_VPCID = ''
    VPCID_MUTEX = threading.Lock()

    def init(self):
        """Set the collection frequency (60) and reset the lazily created probe adaptors."""
        self.set_frequency(60)
        self.handler = MetricHandler()
        self.handler.dimensions = ['uuid', 'cfsvip', 'mountdir']
        self.stat_mount_point_collector = None
        self.lustre_obd_device_collector = None
        self.lustre_log_filter_collector = None
        self.check_network_collector = None

    @staticmethod
    def _cfs_fetch_metadata(path):
        """Return the raw body of META_URL + path (2 second timeout); raises on failure."""
        url = CfsLongFrequencyCollector.META_URL + path
        with closing(urllib2.urlopen(url, timeout=2)) as response:
            return response.read()

    @staticmethod
    def getset_vm_appid():
        """
        Return the VM app id, fetching it from the metadata server on first use.

        A non-empty answer is cached on the class; on failure '' is returned and
        the next call tries again.
        """
        if CfsLongFrequencyCollector.VM_APPID != '':
            return CfsLongFrequencyCollector.VM_APPID

        with CfsLongFrequencyCollector.APPID_MUTEX:
            # another thread may have filled the cache while we waited for the lock
            if CfsLongFrequencyCollector.VM_APPID == '':
                try:
                    content = CfsLongFrequencyCollector._cfs_fetch_metadata('app-id')
                    if content:
                        CfsLongFrequencyCollector.VM_APPID = content
                except Exception:
                    # best effort: report '' now and retry on the next call
                    CfsLongFrequencyCollector.VM_APPID = ''

        return CfsLongFrequencyCollector.VM_APPID

    @staticmethod
    def getset_vm_vpcid():
        """
        Return the VM vpc id (looked up through the VM's MAC address), fetching it
        from the metadata server on first use.

        A non-empty answer is cached on the class; on failure '' is returned and
        the next call tries again.
        """
        if CfsLongFrequencyCollector.VM_VPCID != '':
            return CfsLongFrequencyCollector.VM_VPCID

        with CfsLongFrequencyCollector.VPCID_MUTEX:
            # another thread may have filled the cache while we waited for the lock
            if CfsLongFrequencyCollector.VM_VPCID == '':
                try:
                    mac = CfsLongFrequencyCollector._cfs_fetch_metadata('mac')
                    # without a MAC there is no per-interface path to query
                    if mac:
                        content = CfsLongFrequencyCollector._cfs_fetch_metadata(
                            'network/interfaces/macs/' + mac + '/vpc-id')
                        if content:
                            CfsLongFrequencyCollector.VM_VPCID = content
                except Exception:
                    # best effort: report '' now and retry on the next call
                    CfsLongFrequencyCollector.VM_VPCID = ''

        return CfsLongFrequencyCollector.VM_VPCID

    def _cfs_adaptors(self):
        """Return the probe adaptors in collection order (entries may still be None)."""
        return [
            self.stat_mount_point_collector,
            self.lustre_obd_device_collector,
            self.lustre_log_filter_collector,
            self.check_network_collector,
        ]

    def _cfs_ensure_adaptors(self):
        """Create every probe adaptor that does not exist yet; all share the same VM ids."""
        if all(adaptor is not None for adaptor in self._cfs_adaptors()):
            return

        # NOTE(review): ids are captured here once; an id that is '' at this moment
        # (metadata server unreachable) stays '' in the reported dimensions.
        vm_uuid = self.get_vm_uuid()
        appid = self.getset_vm_appid()
        vpcid = self.getset_vm_vpcid()
        vmip = self.get_vmip()

        if self.stat_mount_point_collector is None:
            self.stat_mount_point_collector = CfsAdaptorStatMountPoint(vm_uuid, appid, vpcid, vmip)
        if self.lustre_obd_device_collector is None:
            self.lustre_obd_device_collector = CfsAdaptorLusterObdDevice(vm_uuid, appid, vpcid, vmip)
        if self.lustre_log_filter_collector is None:
            self.lustre_log_filter_collector = CfsAdaptorLusterLogFilter(vm_uuid, appid, vpcid, vmip)
        if self.check_network_collector is None:
            self.check_network_collector = CfsAdaptorCheckNetwork(vm_uuid, appid, vpcid, vmip)

    def do_collect(self):
        """
        Run one cycle: collect from every adaptor first, then forward every
        payload they produce through put_data.
        """
        self._cfs_ensure_adaptors()
        adaptors = self._cfs_adaptors()

        for adaptor in adaptors:
            adaptor.do_collect()

        for adaptor in adaptors:
            for item in adaptor.do_dump_data():
                self.put_data(item)


class CfsMonitorStatMountPoint(object):
    """
    Probe every NFS / lustre mount point with `stat` to detect hung mounts.
    """

    def __init__(self):
        # snapshot of /proc/self/mountinfo taken when the monitor is created
        self.mountstats = CfsMonitorMountInfo()

    @staticmethod
    def _shell_quote(text):
        """Return text quoted as a single POSIX shell word (unchanged if already safe)."""
        try:
            from shlex import quote
        except ImportError:
            # python 2 has no shlex.quote; pipes.quote is the equivalent there
            from pipes import quote
        return quote(text)

    @staticmethod
    def _unescape_mount_path(path):
        """
        Undo the octal escaping mountinfo applies to paths (e.g. '\\040' for a
        space) so the real path can be handed to the shell commands.
        """
        return re.sub(r'\\([0-7]{3})', lambda m: chr(int(m.group(1), 8)), path)

    def _probe(self, mount_point):
        """
        Return 0 if mount_point looks healthy, otherwise a non-zero value: the
        number of `stat <mount_point>` processes still running, or 1 when a
        fresh `stat` returns no output (presumably it failed or timed out).
        """
        real_path = self._unescape_mount_path(mount_point)

        # stat processes left over from earlier probes mean the mount is hanging
        cmd = "ps aux | grep %s | grep -v 'grep' | wc -l" % self._shell_quote('stat ' + real_path)
        output = CommUtils.ExecuteTimeoutCommand(cmd, 3).splitlines()
        if not output:
            return 0
        try:
            cnt = int(output[0])
        except ValueError:
            # unexpected output: fall back to probing with stat directly
            cnt = 0
        if cnt >= 1:
            return cnt

        output = CommUtils.ExecuteTimeoutCommand("stat %s" % self._shell_quote(real_path), 3)
        return 0 if output else 1

    def stat_mount_point(self):
        """
        Probe all NFS / lustre mounts.

        return: {(mount_point, cfsvip): value}; value 0 means stat succeeded,
                non-zero means the mount looks hung (see _probe)
        """
        result = dict()
        # one CFS (same vip and fs type) mounted at several points is probed only once
        stat_result = dict()
        for mount_info in self.mountstats.nfs_mount_info + self.mountstats.lustre_mount_info:
            fstype = mount_info['filesystem_type']
            mount_point = mount_info['mount_point']

            if "nfs" in fstype:
                cfsvip = mount_info['mount_source'].split(':')[0]
            elif fstype == "lustre":
                # lustre sources are expected as '<ip>@<net>:/<fs>'; keep the address only
                cfsvip = mount_info['mount_source'].split(':')[0].split('@')[0]
            else:
                continue

            key = (cfsvip, fstype)
            if key not in stat_result:
                stat_result[key] = self._probe(mount_point)
            result[(mount_point, cfsvip)] = stat_result[key]

        return result


class CfsAdaptorStatMountPoint(object):
    """
    Turn the stat-probe results of CfsMonitorStatMountPoint into barad payloads
    (metric `issucc`: 1 when the probe value was 0, otherwise 0).
    """

    name = "stat_mount_point_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorStatMountPoint, self).__init__()
        self.handler = MetricHandler()
        # {(mount_point, cfsvip): probe value} from the most recent do_collect
        self.barad_data = dict()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Probe the currently mounted CFS file systems with a fresh monitor."""
        self.barad_data = CfsMonitorStatMountPoint().stat_mount_point()

    def do_dump_data(self):
        """Return a list holding at most one barad payload with an `issucc` per mount point."""
        timestamp = int(time.time())
        handler = self.handler
        handler.namespace = 'qce/cfs'
        handler.dimensions = ['uuid', 'cfsvip', 'mountdir', 'vmip', 'appid', 'vpcid']

        for (mountdir, cfsvip), probe_value in self.barad_data.items():
            handler.add_batch_metric(
                batch=[{'name': 'issucc', 'value': int(probe_value == 0)}],
                dimensions={
                    'uuid': self.vm_uuid,
                    'cfsvip': cfsvip,
                    'mountdir': mountdir,
                    'vmip': self.vm_ip,
                    'appid': self.app_id,
                    'vpcid': self.vpc_id,
                },
                timestamp=timestamp,
            )

        payloads = []
        if handler.get_metrics():
            payloads.append({'sender': 'cfs_sender', 'datas': handler.pop_metrics()})
        return payloads


class CfsAdaptorLusterObdDevice(object):
    """
    Report the number of lustre obd devices in barad format
    (metric `lustre_obd_device_count`).
    """

    name = "lustre_obd_device_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorLusterObdDevice, self).__init__()
        self.handler = MetricHandler()
        # lines of the obd device file from the last do_collect (None if unavailable)
        self.obd_device_data = list()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Re-read the obd device list."""
        self.obd_device_data = CfsMonitorLustreObdDevice.get_obd_device_data()

    def do_dump_data(self):
        """Return the barad payloads for the last collection (possibly empty)."""
        return list(self.dump_obd_count_data() or [])

    def dump_obd_count_data(self):
        """
        Return a list with one payload carrying the obd device count, or an
        empty list when no device lines were collected.
        """
        payloads = []
        timestamp = int(time.time())
        self.handler.namespace = 'qce/cfs'
        self.handler.dimensions = ['uuid', 'vmip', 'appid', 'vpcid']
        if not self.obd_device_data:
            return payloads

        self.handler.add_batch_metric(
            batch=[{'name': 'lustre_obd_device_count', 'value': len(self.obd_device_data)}],
            dimensions={
                'uuid': self.vm_uuid,
                'vmip': self.vm_ip,
                'appid': self.app_id,
                'vpcid': self.vpc_id,
            },
            timestamp=timestamp,
        )
        if self.handler.get_metrics():
            payloads.append({'sender': 'cfs_sender', 'datas': self.handler.pop_metrics()})
        return payloads


class CfsMonitorLustreObdDevice(object):
    """
    Reader for the lustre obd device list file.
    """

    @classmethod
    def get_obd_device_data(cls, obd_device_file="/sys/kernel/debug/lustre/devices"):
        """
        Return all lines of obd_device_file. The file has no more than 8192 lines,
        so it is read whole.

        return: None if the file does not exist or cannot be read (e.g. not
                accessible to this user or removed concurrently),
                otherwise the list of lines
        """
        # EAFP: an exists() check followed by open() still fails on unreadable
        # files and races with removal; one try covers both cases
        try:
            with open(obd_device_file, "r") as f:
                return f.readlines()
        except (IOError, OSError):
            return None


class CfsAdaptorLusterLogFilter(object):
    """
    Filter lustre errors out of the kernel log and generate barad-format data
    (metric `lustre_log_error_count`, one series per error class and dimension values).
    """

    name = "lustre_log_filter_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorLusterLogFilter, self).__init__()
        self.handler = MetricHandler()
        self.err_monitor = CfsMonitorLustreLogFilter()
        # {error_name: [(regex_groups,), ...]} from the last do_collect; an empty
        # dict instance (not the dict type) so dumping before collecting is a no-op
        self.err_data = dict()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Fetch the lustre errors logged during the previous minute."""
        self.err_data = self.err_monitor.get_errors_from_messages_in_last_1min()

    def do_dump_data(self):
        """Return the list of barad payloads for the last collection (possibly empty)."""
        data = list()

        err_count_data = self.dump_err_count_data()
        if err_count_data:
            data += err_count_data

        return data

    def dump_err_count_data(self):
        """
        Count the collected errors per (error class, dimension values) and return
        them as a list holding at most one barad payload.
        """
        result = list()
        now = int(time.time())
        self.handler.namespace = 'qce/cfs'
        self.handler.dimensions = [
            'uuid', 'vmip', 'appid', 'vpcid', 'error_name', 'func_name', 'server_ip', 'target_name', 'fs_name']
        if not self.err_data:
            return result

        for err_name, err_list in self.err_data.items():
            group_name = CfsMonitorLustreLogFilter.ERR_REG_MAP[err_name]['group_name']

            # func_name is reported as 'NA' for now to reduce the number of series
            func_index = None
            if 'func_name' in group_name:
                func_index = group_name.index('func_name')

            err_counts = defaultdict(int)
            for err in err_list:
                err_dimensions = err[0]
                if func_index is not None:
                    masked = list(err_dimensions)
                    masked[func_index] = 'NA'
                    err_dimensions = tuple(masked)
                err_counts[err_dimensions] += 1

            for err_dimensions, err_count in err_counts.items():
                dimensions = {
                    'uuid': self.vm_uuid,
                    'vmip': self.vm_ip,
                    'appid': self.app_id,
                    'vpcid': self.vpc_id,
                    'error_name': err_name,
                    'func_name': 'NA',
                    'server_ip': 'NA',
                    'target_name': 'NA',
                    'fs_name': 'NA',
                }
                # overwrite the 'NA' defaults with the values captured by the regex
                dimensions.update(zip(group_name, err_dimensions))

                self.handler.add_batch_metric(
                    batch=[{'name': 'lustre_log_error_count', 'value': int(err_count)}],
                    dimensions=dimensions,
                    timestamp=now
                )

        if self.handler.get_metrics():
            result.append({
                'sender': 'cfs_sender',
                'datas': self.handler.pop_metrics()
            })

        return result


class CfsMonitorLustreLogFilter(object):
    """
    Scan the tail of the kernel log for lustre error lines logged during the
    previous minute and classify them with ERR_REG_MAP.
    """
    # only the last MAX_LOG_FILE_SIZE bytes of the log file are read
    MAX_LOG_FILE_SIZE = 5*1024*1024

    # dimension names the regex capture groups below are mapped onto
    G_FUNC_NAME = 'func_name'
    G_IP = 'server_ip'
    G_TARGET_NAME = 'target_name'
    G_FS_NAME = 'fs_name'

    # error class -> regex matched against a whole log line, plus the dimension
    # names its capture groups map to (in group order)
    ERR_REG_MAP = {
        'cfs_turbo_lnet_error': {
            'reg': r'.*\s*LNetError:.*?(\d+\.\d+\.\d+\.\d+)@tcp.*',
            'group_name': [G_IP],
        },
        'cfs_turbo_client_evict': {
            'reg': r'.*\s*LustreError:.*?This\sclient\swas\sevicted\sby\s(.*)-(.*);.*',
            'group_name': [G_FS_NAME, G_TARGET_NAME],
        },
        'cfs_turbo_req_timeout': {
            'reg': r'.*\s*Lustre:.*\(.*:(.*)\(\)\).*Request\ssent\shas\stimed\sout\sfor.*->(.*?)-(.*?)-.*',
            'group_name': [G_FUNC_NAME, G_FS_NAME, G_TARGET_NAME],
        },
        'cfs_turbo_target_error': {
            'reg': r'.*\s*LustreError:.*\s(.*?)-(.*?)-.*rc\s=.*',
            'group_name': [G_FS_NAME, G_TARGET_NAME],
        },
        'cfs_turbo_fs_error': {
            'reg': r'.*\s*LustreError:.*\(.*:(.*)\(\)\)\s*(.*?):.*rc\s*=.*',
            'group_name': [G_FUNC_NAME, G_FS_NAME],
        },
    }

    def __init__(self):
        self.log_path = self.get_log_path()
        self.compiled_reg_map = self.get_compiled_reg_map()

    def is_lustre_mounted(self, mountinfo_path='/proc/self/mountinfo'):
        """Return True if any line of mountinfo_path has file system type 'lustre'."""
        if not os.path.exists(mountinfo_path):
            return False
        with open(mountinfo_path) as f:
            for line in f:
                # layout ends with '- <fstype> <source> <superopts>': fstype is third from last
                if line.split(' ')[-3] == 'lustre':
                    return True
        return False

    def get_compiled_reg_map(self):
        """Return ERR_REG_MAP with every 'reg' pattern compiled."""
        compiled_reg_map = dict()
        for key, item in self.ERR_REG_MAP.items():
            compiled_reg_map[key] = {
                'reg': re.compile(item['reg']),
                'group_name': item['group_name']
            }
        return compiled_reg_map

    def get_log_path(self):
        """Return the kernel log file to scan, or None if neither known file exists."""
        if os.path.exists('/var/log/messages'):
            # normal case: centos, tencentos, etc.
            return '/var/log/messages'
        elif os.path.exists('/var/log/kern.log'):
            # ubuntu
            return '/var/log/kern.log'
        else:
            # skip this client
            return None

    def gen_last_1min_date_str(self, now=None):
        """
        Return the previous minute as a syslog timestamp prefix, e.g. 'Jan  6 20:25'
        (day of month space-padded, as traditional syslog lines are written).

        now: optional epoch seconds to use instead of time.time()
        NOTE(review): assumes %b yields a 3-letter English month (C locale).
        """
        if now is None:
            now = time.time()
        last_1min = time.strftime("%b %d %H:%M", time.localtime(now - 60))
        # change 'Jan 06 20:25' to 'Jan  6 20:25'; index 4 holds the first day
        # digit as a character, so it must be compared with '0', not the int 0
        if last_1min[4] == '0':
            last_1min = last_1min[:4] + " " + last_1min[5:]
        return last_1min

    def get_errors_from_messages_in_last_1min(self):
        """
        Collect the lustre errors logged during the previous minute, grouped by error class.

        Only the last MAX_LOG_FILE_SIZE bytes of the log and, within them, the newest
        5000 lines are examined; a line is assigned to the first ERR_REG_MAP class
        whose regex matches it.
        return: defaultdict(list) such as {
            'cfs_turbo_lnet_error': [
                (('10.0.0.1',),),
            ]
        }
        i.e. each entry is a 1-tuple wrapping the regex groups, so the raw line can
        be added to it later. Empty when lustre is not mounted, no log file was
        found or the log cannot be read.
        """
        result = defaultdict(list)
        if not self.is_lustre_mounted() or self.log_path is None:
            return result

        max_line_count = 5000

        try:
            with open(self.log_path) as f:
                # read only the tail of the log to bound memory use; the size comes
                # from the open handle so a rotation in between cannot skew the offset
                log_size = os.fstat(f.fileno()).st_size
                if log_size > self.MAX_LOG_FILE_SIZE:
                    f.seek(log_size - self.MAX_LOG_FILE_SIZE)
                lines = f.readlines()
        except (IOError, OSError):
            # best effort: an unreadable log (e.g. permission denied) reports nothing
            return result

        last_1min_str = self.gen_last_1min_date_str()
        # newest lines first, looking at no more than max_line_count of them
        for line in reversed(lines[-max_line_count:]):
            if not line.startswith(last_1min_str):
                continue
            # NOTE: several classes can match the same LustreError line; the first
            # match in dict iteration order wins
            for error_name, error_item in self.compiled_reg_map.items():
                reg_result = error_item['reg'].match(line)
                if reg_result:
                    # wrap the groups in a tuple so the raw line can be appended later
                    result[error_name].append((reg_result.groups(),))
                    break
        return result

class CfsMonitorMountInfo(object):
    """
    Parse /proc/self/mountinfo into one dict per mount (keys listed below) and
    keep separate lists for NFS and lustre mounts.
    see https://man7.org/linux/man-pages/man5/proc.5.html#:~:text=/proc/%5Bpid%5D/mountinfo%20(since%20Linux%202.6.26)
    """

    # fields before the ' - ' separator; optional_fields holds zero or more
    # space-separated tags and is kept as one string ('' when absent)
    MOUNT_INFO_KEY_BEFORE_SEPARATOR = [
        "mount_id",
        "parent_id",
        "major:minor",
        "root",
        "mount_point",
        "mount_option",
        "optional_fields",
    ]

    # fields after the ' - ' separator
    MOUNT_INFO_KEY_AFTER_SEPARATOR = [
        "filesystem_type",
        "mount_source",
        "super_options",
    ]

    def __init__(self):
        # every successfully parsed entry
        self.mount_info = list()
        # entries whose fs type contains 'nfs'
        self.nfs_mount_info = list()
        # entries whose fs type is exactly 'lustre'
        self.lustre_mount_info = list()

        self._parse_mount_info()

    def _parse_mount_info(self, mountinfo_path='/proc/self/mountinfo'):
        """
        Append the entries of mountinfo_path to the lists above. Lines without the
        expected layout are skipped; a missing file leaves the lists untouched.
        """
        if not os.path.exists(mountinfo_path):
            return
        with open(mountinfo_path) as f:
            for line in f:
                # split on the ' - ' separator field only: a bare '-' also occurs in
                # paths and device names (e.g. /mnt/cfs-data, /dev/mapper/vg-root),
                # while spaces inside paths are escaped as \040 by the kernel
                line_items = line.split(' - ', 1)
                if len(line_items) != 2:
                    continue

                line_before = line_items[0].strip()
                line_after = line_items[1].strip()

                # the number of optional fields before the separator varies:
                # keep everything after the 6th field as one string ('' if none)
                line_before_items = line_before.split(" ", 6)
                if len(line_before_items) == 6:
                    line_before_items.append("")
                if len(line_before_items) != 7:
                    continue

                line_after_items = line_after.split(" ")
                if len(line_after_items) != 3:
                    continue

                mount_info_item = dict(zip(self.MOUNT_INFO_KEY_BEFORE_SEPARATOR, line_before_items))
                mount_info_item.update(zip(self.MOUNT_INFO_KEY_AFTER_SEPARATOR, line_after_items))

                self.mount_info.append(mount_info_item)
                fstype = mount_info_item['filesystem_type']
                # NOTE(review): substring match also accepts e.g. 'nfsd' (the NFS
                # server control fs); confirm whether that is intended
                if 'nfs' in fstype:
                    self.nfs_mount_info.append(mount_info_item)
                if fstype == 'lustre':
                    self.lustre_mount_info.append(mount_info_item)

class CfsAdaptorCheckNetwork(object):
    """
    Check whether the CFS VIPs are reachable and generate barad-format data
    (metric `net_issucc`, the 1/0 reachability value per mount point).
    """

    name = "check_network_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorCheckNetwork, self).__init__()
        self.handler = MetricHandler()
        # {(mount_point, cfsvip): 1 or 0} from the most recent do_collect
        self.barad_data = dict()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Run a fresh reachability check over the currently mounted CFS file systems."""
        self.barad_data = CfsMonitorCheckNetwork().check_network()

    def do_dump_data(self):
        """Return a list holding at most one barad payload with a `net_issucc` per mount point."""
        timestamp = int(time.time())
        handler = self.handler
        handler.namespace = 'qce/cfs'
        handler.dimensions = ['uuid', 'cfsvip', 'mountdir', 'vmip', 'appid', 'vpcid']

        for (mountdir, cfsvip), reachable in self.barad_data.items():
            handler.add_batch_metric(
                batch=[{'name': 'net_issucc', 'value': int(reachable)}],
                dimensions={
                    'uuid': self.vm_uuid,
                    'cfsvip': cfsvip,
                    'mountdir': mountdir,
                    'vmip': self.vm_ip,
                    'appid': self.app_id,
                    'vpcid': self.vpc_id,
                },
                timestamp=timestamp,
            )

        if not handler.get_metrics():
            return []
        return [{'sender': 'cfs_sender', 'datas': handler.pop_metrics()}]

class CfsMonitorCheckNetwork(object):
    """
    Check TCP reachability of the CFS server address behind every NFS / lustre mount.
    """

    def __init__(self):
        # snapshot of /proc/self/mountinfo taken when the monitor is created
        self.mountstats = CfsMonitorMountInfo()

    def check_network(self):
        """
        return: {(mount_point, cfsvip): 1 if a TCP connect to the CFS address
                succeeded else 0}

        NFS mounts are probed on port 2049 and lustre mounts on port 988; mounts
        whose address is not a dotted IPv4 string are skipped, and each
        (cfsvip, fs type) pair is connected to only once.
        """
        verdicts = {}
        probed = {}
        for entry in self.mountstats.nfs_mount_info + self.mountstats.lustre_mount_info:
            fstype = entry['filesystem_type']
            mount_point = entry['mount_point']

            if "nfs" in fstype:
                host = entry['mount_source'].split(':')[0]
                port = 2049
            elif fstype == "lustre":
                # drop the '@<net>' suffix of a lustre address
                host = entry['mount_source'].split(':')[0].split('@')[0]
                port = 988
            else:
                continue

            if not self.validate_ipv4(host):
                continue

            key = (host, fstype)
            if key not in probed:
                probed[key] = 1 if self.check_socket(host, port) else 0
            verdicts[(mount_point, host)] = probed[key]

        return verdicts

    def check_socket(self, host, port):
        """Return True if a TCP connect to (host, port) succeeds within 3 seconds."""
        with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
            sock.settimeout(3)
            try:
                return sock.connect_ex((host, port)) == 0
            except socket.error:
                return False

    def validate_ipv4(self, s):
        """Return True if s is four dot-separated decimal numbers, each at most 255."""
        octets = s.split('.')
        return len(octets) == 4 and all(o.isdigit() and int(o) <= 255 for o in octets)

def main():
    """Build the CFS collector, initialise it, then call collect() and dump_data() once each."""
    cfs_collector = CfsLongFrequencyCollector()
    cfs_collector.init()
    cfs_collector.collect()
    cfs_collector.dump_data()


if __name__ == '__main__':
    main()