File: //usr/local/qcloud/monitor/barad/plugin/collector/vm/cfs_long_frequency_collector.py
#!/usr/local/qcloud/monitor/python26/bin/python
# -*- coding: utf-8 -*-
import sys
import os
import threading
import re
import time
from collections import defaultdict
import urllib2
import socket
from contextlib import closing
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../../../comm/')
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../../base/')
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../utils/')
import constant
from plugin_base import VmBaseCollector
from utils.metric_handler import MetricHandler
from cutils import CommUtils, generate_config
class CfsLongFrequencyCollector(VmBaseCollector):
    """
    Barad collector (frequency set via set_frequency(60)) that reports CFS
    client metrics: mount-point stat probes, Lustre OBD device count, Lustre
    kernel-log error counts and CFS VIP reachability.
    """
    META_URL = generate_config(constant.PLUGIN_CONFIG_PATH, "MetaDataServer")["metadata_url"]
    # Process-wide caches of the metadata-server lookups; '' means "not
    # fetched yet or the last fetch failed" and triggers a retry.
    VM_APPID = ''
    APPID_MUTEX = threading.Lock()
    VM_VPCID = ''
    VPCID_MUTEX = threading.Lock()

    def init(self):
        """Configure the collector and reset the lazily built sub-collectors."""
        self.set_frequency(60)
        self.handler = MetricHandler()
        self.handler.dimensions = ['uuid', 'cfsvip', 'mountdir']
        self.stat_mount_point_collector = None
        self.lustre_obd_device_collector = None
        self.lustre_log_filter_collector = None
        self.check_network_collector = None

    @staticmethod
    def _fetch_metadata(path):
        """
        GET ``META_URL + path`` from the metadata server.

        :param path: path relative to META_URL, e.g. 'app-id'
        :return: the response body with surrounding whitespace stripped, or
                 '' on any error (callers treat '' as "retry later").
        """
        url = CfsLongFrequencyCollector.META_URL + path
        try:
            # closing() releases the HTTP response even if read() fails
            with closing(urllib2.urlopen(url, timeout=2)) as response:
                return response.read().strip()
        except Exception:
            # best effort: the metadata server may be unreachable
            return ''

    @staticmethod
    def getset_vm_appid():
        """
        Return the VM app id, fetching and caching it on first success.

        The cache is checked before and after taking APPID_MUTEX so that
        concurrent callers issue at most one request; a failed request leaves
        the cache empty so the next call retries.
        """
        if CfsLongFrequencyCollector.VM_APPID != '':
            return CfsLongFrequencyCollector.VM_APPID
        with CfsLongFrequencyCollector.APPID_MUTEX:
            if CfsLongFrequencyCollector.VM_APPID == '':
                CfsLongFrequencyCollector.VM_APPID = \
                    CfsLongFrequencyCollector._fetch_metadata('app-id')
            return CfsLongFrequencyCollector.VM_APPID

    @staticmethod
    def getset_vm_vpcid():
        """
        Return the VM vpc id, fetching and caching it on first success.

        The vpc id is looked up through the primary MAC address; if the MAC
        cannot be fetched, the second request is skipped and the cache stays
        empty so the next call retries.
        """
        if CfsLongFrequencyCollector.VM_VPCID != '':
            return CfsLongFrequencyCollector.VM_VPCID
        with CfsLongFrequencyCollector.VPCID_MUTEX:
            if CfsLongFrequencyCollector.VM_VPCID == '':
                mac = CfsLongFrequencyCollector._fetch_metadata('mac')
                # without a MAC the vpc-id URL would be malformed ('macs//vpc-id')
                if mac:
                    CfsLongFrequencyCollector.VM_VPCID = \
                        CfsLongFrequencyCollector._fetch_metadata(
                            'network/interfaces/macs/' + mac + '/vpc-id')
            return CfsLongFrequencyCollector.VM_VPCID

    def _ensure_collectors(self):
        """
        Return the four sub-collectors in reporting order, creating any that
        are missing.

        When all already exist but some were built with an empty app id or
        vpc id (metadata fetch failed earlier), the lookups are retried and
        the results pushed into those sub-collectors.
        """
        slots = (
            ('stat_mount_point_collector', CfsAdaptorStatMountPoint),
            ('lustre_obd_device_collector', CfsAdaptorLusterObdDevice),
            ('lustre_log_filter_collector', CfsAdaptorLusterLogFilter),
            ('check_network_collector', CfsAdaptorCheckNetwork),
        )
        if any(getattr(self, attr) is None for attr, _ in slots):
            vm_uuid = self.get_vm_uuid()
            appid = self.getset_vm_appid()
            vpcid = self.getset_vm_vpcid()
            vmip = self.get_vmip()
            for attr, factory in slots:
                if getattr(self, attr) is None:
                    setattr(self, attr, factory(vm_uuid, appid, vpcid, vmip))
            return [getattr(self, attr) for attr, _ in slots]

        collectors = [getattr(self, attr) for attr, _ in slots]
        if any(not c.app_id or not c.vpc_id for c in collectors):
            appid = self.getset_vm_appid()
            vpcid = self.getset_vm_vpcid()
            for collector in collectors:
                if not collector.app_id:
                    collector.app_id = appid
                if not collector.vpc_id:
                    collector.vpc_id = vpcid
        return collectors

    def do_collect(self):
        """
        Run one cycle: collect from every sub-collector, then pass each
        dumped record to put_data().
        """
        collectors = self._ensure_collectors()
        # collect everything first, then dump (same ordering as before)
        for collector in collectors:
            collector.do_collect()
        for collector in collectors:
            for item in collector.do_dump_data():
                self.put_data(item)
