HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
File: /usr/local/qcloud/monitor/barad/plugin/collector/metal/kernelevent.py
# -*- coding: utf-8 -*-

import sys
import time
import random
import re
import urllib2
import json
import os
from os import path

sys.path.append(os.getcwd() + '/../../../comm/')
# sys.path.append('..')
# sys.path.append('../../base/')
import constant

from utils.log_utils import LogUtils
from utils.metric_handler import MetricHandler
from plugin_base import VmBaseCollector, HostBaseCollector
import cutils

USERE = False  # whether to match keywords with regular expressions
INTERVAL = 10  # default collection interval, in seconds
MAXLINES = 20  # maximum number of abnormal events read per scan
FILENAME = '/var/log/messages'  # log file to scan
CONF_REFRESH = 240   # config refresh period, in seconds

TMP_PATH = os.getcwd() + '/../tmp/'    # directory for temporary offset files
CONFIG_FILE_PATH = os.getcwd() + '/../etc/kernel_event_config.json'  # config file path
MAXSIZE = 65536      # read buffer size per log record, in bytes
LASTSIZE = 5120      # only the trailing LASTSIZE bytes of an unseen file are scanned
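
# A minimal sketch of the structure kernel_event_config.json is expected to
# hold, inferred from how __logObjs is consumed below (the event names are
# real; the example keywords are assumptions and may differ from what the
# agent actually ships):
# {
#     "guest_oom":        {"keywords": ["Out of memory", "oom-killer"]},
#     "guest_core_error": {"keywords": ["kernel BUG at"]}
# }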

class KernelEventCollector(VmBaseCollector):
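    """Scans /var/log/messages for configured kernel-event keywords and
    reports matches (OOM, core errors) as monitor event alarms."""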
    def init(self):
        self.set_frequency(10)
        self.__nextrefresh = 0  # timestamp of the next config refresh
        self.__nextcheck = 0    # timestamp of the next log collection pass
        self.__loginfo = LogInfo(self.logger())
        self.__lanip = cutils.local_ip()

    def do_collect(self):
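        """Refresh the keyword config periodically, scan the logs every
        INTERVAL seconds, and forward any hits as event alarms."""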
        if time.time() > self.__nextrefresh:
            try:
                self.__loginfo.conf_refresh()
                # Randomize each agent's refresh time to spread requests over the backend
                self.__nextrefresh = time.time() + CONF_REFRESH + int(random.uniform(0, 60))
            except Exception, e:
                self.logger().error(e)
                self.__nextrefresh = time.time() + 60

        if time.time() > self.__nextcheck:
            try:
                self.__loginfo.active_checks() # collect the logs
                self.__nextcheck = time.time() + INTERVAL
                self.__send_data()
            except Exception, e:
                self.logger().error(e)
                self.__nextcheck = time.time() + 10

    def __send_data(self):
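        """Convert collected keyword hits into guest_oom / guest_core_error
        event alarms and hand them to the reporting pipeline."""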
        data = self.__loginfo.getCollectData()
        if 0 == len(data):
            return
        timestamp = int(time.time())
        for logobj, keywords in data.iteritems():
            for keyword, keyword_count in keywords.iteritems():
                self.logger().info("[logobj] = "+ str(logobj) + " [keyword] = " + str(keyword) + " [keyword_count] = " + str(keyword_count['count']))
                if 0 < keyword_count['count']:
                    additionalMsg = []
                    dim = [
                        {"key": "uuid", "value": self.get_vm_uuid()}
                    ]
                    event_metric = {"version": 1, "Action": "SendEventAlarm", "caller": "cvm", "callee": "QCMonitor",
                                    "productName": "cvm", "timestamp": timestamp, "occurTime": timestamp,
                                    "dimensions": dim}
                    
                    if str(logobj) == "guest_oom":                
                        event = {"eventName": "guest_oom", "status": 1, "additionalMsg": additionalMsg}
                    else:
                        event = {"eventName": "guest_core_error", "status": 1, "additionalMsg": additionalMsg}
                    payload = {'sender': 'event_sender', 'datas': dict(event_metric, **event)}
                    self.put_data(payload)
                    break  # This event type (logobj) already fired this cycle; skip its remaining keywords to avoid duplicate alarms

        self.__loginfo.cleanCollectData()


class LogInfo:
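    """Tracks per-file read offsets across scans (and log rotation) and
    collects lines that match the configured keywords."""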
    def __init__(self, logger=None):
        self.__logObjs = {}
        self.__files_old = {}
        self.__buffer = {}
        self.__log_utils = LogUtils()
        self.__logger = logger if logger is not None else cutils.console_logger()

    def getCollectData(self):
        return self.__buffer

    def cleanCollectData(self):
        self.__buffer = {}

    def conf_refresh(self):
        try:
            with open(CONFIG_FILE_PATH, 'r') as conf_file:
                self.__logObjs = json.load(conf_file)
        except ValueError:
            self.__logger.error("got an invalid json config file: %s" % CONFIG_FILE_PATH)

    def active_checks(self):
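        """Run one scan pass over every configured log object."""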
        if not path.exists(TMP_PATH):
            os.mkdir(TMP_PATH)
        for obj_name, logobj in self.__logObjs.iteritems():
            try:
                self.__process_logrt(obj_name, logobj)
            except Exception, e:
                self.__logger.error(e)


    def __process_logrt(self, obj_name, log_obj):
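        """Scan the log files behind obj_name for its keywords, resuming
        from the offsets recorded on the previous pass."""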
        if 0 >= MAXLINES:
            return
        self.__buffer[obj_name] = {}
        tmp_file = path.join(TMP_PATH, obj_name.upper())  # per-metric temp file holding processed offsets
        if obj_name not in self.__files_old:
            self.__files_old[obj_name] = self.__log_utils.read_old_fileinfos(tmp_file)
        files_old = self.__files_old[obj_name]
        # print files_old
        files_new = self.__log_utils.make_logfile_list(FILENAME)
        # print files_new
        num_old = len(files_old)
        num_new = len(files_new)
        if 0 < num_old and 0 < num_new:
            # Map previously seen files to their current names (handles log
            # rotation) and carry over how much of each was already processed.
            old2new = self.__log_utils.setup_old2new(files_old, num_old, files_new, num_new)
            for i in range(num_old):
                j = self.__log_utils.find_old2new(old2new, num_new, i)
                if -1 != j and 0 < files_old[i]['processed_size']:
                    files_new[j]['processed_size'] = files_old[i]['processed_size']
        elif 0 < num_new:
            # First sight of these files: only scan the trailing LASTSIZE bytes.
            for logfile in files_new:
                if logfile['size'] > LASTSIZE:
                    logfile['processed_size'] = logfile['size'] - LASTSIZE

        keywords = {}
        for keyword in log_obj['keywords']:
            self.__buffer[obj_name][keyword] = {}
            self.__buffer[obj_name][keyword]['count'] = 0
            self.__buffer[obj_name][keyword]['msg'] = [''] * MAXLINES

            if USERE:
                try:
                    keywords[keyword] = re.compile(keyword, re.IGNORECASE)
                except Exception, e:  # invalid pattern: log it and skip this keyword
                    self.__logger.error(e)
            else:
                keywords[keyword] = keyword

        # Match by compiled regex, or by case-insensitive substring search.
        if USERE:
            find = lambda line, keyword: keyword.search(line)
        else:
            find = lambda line, keyword: -1 != line.upper().find(keyword.upper())
        for fileinfo in files_new:
            if fileinfo['processed_size'] != fileinfo['size']:
                try:
                    self.__process_log(self.__buffer[obj_name], fileinfo,
                                       keywords, MAXLINES, find)
                except Exception, e:
                    self.__logger.error(e)


        self.__files_old[obj_name] = files_new
        # print self.__files_old[obj_name]
        self.__log_utils.write_new_fileinfo(tmp_file, files_new)


    def __process_log(self, buffer, fileinfo, keywords, maxlines, func_find):
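        """Read one log file from its last processed offset, matching each
        complete line, then record the new offset."""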
        with open(fileinfo['filename']) as file:
            file.seek(fileinfo['processed_size'])
            while self.__process_line(buffer, file, keywords, maxlines, func_find):
                pass
            fileinfo['processed_size'] = file.tell()


    def __process_line(self, buffer, file, keywords, maxlines, func_find):
        """Consume one logical line (read in MAXSIZE-byte chunks) and record
        keyword hits. Returns True after a full line, False at EOF."""
        tmp_key = dict(keywords)
        line = file.readline(MAXSIZE).decode('utf-8')
        while line:
            for key, value in tmp_key.items():
                if func_find(line, value):
                    # Ring buffer: keep at most maxlines recent matches per keyword.
                    buffer[key]['msg'][buffer[key]['count'] % maxlines] = line
                    buffer[key]['count'] += 1
                    del tmp_key[key]  # count each keyword at most once per line
            if line[-1] == '\n':
                return True
            line = file.readline(MAXSIZE).decode('utf-8')
        return False


def test_LogInfo():
    log = LogInfo()
    start = time.time()
    log.conf_refresh()
    print "Refresh time: %s" % (time.time() - start)#the time of refresh cost
    start = time.time()
    log.active_checks()
    print "Check time: %s" % (time.time() - start)#the time of active_checks cost
    data = log.getCollectData()
    data = json.dumps(data)
    print data


def test_KernelEventCollector():
    coll = KernelEventCollector()
    coll.init()
    coll.do_collect()
    coll.dump_data()


if __name__ == '__main__':
    # while 1:
    #     test_LogInfo()
    #     time.sleep(10)
    test_KernelEventCollector()