HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/plugin/collector/vm/cfs_tce_nfs_responding.py
#!/usr/local/qcloud/monitor/python26/bin/python
# -*- coding: utf-8 -*-
"""

"""
import time
import os
import re
import sys

# Make the plugin's sibling utils, base and comm directories importable
# (appended in this order, after any existing entries).
cur_dir = os.path.dirname(os.path.realpath(__file__))
sys.path.extend([
    cur_dir + '/../utils/',
    cur_dir + '/../../base/',
    cur_dir + '/../../../comm/',
])

from plugin_base import VmBaseCollector
from utils.metric_handler import MetricHandler
from utils.vpc import VpcIdManager

# Reported dimensions: cvm_ip, cvm_uuid, un_vpc_id, cfs_vip
# Reported metric: number of NFS "not responding" events (counted separately
# as timeouts and retries)

# {"192.168.12.17": {"timeouts": 0, "retries": 0}}
# 新增的时间被添加到池中,默认累计+1 并上报

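# Question: how many "not responding" events occur within 100s?
# (e.g. more than 100 events within 100s)
#            ||
# Approach: every 100s, count the events seen during those 100s and report.
#
# After each report the accumulated counts are reset to zero.
# (The collector actually runs every CFSNotRespondingConf.frequency seconds,
# currently 60.)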

# 1 2 3

# 1 100s 50c 100s push
# 2 100s 110c 80s push
# 2 100s 110c 80s push
# [Fri Mar 31 03:10:00 2023] nfs: server 192.168.12.17 not responding, still trying
# [Wed Apr 12 10:55:37 2023] nfs: server 192.168.12.17 not responding, timed out


class Regex:
    """Compiled patterns used to classify NFS kernel log lines.

    Raw string literals are used so regex escapes (backslash-d, backslash-dot)
    reach the regex engine unchanged; in plain string literals they are
    invalid escape sequences (DeprecationWarning since Python 3.6,
    SyntaxWarning since 3.12).
    """
    # First dotted-quad, IPv4-looking token in a line (octets not range-checked).
    ip = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}")
    # e.g. "nfs: server 192.168.12.17 not responding, timed out"
    timeout = re.compile(r"nfs.*server.*not.*responding.*time.*out.*")
    # e.g. "nfs: server 192.168.12.17 not responding, still trying"
    retry = re.compile(r"nfs.*server.*not.*responding.*try.*")


class CFSNotRespondingConf:
    """Static settings for the CFS NFS "not responding" collector."""

    # Log file that is scanned for NFS kernel messages.
    file_name = "/var/log/messages"
    # Collection period in seconds: barad runs the collector once per period.
    frequency = 60
    # Upper bound on lines parsed in one run. Lines past this bound are not
    # dropped: the collector saves its file offset and resumes from there on
    # the next run.
    max_lines_per_interval = 10000 * frequency


# Module-level singletons shared by the collector below.
REGEX = Regex()
I_CONF = CFSNotRespondingConf()


class CFSNotRespondingCollector(VmBaseCollector):
    """Count NFS "not responding" kernel messages per server and report them.

    Each run reads the lines appended to ``I_CONF.file_name`` since the
    previous run, counts timeout / retry messages per NFS server IP and
    pushes the counts to barad under the ``qce/cfs_tcloud`` namespace.
    """

    # Per-server counters: {"<server ip>": {"timeouts": n, "retries": n}}.
    # init() replaces this with a per-instance dict; the class-level value is
    # only a fallback (it would be shared by every instance).
    ip_events_map = {}
    # Byte offset just past the last line consumed on the previous run.
    last_position = 0
    # os.SEEK_END until the first run has recorded a starting offset, then
    # os.SEEK_SET; serves as the "first run" marker.
    whence = os.SEEK_END
    # Inode of the log file seen on the previous run, used to detect rotation.
    last_inode = None

    def init(self):
        """Set up VPC lookup, run frequency, metric handler and counters."""
        # Manager providing VPC information (un_vpc_id) for the dimensions.
        self.vpc = VpcIdManager()
        # How often barad runs this collector. This is unrelated to the
        # reporting granularity shown in dashboards; it is only the fixed
        # interval at which barad invokes this script.
        self.set_frequency(I_CONF.frequency)
        # Metric handler configured with our namespace and dimension names.
        self.handler = MetricHandler()
        self.handler.namespace = 'qce/cfs_tcloud'
        self.handler.dimensions = [
            "cfs_vip",
            "cvm_ip",
            "cvm_uuid",
            "un_vpc_id",
        ]
        # Per-instance counters so separate collectors never share state.
        self.ip_events_map = {}

    def handle_event(self, ip, event):
        """Increment the ``event`` counter ("timeouts" or "retries") for ``ip``."""
        events = self.ip_events_map.setdefault(
            str(ip), {"timeouts": 0, "retries": 0})
        events[event] = events.get(event, 0) + 1

    def handle_line(self, line):
        """Classify one log line and count it against the server IP it names.

        A line may count as a timeout, a retry, or neither. Lines that match
        neither pattern, or that contain no IPv4-looking token, are ignored.
        """
        events = []
        if REGEX.timeout.search(line):
            events.append("timeouts")
        if REGEX.retry.search(line):
            events.append("retries")
        if not events:
            return
        # NOTE(review): this takes the first IPv4-looking token in the line,
        # assuming nothing before "nfs: server <ip>" (e.g. a syslog hostname)
        # looks like an IP address; confirm for the deployed log format.
        match = REGEX.ip.search(line)
        if not match:
            return
        ip = match.group(0)
        for event in events:
            self.handle_event(ip, event)

    def readlines(self, file, new_last_position):
        """Parse lines from the current offset of ``file`` up to ``new_last_position``.

        At most ``I_CONF.max_lines_per_interval`` lines are read per call.
        do_collect() saves ``file.tell()`` afterwards, so remaining lines are
        parsed on the next run. Byte lines are decoded as UTF-8 (undecodable
        bytes replaced) before matching.
        """
        count = 0
        while (count < I_CONF.max_lines_per_interval
               and file.tell() < new_last_position):
            line = file.readline()
            if not line:
                break
            count += 1
            # Cheap substring filter before the regexes. Test the raw bytes
            # first so non-NFS lines are never decoded.
            if isinstance(line, bytes):
                if b"nfs" not in line:
                    continue
                line = line.decode("utf-8", "replace")
            elif "nfs" not in line:
                continue
            self.handle_line(line)

    def get_file_last_line_pos(self, file):
        """Return the offset just past the last newline in ``file``.

        This is the end of the last complete line, so a line that is still
        being written is left for a later run. Returns 0 when the file is
        empty or contains no newline. ``file`` should be opened in binary
        mode: Python 3 text files do not support end-relative or arbitrary
        seeks. The file position afterwards is unspecified.
        """
        file.seek(0, os.SEEK_END)
        pos = file.tell()
        chunk_size = 4096
        # Scan backwards in fixed-size chunks rather than byte by byte.
        while pos > 0:
            size = min(chunk_size, pos)
            pos -= size
            file.seek(pos, os.SEEK_SET)
            idx = file.read(size).rfind(b"\n")
            if idx != -1:
                return pos + idx + 1
        return 0

    def do_collect(self):
        """Parse log lines appended since the last run and report the counts.

        The first successful run only records the current end of the log, so
        history written before the collector started is not reported. If the
        log was replaced (different inode) or truncated (shorter than the
        saved offset), reading restarts at the beginning of the file. Any
        exception is logged and swallowed so one bad run does not stop the
        plugin.
        """
        try:
            if not os.path.exists(I_CONF.file_name):
                return
            # Binary mode: tell()/seek() offsets are exact byte positions on
            # both Python 2 and Python 3.
            with open(I_CONF.file_name, 'rb') as log_file:
                new_last_position = self.get_file_last_line_pos(log_file)
                inode = os.fstat(log_file.fileno()).st_ino
                if self.whence == os.SEEK_END:
                    # First run: start from the current end of the log and
                    # report nothing yet.
                    self.whence = os.SEEK_SET
                    self.last_position = new_last_position
                    self.last_inode = inode
                    return
                rotated = (self.last_inode is not None
                           and inode != self.last_inode)
                if rotated or new_last_position < self.last_position:
                    # Log was rotated or truncated: read the new file from 0.
                    self.last_position = 0
                self.last_inode = inode
                log_file.seek(self.last_position, os.SEEK_SET)
                self.readlines(log_file, new_last_position)
                # Resume after the last line actually consumed.
                self.last_position = log_file.tell()
            self.report_to_barad()
        except Exception as e:
            # Best effort: log the failure and try again on the next run.
            self.logger().info("An exception occurred:%s", str(e))

    def report_to_barad(self):
        """Push the accumulated per-server counters to barad, then reset them.

        Nothing is sent when no event was counted since the last report. The
        counters are reset only after put_data() returns; if it raises, they
        are kept and reported again on a later run.
        """
        if not self.ip_events_map:
            return
        # Host-level dimensions and the timestamp are identical for every
        # server, so compute them once per report.
        cvm_uuid = self.get_vm_uuid()
        cvm_ip = self.get_vmip()
        un_vpc_id = self.vpc.un_vpc_id
        timestamp = int(time.time())
        for cfs_vip, items in self.ip_events_map.items():
            dimensions = {
                'cvm_uuid': cvm_uuid,
                'cvm_ip': cvm_ip,
                'un_vpc_id': un_vpc_id,
                'cfs_vip': cfs_vip,
            }
            metric = [
                {'name': 'retries', 'value': int(items.get('retries', 0))},
                {'name': 'timeouts', 'value': int(items.get('timeouts', 0))},
            ]
            self.handler.add_batch_metric(
                batch=metric,
                dimensions=dimensions,
                timestamp=timestamp,
            )
        self.put_data({
            'sender': 'cfs_tcloud_sender',
            'datas': self.handler.pop_metrics()
        })
        self.ip_events_map = {}


def main():
    """Run one collection cycle by hand and dump whatever was gathered."""
    plugin = CFSNotRespondingCollector()
    plugin.init()
    plugin.collect()
    plugin.dump_data()


# Run a single collection cycle when executed directly as a script.
if __name__ == '__main__':
    main()