class CfsMonitorStatMountPoint(object):
    """
    Probe every NFS / Lustre mount point with ``stat`` to detect hung mounts.
    """
    # seconds allowed for each shell command
    CMD_TIMEOUT = 3

    def __init__(self):
        self.mountstats = CfsMonitorMountInfo()

    @staticmethod
    def _extract_cfsvip(mount_info):
        """
        Return the server address of an NFS / Lustre mount, or None for any
        other filesystem type.

        The address is the part of the mount source before the first ':';
        for Lustre an LNet suffix such as '@tcp' is dropped as well.
        """
        fstype = mount_info['filesystem_type']
        if 'nfs' not in fstype and fstype != 'lustre':
            return None
        cfsvip = mount_info['mount_source'].split(':')[0]
        if 'nfs' not in fstype:
            cfsvip = cfsvip.split('@')[0]
        return cfsvip

    @staticmethod
    def _shell_quote(value):
        """Quote *value* as a single POSIX-shell word."""
        return "'" + value.replace("'", "'\"'\"'") + "'"

    @staticmethod
    def _unescape_mount_path(path):
        """
        Decode 3-digit octal escapes (e.g. '\\040' for a space) as they appear
        in mountinfo paths, so the path can be handed to ``stat``.
        """
        return re.sub(r'\\([0-7]{3})', lambda m: chr(int(m.group(1), 8)), path)

    @staticmethod
    def _parse_count(output):
        """Return the integer on the first line of *output*, or None."""
        lines = output.splitlines() if output else []
        if not lines:
            return None
        try:
            return int(lines[0].strip())
        except ValueError:
            return None

    def _probe(self, mount_point):
        """
        Return the stat status of one mount point.

        :return: number of ``stat`` processes already running on the path
                 (>= 1 means a previous probe is still hanging, so no new
                 one is started); otherwise 0 if a fresh ``stat`` printed
                 output and 1 if it printed nothing (presumably failure or
                 timeout of ExecuteTimeoutCommand - TODO confirm). An empty
                 or unparsable process count is reported as 0, as before.
        """
        target = self._unescape_mount_path(mount_point)
        # -F: match the path literally instead of as a regex
        cmd = "ps aux | grep -F -- %s | grep -v 'grep' | wc -l" % \
            self._shell_quote('stat ' + target)
        cnt = self._parse_count(CommUtils.ExecuteTimeoutCommand(cmd, self.CMD_TIMEOUT))
        if cnt is None:
            return 0
        if cnt >= 1:
            return cnt
        output = CommUtils.ExecuteTimeoutCommand(
            "stat %s" % self._shell_quote(target), self.CMD_TIMEOUT)
        return 0 if output else 1

    def stat_mount_point(self):
        """
        Probe each NFS / Lustre mount point.

        When one CFS instance is mounted at several points it is probed only
        once per (cfsvip, fstype); the other mount points reuse that result.

        :return: dict {(mount_point, cfsvip): status}; status 0 means the
                 probe succeeded, any other value means it did not.
        """
        result = dict()
        probed = dict()
        mount_info_list = self.mountstats.nfs_mount_info + self.mountstats.lustre_mount_info
        for mount_info in mount_info_list:
            cfsvip = self._extract_cfsvip(mount_info)
            if cfsvip is None:
                continue
            mount_point = mount_info['mount_point']
            key = (cfsvip, mount_info['filesystem_type'])
            if key not in probed:
                probed[key] = self._probe(mount_point)
            result[(mount_point, cfsvip)] = probed[key]
        return result
