File: //usr/local/qcloud/monitor/barad/plugin/collector/metal/kernelevent.py
# -*- coding: utf-8 -*-
import sys
import time
import random
import re
import urllib2
import json
import os
from os import path
sys.path.append(os.getcwd() + '/../../../comm/')
# sys.path.append('..')
# sys.path.append('../../base/')
import constant
from utils.log_utils import LogUtils
from utils.metric_handler import MetricHandler
from plugin_base import VmBaseCollector, HostBaseCollector
import cutils
USERE = False # whether to match keywords with regular expressions
INTERVAL = 10 # default collection interval (seconds)
MAXLINES = 20 # max number of abnormal-event lines read per scan
FILENAME = '/var/log/messages' # log file to scan
CONF_REFRESH = 240 # config refresh period (seconds)
TMP_PATH = os.getcwd() + '/../tmp/' # temp file directory
CONFIG_FILE_PATH = os.getcwd() + '/../etc/kernel_event_config.json' # config file path
MAXSIZE = 65536 # per-line read buffer size (bytes)
LASTSIZE = 5120 # tail window: scan only the last LASTSIZE bytes of a new file
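# Expected shape of kernel_event_config.json, inferred from how conf_refresh()
# and __process_logrt() consume it; the keyword strings below are illustrative
# placeholders, not the shipped defaults:
#   {
#       "guest_oom":        {"keywords": ["Out of memory"]},
#       "guest_core_error": {"keywords": ["kernel BUG at"]}
#   }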
class KernelEventCollector(VmBaseCollector):
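    """Plugin that scans /var/log/messages for configured kernel-event
    keywords and reports hits as CVM event alarms (guest_oom /
    guest_core_error)."""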
def init(self):
self.set_frequency(10)
        self.__nextrefresh = 0 # timestamp of the next config refresh
        self.__nextcheck = 0 # timestamp of the next log collection
self.__loginfo = LogInfo(self.logger())
self.__lanip = cutils.local_ip()
def do_collect(self):
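        # Two independent timers drive this collector: a jittered config
        # refresh and a fixed-interval log scan. A failure in either path
        # only delays its own retry, not the other.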
if time.time() > self.__nextrefresh:
try:
self.__loginfo.conf_refresh()
                # Randomize each agent's refresh time to spread the requests out
self.__nextrefresh = time.time() + CONF_REFRESH + int(random.uniform(0, 60))
except Exception, e:
self.logger().error(e)
self.__nextrefresh = time.time() + 60
if time.time() > self.__nextcheck:
try:
                self.__loginfo.active_checks() # collect the logs
self.__nextcheck = time.time() + INTERVAL
self.__send_data()
except Exception, e:
self.logger().error(e)
self.__nextcheck = time.time() + 10
def __send_data(self):
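        # Emit at most one event alarm per event type (logobj) per cycle:
        # the inner loop breaks after the first keyword whose count is > 0.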
data = self.__loginfo.getCollectData()
if 0 == len(data):
return
timestamp = int(time.time())
        for logobj, keywords in data.iteritems():
            for keyword, keyword_count in keywords.iteritems():
                self.logger().info("[logobj] = %s [keyword] = %s [keyword_count] = %s"
                                   % (logobj, keyword, keyword_count['count']))
if 0 < keyword_count['count']:
additionalMsg = []
dim = [
{"key": "uuid", "value": self.get_vm_uuid()}
]
event_metric = {"version": 1, "Action": "SendEventAlarm", "caller": "cvm", "callee": "QCMonitor",
"productName": "cvm", "timestamp": timestamp, "occurTime": timestamp,
"dimensions": dim}
if str(logobj) == "guest_oom":
event = {"eventName": "guest_oom", "status": 1, "additionalMsg": additionalMsg}
else:
event = {"eventName": "guest_core_error", "status": 1, "additionalMsg": additionalMsg}
data = {'sender': 'event_sender', 'datas': dict(event_metric, **event)}
self.put_data(data)
                    break # this event type (logobj) already fired this cycle; skip its other keywords to avoid duplicate events
self.__loginfo.cleanCollectData()
class LogInfo:
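    """Tracks scan progress across log files (including rotated ones) and
    accumulates keyword hit counts into an in-memory buffer keyed by
    event name, then by keyword."""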
def __init__(self, logger=None):
self.__logObjs = {}
self.__files_old = {}
self.__buffer = {}
self.__log_utils = LogUtils()
self.__logger = logger if logger is not None else cutils.console_logger()
def getCollectData(self):
return self.__buffer
def cleanCollectData(self):
self.__buffer = {}
def conf_refresh(self):
        try:
            with open(CONFIG_FILE_PATH, 'r') as conf_file:
                retData = json.load(conf_file)
            self.__logObjs = retData
        except ValueError, e:
            self.__logger.error("got an invalid json config file: %s" % e)
def active_checks(self):
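        # Scan the log once per configured event type; a failure for one
        # event type must not stop the others.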
if not path.exists(TMP_PATH):
os.mkdir(TMP_PATH)
for obj_name, logobj in self.__logObjs.iteritems():
try:
self.__process_logrt(obj_name, logobj)
except Exception, e:
self.__logger.error(e)
def __process_logrt(self, obj_name, log_obj):
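        # Per event type: map the previously-seen file list to the current
        # one (so logrotate does not cause re-reads), carry over each file's
        # processed_size, and for files never seen before start at most
        # LASTSIZE bytes from the end. Progress is persisted to a temp file.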
if 0 >= MAXLINES:
return
self.__buffer[obj_name] = {}
        tmp_file = path.join(TMP_PATH, obj_name.upper()) # temp state file for this event type
        if obj_name not in self.__files_old:
self.__files_old[obj_name] = self.__log_utils.read_old_fileinfos(tmp_file)
files_old = self.__files_old[obj_name]
# print files_old
files_new = self.__log_utils.make_logfile_list(FILENAME)
# print files_new
num_old = len(files_old)
num_new = len(files_new)
if 0 < num_old and 0 < num_new:
old2new = self.__log_utils.setup_old2new(files_old, num_old, files_new, num_new)
for i in range(num_old):
j = self.__log_utils.find_old2new(old2new, num_new, i)
if -1 != j and 0 < files_old[i]['processed_size']:
files_new[j]['processed_size'] = files_old[i]['processed_size']
elif 0 < num_new:
for file in files_new:
if file['size'] > LASTSIZE:
file['processed_size'] = file['size'] - LASTSIZE
keywords = {}
for keyword in log_obj['keywords']:
self.__buffer[obj_name][keyword] = {}
self.__buffer[obj_name][keyword]['count'] = 0
self.__buffer[obj_name][keyword]['msg'] = [''] * MAXLINES
if USERE:
try:
keywords[keyword] = re.compile(keyword, re.IGNORECASE)
                except Exception, e:
                    self.__logger.error(e)
else:
keywords[keyword] = keyword
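        # Build the matcher once per scan: compiled-regex search when USERE
        # is on, case-insensitive substring search otherwise.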
if USERE:
find = lambda line, keyword: keyword.search(line)
else:
find = lambda line, keyword: -1 != (line.upper().find(keyword.upper()))
for fileinfo in files_new:
if fileinfo['processed_size'] != fileinfo['size']:
try:
self.__process_log(self.__buffer[obj_name], fileinfo,
keywords, MAXLINES, find)
except Exception, e:
self.__logger.error(e)
self.__files_old[obj_name] = files_new
# print self.__files_old[obj_name]
self.__log_utils.write_new_fileinfo(tmp_file, files_new)
def __process_log(self, buffer, fileinfo, keywords, maxlines, func_find):
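        # Resume from the last processed offset and record the new offset
        # once every complete line has been consumed.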
with open(fileinfo['filename']) as file:
file.seek(fileinfo['processed_size'])
while self.__process_line(buffer, file, keywords, maxlines, func_find):
pass
fileinfo['processed_size'] = file.tell()
def __process_line(self, buffer, file, keywords, maxlines, func_find):
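        # Read one logical line (possibly in MAXSIZE-byte chunks). Each
        # keyword is counted at most once per line (hence the working copy
        # tmp_key), and the last `maxlines` matching lines are kept in a
        # ring buffer. Returns True after a full line, False at EOF.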
tmp_key = dict(keywords)
        line = file.readline(MAXSIZE).decode('utf-8', 'replace') # tolerate non-UTF-8 bytes in the log
while line:
for key, value in tmp_key.items():
if func_find(line, value):
buffer[key]['msg'][buffer[key]['count'] % maxlines] = line
buffer[key]['count'] += 1
del tmp_key[key]
if line[-1] == '\n':
return True
                line = file.readline(MAXSIZE).decode('utf-8', 'replace')
else:
return False
def test_LogInfo():
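    # Manual smoke test: refresh the config, run one scan, and print the data.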
log = LogInfo()
start = time.time()
log.conf_refresh()
print "Refresh time: %s" % (time.time() - start)#the time of refresh cost
start = time.time()
log.active_checks()
print "Check time: %s" % (time.time() - start)#the time of active_checks cost
data = log.getCollectData()
data = json.dumps(data)
print data
def test_KernelEventCollector():
coll = KernelEventCollector()
coll.init()
coll.do_collect()
coll.dump_data()
if __name__ == '__main__':
# while 1:
# test_LogInfo()
# time.sleep(10)
test_KernelEventCollector()