class CfsAdaptorStatMountPoint(object):
    """
    Turn stat probe results from CfsMonitorStatMountPoint into barad
    'qce/cfs' records carrying one ``issucc`` metric per mount point.
    """
    name = "stat_mount_point_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorStatMountPoint, self).__init__()
        self.handler = MetricHandler()
        # {(mountdir, cfsvip): stat status (0 == success)}, set by do_collect()
        self.barad_data = dict()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Probe the current mount table and keep the results."""
        self.barad_data = CfsMonitorStatMountPoint().stat_mount_point()

    def do_dump_data(self):
        """
        Queue one ``issucc`` metric per (mountdir, cfsvip) pair.

        :return: [] when the handler holds no metrics, otherwise a one-item
                 list ``[{'sender': 'cfs_sender', 'datas': handler.pop_metrics()}]``.
        """
        handler = self.handler
        timestamp = int(time.time())
        handler.namespace = 'qce/cfs'
        handler.dimensions = ['uuid', 'cfsvip', 'mountdir', 'vmip', 'appid', 'vpcid']
        for key, status in self.barad_data.items():
            mountdir, cfsvip = key[0], key[1]
            # a zero status means the stat probe succeeded
            issucc = 1 if status == 0 else 0
            handler.add_batch_metric(
                batch=[{'name': 'issucc', 'value': issucc}],
                dimensions={
                    'uuid': self.vm_uuid,
                    'cfsvip': cfsvip,
                    'mountdir': mountdir,
                    'vmip': self.vm_ip,
                    'appid': self.app_id,
                    'vpcid': self.vpc_id
                },
                timestamp=timestamp
            )
        if not handler.get_metrics():
            return []
        return [{'sender': 'cfs_sender', 'datas': handler.pop_metrics()}]
class CfsAdaptorLusterObdDevice(object):
    """
    Turn the Lustre OBD device list from CfsMonitorLustreObdDevice into a
    barad 'qce/cfs' record carrying ``lustre_obd_device_count``.
    """
    name = "lustre_obd_device_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorLusterObdDevice, self).__init__()
        self.handler = MetricHandler()
        # device list lines, or None when the device file is absent
        self.obd_device_data = list()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Re-read the OBD device list."""
        self.obd_device_data = CfsMonitorLustreObdDevice.get_obd_device_data()

    def do_dump_data(self):
        """Return the dumped records as a new list (possibly empty)."""
        dumped = self.dump_obd_count_data()
        return list(dumped) if dumped else []

    def dump_obd_count_data(self):
        """
        Queue the device count (only when the device list is non-empty).

        :return: [] when the handler holds no metrics, otherwise a one-item
                 list ``[{'sender': 'cfs_sender', 'datas': handler.pop_metrics()}]``.
        """
        handler = self.handler
        timestamp = int(time.time())
        handler.namespace = 'qce/cfs'
        handler.dimensions = ['uuid', 'vmip', 'appid', 'vpcid']
        devices = self.obd_device_data
        if devices:
            handler.add_batch_metric(
                batch=[{'name': 'lustre_obd_device_count', 'value': len(devices)}],
                dimensions={
                    'uuid': self.vm_uuid,
                    'vmip': self.vm_ip,
                    'appid': self.app_id,
                    'vpcid': self.vpc_id
                },
                timestamp=timestamp
            )
        if not handler.get_metrics():
            return []
        return [{'sender': 'cfs_sender', 'datas': handler.pop_metrics()}]
class CfsMonitorLustreObdDevice(object):
    """
    Read the Lustre OBD device list.
    """
    @classmethod
    def get_obd_device_data(cls, obd_device_file="/sys/kernel/debug/lustre/devices"):
        """
        Return every line of the OBD device file.

        The whole file is read at once (the original author notes it holds no
        more than 8192 lines).

        :param obd_device_file: path of the device list
        :return: list of lines (possibly empty) when the file can be read;
                 None when it does not exist or cannot be opened/read, so a
                 missing or inaccessible debugfs does not abort collection.
        """
        # EAFP: avoids the exists()/open() race and covers permission errors
        try:
            with open(obd_device_file, "r") as f:
                return f.readlines()
        except (IOError, OSError):
            return None
class CfsAdaptorLusterLogFilter(object):
    """
    Turn Lustre log errors found by CfsMonitorLustreLogFilter into barad
    'qce/cfs' records carrying ``lustre_log_error_count``.
    """
    name = "lustre_log_filter_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorLusterLogFilter, self).__init__()
        self.handler = MetricHandler()
        self.err_monitor = CfsMonitorLustreLogFilter()
        # {error_name: [(capture_groups,), ...]}, set by do_collect(). Must be
        # a dict *instance*: the dict type itself is truthy and its items()
        # is unbound, which crashed a dump before the first collect.
        self.err_data = dict()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Scan the previous minute of the system log for Lustre errors."""
        self.err_data = self.err_monitor.get_errors_from_messages_in_last_1min()

    def do_dump_data(self):
        """Return the dumped error-count records as a new list."""
        data = list()
        err_count_data = self.dump_err_count_data()
        if err_count_data:
            data += err_count_data
        return data

    @staticmethod
    def _count_err_dimensions(err_list, group_name):
        """
        Count occurrences of each distinct capture-group tuple.

        The 'func_name' value, if that dimension exists, is replaced by 'NA'
        before counting to reduce the number of reported series.

        :param err_list: list of 1-tuples, each wrapping a capture-group tuple
        :param group_name: dimension name of each capture group, in order
        :return: dict {dimension values tuple: count}
        """
        func_idx = None
        if 'func_name' in group_name:
            func_idx = group_name.index('func_name')
        counts = defaultdict(int)
        for err in err_list:
            values = err[0]
            if func_idx is not None:
                values = list(values)
                values[func_idx] = 'NA'
                values = tuple(values)
            counts[values] += 1
        return dict(counts)

    def dump_err_count_data(self):
        """
        Queue one ``lustre_log_error_count`` metric per error class and
        distinct dimension values.

        :return: [] when the handler holds no metrics, otherwise a one-item
                 list ``[{'sender': 'cfs_sender', 'datas': handler.pop_metrics()}]``.
        """
        result = list()
        now = int(time.time())
        self.handler.namespace = 'qce/cfs'
        self.handler.dimensions = [
            'uuid', 'vmip', 'appid', 'vpcid', 'error_name', 'func_name', 'server_ip', 'target_name', 'fs_name']
        if self.err_data:
            for err_name, err_list in self.err_data.items():
                group_name = CfsMonitorLustreLogFilter.ERR_REG_MAP[err_name]['group_name']
                counts = self._count_err_dimensions(err_list, group_name)
                for err_dimensions, err_count in counts.items():
                    dimensions = {
                        'uuid': self.vm_uuid,
                        'vmip': self.vm_ip,
                        'appid': self.app_id,
                        'vpcid': self.vpc_id,
                        'error_name': err_name,
                        'func_name': 'NA',
                        'server_ip': 'NA',
                        'target_name': 'NA',
                        'fs_name': 'NA',
                    }
                    # overwrite the 'NA' defaults with the captured values
                    dimensions.update(zip(group_name, err_dimensions))
                    self.handler.add_batch_metric(
                        batch=[{'name': 'lustre_log_error_count', 'value': int(err_count)}],
                        dimensions=dimensions,
                        timestamp=now
                    )
        if self.handler.get_metrics():
            result.append({
                'sender': 'cfs_sender',
                'datas': self.handler.pop_metrics()
            })
        return result
class CfsMonitorLustreLogFilter(object):
    """
    Scan the tail of the system log for Lustre errors logged during the
    previous minute and classify them with ERR_REG_MAP.
    """
    # read at most the last 5 MiB of the log file
    MAX_LOG_FILE_SIZE = 5*1024*1024
    # inspect at most this many lines, counted back from the end of the file
    MAX_LINE_COUNT = 5000
    # dimension names filled from regex capture groups
    G_FUNC_NAME = 'func_name'
    G_IP = 'server_ip'
    G_TARGET_NAME = 'target_name'
    G_FS_NAME = 'fs_name'
    # error_name -> {'reg': pattern, 'group_name': dimension name of each
    # capture group, in group order}
    ERR_REG_MAP = {
        'cfs_turbo_lnet_error': {
            'reg': r'.*\s*LNetError:.*?(\d+\.\d+\.\d+\.\d+)@tcp.*',
            'group_name': [G_IP]
        },
        'cfs_turbo_client_evict': {
            'reg': r'.*\s*LustreError:.*?This\sclient\swas\sevicted\sby\s(.*)-(.*);.*',
            'group_name': [G_FS_NAME, G_TARGET_NAME]
        },
        'cfs_turbo_req_timeout': {
            'reg': r'.*\s*Lustre:.*\(.*:(.*)\(\)\).*Request\ssent\shas\stimed\sout\sfor.*->(.*?)-(.*?)-.*',
            'group_name': [G_FUNC_NAME, G_FS_NAME, G_TARGET_NAME]
        },
        'cfs_turbo_target_error': {
            'reg': r'.*\s*LustreError:.*\s(.*?)-(.*?)-.*rc\s=.*',
            'group_name': [G_FS_NAME, G_TARGET_NAME]
        },
        'cfs_turbo_fs_error': {
            'reg': r'.*\s*LustreError:.*\(.*:(.*)\(\)\)\s*(.*?):.*rc\s*=.*',
            'group_name': [G_FUNC_NAME, G_FS_NAME]
        },
    }

    def __init__(self):
        self.log_path = self.get_log_path()
        self.compiled_reg_map = self.get_compiled_reg_map()

    def is_lustre_mounted(self, mountinfo_path='/proc/self/mountinfo'):
        """Return True if any mountinfo entry has filesystem type 'lustre'."""
        try:
            with open(mountinfo_path) as f:
                for line in f:
                    fields = line.split()
                    # filesystem type is the third field from the end
                    if len(fields) >= 3 and fields[-3] == 'lustre':
                        return True
        except (IOError, OSError):
            return False
        return False

    def get_compiled_reg_map(self):
        """Return ERR_REG_MAP with each 'reg' pattern compiled."""
        compiled_reg_map = dict()
        for key, item in self.ERR_REG_MAP.items():
            compiled_reg_map[key] = {
                'reg': re.compile(item['reg']),
                'group_name': item['group_name']
            }
        return compiled_reg_map

    def get_log_path(self):
        """Return the kernel log to scan, or None if none is known."""
        if os.path.exists('/var/log/messages'):
            # normal case: centos, tencentos, etc.
            return '/var/log/messages'
        elif os.path.exists('/var/log/kern.log'):
            # ubuntu
            return '/var/log/kern.log'
        else:
            # skip this client
            return None

    def gen_last_1min_date_str(self, now=None):
        """
        Return the syslog timestamp prefix ('%b %d %H:%M') of the minute
        before *now* (default: current time).

        Syslog pads single-digit days with a space, so a leading zero in the
        day is replaced: 'Jan 06 20:25' -> 'Jan  6 20:25'.
        """
        if now is None:
            now = time.time()
        last_1min = time.strftime("%b %d %H:%M", time.localtime(now - 60))
        # compare with the character '0' (comparing to the int 0 never matched)
        if last_1min[4] == '0':
            last_1min = last_1min[:4] + " " + last_1min[5:]
        return last_1min

    def _read_log_tail(self):
        """Return the lines of the last MAX_LOG_FILE_SIZE bytes of the log."""
        with open(self.log_path) as f:
            log_size = os.path.getsize(self.log_path)
            # the first line after seeking may be partial; it will simply not
            # start with a timestamp prefix
            if log_size > self.MAX_LOG_FILE_SIZE:
                f.seek(log_size - self.MAX_LOG_FILE_SIZE)
            return f.readlines()

    def get_errors_from_messages_in_last_1min(self):
        """
        Collect Lustre errors logged during the previous minute, by class.

        Only lines starting with the previous minute's timestamp prefix are
        considered, among the last MAX_LINE_COUNT lines of the last
        MAX_LOG_FILE_SIZE bytes of the log.

        return: defaultdict(list) mapping error name to a list of 1-tuples
                wrapping the regex capture groups, e.g.
                {'cfs_turbo_lnet_error': [(('10.0.0.1',),)]}
                Empty when Lustre is not mounted, no log file is known, or
                the log cannot be read.
        """
        result = defaultdict(list)
        if not self.is_lustre_mounted():
            return result
        if self.log_path is None:
            return result
        try:
            lines = self._read_log_tail()
        except (IOError, OSError):
            return result
        last_1min_str = self.gen_last_1min_date_str()
        for line in reversed(lines[-self.MAX_LINE_COUNT:]):
            if not line.startswith(last_1min_str):
                continue
            # NOTE(review): a line is attributed to the first pattern that
            # matches, in dict iteration order of compiled_reg_map (not the
            # declaration order on Python < 3.7) - confirm intended priority.
            for error_name, error_item in self.compiled_reg_map.items():
                reg_result = error_item['reg'].match(line)
                if reg_result:
                    # a tuple, so the raw line can be appended later if needed
                    result[error_name].append((reg_result.groups(),))
                    break
        return result
class CfsMonitorMountInfo(object):
    """
    parse mount info

    Each mountinfo line holds six fixed fields, zero or more optional
    fields, a field consisting of a single '-' separator, then filesystem
    type, mount source and super options, e.g.
        36 35 98:0 /mnt1 /mnt2 rw,noatime master:1 - ext3 /dev/root rw,errors=continue
    Fields are located by position around that separator field, so hyphens
    inside paths, sources or options do not break parsing.
    see https://man7.org/linux/man-pages/man5/proc.5.html#:~:text=/proc/%5Bpid%5D/mountinfo%20(since%20Linux%202.6.26)
    """
    MOUNT_INFO_KEY_BEFORE_SEPARATOR = [
        "mount_id",
        "parent_id",
        "major:minor",
        "root",
        "mount_point",
        "mount_option",
        "optional_fields",
    ]
    MOUNT_INFO_KEY_AFTER_SEPARATOR = [
        "filesystem_type",
        "mount_source",
        "super_options",
    ]
    # number of fixed fields preceding the optional fields
    _FIXED_FIELD_COUNT = 6

    def __init__(self, mountinfo_path='/proc/self/mountinfo'):
        # every parsed entry
        self.mount_info = list()
        # entries whose filesystem type contains 'nfs' (nfs, nfs4, ...)
        self.nfs_mount_info = list()
        # entries whose filesystem type is exactly 'lustre'
        self.lustre_mount_info = list()
        self._parse_mount_info(mountinfo_path)

    @classmethod
    def _parse_line(cls, line):
        """
        Parse one mountinfo line into a dict keyed by the MOUNT_INFO_KEY_*
        names, or return None if the line is malformed.

        All optional fields (possibly none) are kept as one space-joined
        string under 'optional_fields'.
        """
        fields = line.split()
        try:
            # the separator is the first lone '-' after the fixed fields
            sep = fields.index('-', cls._FIXED_FIELD_COUNT)
        except ValueError:
            return None
        after = fields[sep + 1:]
        if len(after) != len(cls.MOUNT_INFO_KEY_AFTER_SEPARATOR):
            return None
        fixed = cls._FIXED_FIELD_COUNT
        before = fields[:fixed] + [' '.join(fields[fixed:sep])]
        item = dict(zip(cls.MOUNT_INFO_KEY_BEFORE_SEPARATOR, before))
        item.update(zip(cls.MOUNT_INFO_KEY_AFTER_SEPARATOR, after))
        return item

    def _parse_mount_info(self, mountinfo_path='/proc/self/mountinfo'):
        """
        Fill mount_info / nfs_mount_info / lustre_mount_info from
        *mountinfo_path*; a missing or unreadable file leaves them empty.
        """
        try:
            with open(mountinfo_path) as f:
                lines = f.readlines()
        except (IOError, OSError):
            return
        for line in lines:
            mount_info_item = self._parse_line(line)
            if mount_info_item is None:
                continue
            self.mount_info.append(mount_info_item)
            fstype = mount_info_item['filesystem_type']
            if 'nfs' in fstype:
                self.nfs_mount_info.append(mount_info_item)
            if fstype == 'lustre':
                self.lustre_mount_info.append(mount_info_item)
class CfsAdaptorCheckNetwork(object):
    """
    Turn CFS VIP reachability results from CfsMonitorCheckNetwork into barad
    'qce/cfs' records carrying one ``net_issucc`` metric per mount point.
    """
    name = "check_network_adaptor"

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip):
        super(CfsAdaptorCheckNetwork, self).__init__()
        self.handler = MetricHandler()
        # {(mountdir, cfsvip): 1 reachable / 0 unreachable}, set by do_collect()
        self.barad_data = dict()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip

    def do_collect(self):
        """Check every CFS VIP found in the current mount table."""
        checker = CfsMonitorCheckNetwork()
        self.barad_data = checker.check_network()

    def do_dump_data(self):
        """
        Queue one ``net_issucc`` metric per (mountdir, cfsvip) pair.

        :return: [] when the handler holds no metrics, otherwise a one-item
                 list ``[{'sender': 'cfs_sender', 'datas': handler.pop_metrics()}]``.
        """
        stamp = int(time.time())
        metric_handler = self.handler
        metric_handler.namespace = 'qce/cfs'
        metric_handler.dimensions = ['uuid', 'cfsvip', 'mountdir', 'vmip', 'appid', 'vpcid']
        for key, reachable in self.barad_data.items():
            dims = dict(
                uuid=self.vm_uuid,
                cfsvip=key[1],
                mountdir=key[0],
                vmip=self.vm_ip,
                appid=self.app_id,
                vpcid=self.vpc_id
            )
            metric_handler.add_batch_metric(
                batch=[dict(name='net_issucc', value=int(reachable))],
                dimensions=dims,
                timestamp=stamp
            )
        packets = []
        if metric_handler.get_metrics():
            packets.append({'sender': 'cfs_sender', 'datas': metric_handler.pop_metrics()})
        return packets
class CfsMonitorCheckNetwork(object):
    """
    Check TCP reachability of the CFS server behind each NFS / Lustre mount
    (port 2049 for NFS, 988 for Lustre).
    """
    def __init__(self):
        self.mountstats = CfsMonitorMountInfo()

    def check_network(self):
        """
        Probe each mount's CFS VIP.

        Mounts whose VIP is not a dotted IPv4 address are skipped, and each
        (cfsvip, fstype) pair is probed only once; other mount points of the
        same CFS reuse that result.

        :return: dict {(mount_point, cfsvip): 1 if connect succeeded else 0}
        """
        outcome = dict()
        seen = dict()
        entries = self.mountstats.nfs_mount_info + self.mountstats.lustre_mount_info
        for entry in entries:
            fs_type = entry['filesystem_type']
            where = entry['mount_point']
            if 'nfs' in fs_type:
                vip, port = entry['mount_source'].split(':')[0], 2049
            elif fs_type == 'lustre':
                vip = entry['mount_source'].split(':')[0]
                if '@' in vip:
                    # drop the LNet network suffix, e.g. '@tcp'
                    vip = vip.split('@')[0]
                port = 988
            else:
                continue
            if not self.validate_ipv4(vip):
                continue
            cache_key = (vip, fs_type)
            if cache_key not in seen:
                seen[cache_key] = 1 if self.check_socket(vip, port) else 0
            outcome[(where, vip)] = seen[cache_key]
        return outcome

    def check_socket(self, host, port):
        """Return True if a TCP connect to (host, port) succeeds within 3 s."""
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(3)
            try:
                status = sock.connect_ex((host, port))
            except socket.error:
                return False
            return status == 0
        finally:
            sock.close()

    def validate_ipv4(self, s):
        """Return True if *s* is four dot-separated decimal numbers in 0-255."""
        octets = s.split('.')
        return len(octets) == 4 and all(o.isdigit() and int(o) <= 255 for o in octets)
def main():
    """Run a single init/collect/dump cycle of CfsLongFrequencyCollector."""
    cfs_collector = CfsLongFrequencyCollector()
    cfs_collector.init()
    cfs_collector.collect()
    cfs_collector.dump_data()


if __name__ == '__main__':
    main()