File: //usr/local/qcloud/monitor/barad/plugin/collector/vm/cfs_short_frequency_collector.py
#!/usr/local/qcloud/monitor/python26/bin/python
# -*- coding: utf-8 -*-
import sys
import os
import urllib2
import threading
import time
import pickle
from copy import deepcopy
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../../../comm/')
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../../base/')
sys.path.append(os.path.dirname(os.path.realpath(__file__)) + '/../../utils/')
import constant
from cutils import generate_config
from plugin_base import VmBaseCollector
from utils.metric_handler import MetricHandler
def difference(x, y):
    """Return ``x - y``.

    Supplied as the callable to ``map()`` so two per-op counter lists can be
    subtracted element-wise (Python 2 ``map`` returns a list).
    """
    delta = x - y
    return delta
# Sampling interval in seconds; also the divisor used when converting
# cumulative counters into per-second rates (see CfsAdaptorMountStats).
FREQUENCY = 10
# Counter names on the 'events:' line of /proc/self/mountstats, in file order
# (consumed positionally by DeviceData.__parse_nfs_line).
NfsEventCounters = [
    'inoderevalidates',
    'dentryrevalidates',
    'datainvalidates',
    'attrinvalidates',
    'vfsopen',
    'vfslookup',
    'vfspermission',
    'vfsupdatepage',
    'vfsreadpage',
    'vfsreadpages',
    'vfswritepage',
    'vfswritepages',
    'vfsreaddir',
    'vfssetattr',
    'vfsflush',
    'vfsfsync',
    'vfslock',
    'vfsrelease',
    'congestionwait',
    'setattrtrunc',
    'extendwrite',
    'sillyrenames',
    'shortreads',
    'shortwrites',
    'delay',
    'pnfsreads',
    'pnfswrites'
]
# Counter names on the 'bytes:' line, in file order.
NfsByteCounters = [
    'normalreadbytes',
    'normalwritebytes',
    'directreadbytes',
    'directwritebytes',
    'serverreadbytes',
    'serverwritebytes',
    'readpages',
    'writepages'
]
# Fields of the 'xprt:' line for a UDP transport, in file order.
XprtUdpCounters = [
    'port',
    'bind_count',
    'rpcsends',
    'rpcreceives',
    'badxids',
    'inflightsends',
    'backlogutil'
]
# Fields of the 'xprt:' line for a TCP transport, in file order.  Newer
# kernels append extra fields; the parser stops early when fewer are present.
XprtTcpCounters = [
    'port',
    'bind_count',
    'connect_count',
    'connect_time',
    'idle_time',
    'rpcsends',
    'rpcreceives',
    'badxids',
    'inflightsends',
    'backlogutil',
    'max_slots',
    'sending_queue',
    'pending_queue'
]
# Fields of the 'xprt:' line for an RDMA transport, in file order.
XprtRdmaCounters = [
    'port',
    'bind_count',
    'connect_count',
    'connect_time',
    'idle_time',
    'rpcsends',
    'rpcreceives',
    'badxids',
    'backlogutil',
    'read_chunks',
    'write_chunks',
    'reply_chunks',
    'total_rdma_req',
    'total_rdma_rep',
    'pullup',
    'fixup',
    'hardway',
    'failed_marshal',
    'bad_reply'
]
# Xprt counters that accumulate monotonically and are therefore diffed
# between two samples in DeviceData.__sub__.
CumulativeXprtCounters = [
    'rpcsends',
    'rpcreceives',
    'badxids',
    'backlogutil',
    'inflightsends',
    'sending_queue',
    'pending_queue'
]
# Per-operation RPC procedure names for NFSv3, matching the 'per-op' section.
Nfsv3ops = [
    'NULL',
    'GETATTR',
    'SETATTR',
    'LOOKUP',
    'ACCESS',
    'READLINK',
    'READ',
    'WRITE',
    'CREATE',
    'MKDIR',
    'SYMLINK',
    'MKNOD',
    'REMOVE',
    'RMDIR',
    'RENAME',
    'LINK',
    'READDIR',
    'READDIRPLUS',
    'FSSTAT',
    'FSINFO',
    'PATHCONF',
    'COMMIT'
]
# Per-operation RPC procedure names for NFSv4, matching the 'per-op' section.
Nfsv4ops = [
    'NULL',
    'READ',
    'WRITE',
    'COMMIT',
    'OPEN',
    'OPEN_CONFIRM',
    'OPEN_NOATTR',
    'OPEN_DOWNGRADE',
    'CLOSE',
    'SETATTR',
    'FSINFO',
    'RENEW',
    'SETCLIENTID',
    'SETCLIENTID_CONFIRM',
    'LOCK',
    'LOCKT',
    'LOCKU',
    'ACCESS',
    'GETATTR',
    'LOOKUP',
    'LOOKUP_ROOT',
    'REMOVE',
    'RENAME',
    'LINK',
    'SYMLINK',
    'CREATE',
    'PATHCONF',
    'STATFS',
    'READLINK',
    'READDIR',
    'SERVER_CAPS',
    'DELEGRETURN',
    'GETACL',
    'SETACL',
    'FS_LOCATIONS',
    'RELEASE_LOCKOWNER',
    'SECINFO',
    'FSID_PRESENT',
    'EXCHANGE_ID',
    'CREATE_SESSION',
    'DESTROY_SESSION',
    'SEQUENCE',
    'GET_LEASE_TIME',
    'RECLAIM_COMPLETE',
    'LAYOUTGET',
    'GETDEVICEINFO',
    'LAYOUTCOMMIT',
    'LAYOUTRETURN',
    'SECINFO_NO_NAME',
    'TEST_STATEID',
    'FREE_STATEID',
    'GETDEVICELIST',
    'BIND_CONN_TO_SESSION',
    'DESTROY_CLIENTID'
]
class CfsShortFrequencyCollector(VmBaseCollector):
    """Barad collector plugin that samples NFS/CFS client statistics every
    FREQUENCY seconds and forwards them via CfsAdaptorMountStats.

    The VM's app-id and vpc-id are fetched once from the metadata server and
    cached in class attributes shared by all instances; a lock guards each
    lazy initialisation.
    """
    # Metadata-server base URL, read from the plugin config at class-creation time.
    META_URL = generate_config(constant.PLUGIN_CONFIG_PATH, "MetaDataServer")["metadata_url"]
    # Cached identity values and the locks protecting their first fetch.
    VM_APPID = ''
    APPID_MUTEX = threading.Lock()
    VM_VPCID = ''
    VPCID_MUTEX = threading.Lock()

    def init(self):
        """Plugin hook: set the collect frequency and prepare the metric handler."""
        self.set_frequency(FREQUENCY)
        self.handler = MetricHandler()
        self.handler.dimensions = ['uuid', 'cfsvip', 'mountdir']
        # Created lazily on first do_collect(), once appid/vpcid are known.
        self.mountstats_collector = None
        # TCP connection-state name -> numeric code ('OTHER' is the catch-all).
        # NOTE(review): not referenced elsewhere in this file - confirm callers.
        self.connect_stat = {
            'CLOSED': 0,
            'LISTEN': 1,
            'SYN_SENT': 2,
            'SYN_RCVD': 3,
            'ESTABLISHED': 4,
            'CLOSE_WAIT': 5,
            'LAST_ACK': 6,
            'FIN_WAIT_1': 7,
            'FIN_WAIT_2': 8,
            'CLOSING': 9,
            'TIME_WAIT': 10,
            'OTHER': 110
        }

    @staticmethod
    def getset_vm_appid():
        """Return the VM's app-id, fetching and caching it on first use.

        Returns '' on failure so the next collect cycle retries.
        """
        if CfsShortFrequencyCollector.VM_APPID != '':
            return CfsShortFrequencyCollector.VM_APPID
        with CfsShortFrequencyCollector.APPID_MUTEX:
            # Double-checked: another thread may have filled the cache while
            # we were waiting for the lock.
            if CfsShortFrequencyCollector.VM_APPID != '':
                return CfsShortFrequencyCollector.VM_APPID
            try:
                url = CfsShortFrequencyCollector.META_URL + 'app-id'
                response = urllib2.urlopen(url, timeout=2)
                content = response.read()
                if content:
                    CfsShortFrequencyCollector.VM_APPID = content
            except Exception:
                CfsShortFrequencyCollector.VM_APPID = ''
            return CfsShortFrequencyCollector.VM_APPID

    @staticmethod
    def getset_vm_vpcid():
        """Return the VM's vpc-id, fetching and caching it on first use.

        Two metadata calls are needed: first the MAC address, then the vpc-id
        registered for that MAC.  Returns '' on any failure so the next
        collect cycle retries.
        """
        if CfsShortFrequencyCollector.VM_VPCID != '':
            return CfsShortFrequencyCollector.VM_VPCID
        with CfsShortFrequencyCollector.VPCID_MUTEX:
            if CfsShortFrequencyCollector.VM_VPCID != '':
                return CfsShortFrequencyCollector.VM_VPCID
            try:
                url = CfsShortFrequencyCollector.META_URL + 'mac'
                response = urllib2.urlopen(url, timeout=2)
                mac = response.read()
                if not mac:
                    # BUG FIX: the original fell through and queried
                    # '.../macs//vpc-id' with an empty MAC; bail out instead.
                    CfsShortFrequencyCollector.VM_VPCID = ''
                    return CfsShortFrequencyCollector.VM_VPCID
                url = (CfsShortFrequencyCollector.META_URL +
                       'network/interfaces/macs/' + mac + '/vpc-id')
                response = urllib2.urlopen(url, timeout=2)
                content = response.read()
                if content:
                    CfsShortFrequencyCollector.VM_VPCID = content
            except Exception:
                CfsShortFrequencyCollector.VM_VPCID = ''
            return CfsShortFrequencyCollector.VM_VPCID

    def do_collect(self):
        """Collect one sample and hand every produced data item to put_data()."""
        if self.mountstats_collector is None:
            vm_uuid = self.get_vm_uuid()
            appid = self.getset_vm_appid()
            vpcid = self.getset_vm_vpcid()
            vmip = self.get_vmip()
            self.mountstats_collector = CfsAdaptorMountStats(vm_uuid, appid, vpcid, vmip, FREQUENCY)
        self.mountstats_collector.do_collect()
        for item in self.mountstats_collector.do_dump_data():
            self.put_data(item)
class CfsAdaptorMountStats(object):
    """
    Keeps the most recent mountstats sample and diffs each new sample against
    it, deriving per-request latency, per-second throughput, etc.
    """
    name = "mountstats_adaptor"
    # Previous sample's {mountpoint: DeviceData}; shared across instances when
    # use_class_variable is True.
    last_device_dict = None

    def __init__(self, vm_uuid, app_id, vpc_id, vm_ip, gap, use_class_variable=True):
        """
        :param gap: seconds between two samples; divisor for per-second rates.
        :param use_class_variable:
            True  -> keep history in a class variable, suitable for a resident
                     process such as barad_agent;
            False -> persist history in a local pickle file, suitable for
                     periodic invocation via crontab.
        """
        super(CfsAdaptorMountStats, self).__init__()
        self.handler = MetricHandler()
        self.barad_data = dict()
        self.vm_uuid = vm_uuid
        self.app_id = app_id
        self.vpc_id = vpc_id
        self.vm_ip = vm_ip
        self.use_class_variable = use_class_variable
        self.gap = gap

    def do_collect(self):
        """
        Collect mount stats data: parse the current mountstats, diff against
        the previous sample and store the normalised result per (vip, fstype)
        in self.barad_data (None when there is no history yet).
        """
        # NOTE(review): relative path - resolution depends on the process CWD;
        # confirm this is intended for the crontab mode.
        last_mount_stats_file = "last_mountstats.data"
        old_device_dict = None
        if self.use_class_variable:
            if CfsAdaptorMountStats.last_device_dict:
                old_device_dict = CfsAdaptorMountStats.last_device_dict
        else:
            if os.path.exists(last_mount_stats_file):
                # NOTE(review): pickle.load is unsafe if this file can be
                # tampered with by another local user.
                with open(last_mount_stats_file, "r") as f:
                    old_device_dict = pickle.load(f)
        new_mount_stats = CfsMonitorMountStats()
        new_mount_stats.parse_stats_from_path()
        new_device_dict = new_mount_stats.nfs_device_dict
        self.barad_data = dict()
        for mount_point, new_device_data in new_device_dict.items():
            # only collect nfs devices
            if not new_device_data.is_nfs_mountpoint():
                continue
            vip = new_device_data.nfs_data['export'].split(":")[0]
            fstype = new_device_data.nfs_data['fstype']
            # when one cfs is mounted at multiple mount points, upload only once
            if self.barad_data.get((vip, fstype)):
                continue
            # if there is no history data, use None as the default
            self.barad_data[(vip, fstype)] = None
            if old_device_dict:
                old_device_data = old_device_dict.get(mount_point)
                if old_device_data:
                    result = new_device_data - old_device_data
                    self.barad_data[(vip, fstype)] = self.calc_avg(result)
        if self.use_class_variable:
            CfsAdaptorMountStats.last_device_dict = new_device_dict
        else:
            with open(last_mount_stats_file, "w") as f:
                pickle.dump(new_device_dict, f)

    def deal_op_data(self, data):
        """Normalise one diffed per-op counter list in place.

        Indexes 3/4 (bytes sent/received) become per-second rates; indexes
        5/6/7 (queue, rtt, execute) become per-operation averages, or 0 when
        no operations happened in the interval (data[0] == 0).
        """
        data[3] = float(data[3]) / self.gap
        data[4] = float(data[4]) / self.gap
        if data[0] != 0:
            data[5] = float(data[5]) / data[0]
            data[6] = float(data[6]) / data[0]
            data[7] = float(data[7]) / data[0]
        else:
            data[5] = 0
            data[6] = 0
            data[7] = 0
        return data

    def calc_avg(self, result):
        """Normalise a diffed DeviceData: byte counters to per-second rates,
        queue-utilisation xprt counters to per-RPC averages, per-op lists via
        deal_op_data.  Returns the mutated result.
        """
        # calc bytes
        for key in NfsByteCounters:
            result.nfs_data[key] = float(result.nfs_data[key]) / self.gap
        # calc xprt
        xprt_per_req_counter = ['backlogutil', 'inflightsends', 'sending_queue', 'pending_queue']
        xprt_to_deal_list = [i for i in xprt_per_req_counter if i in result.rpc_data.keys()]
        if result.rpc_data["rpcsends"] != 0:
            for key in xprt_to_deal_list:
                result.rpc_data[key] = float(result.rpc_data[key]) / result.rpc_data["rpcsends"]
        else:
            for key in xprt_to_deal_list:
                # BUG FIX: these counters live in rpc_data; the zero was
                # previously written into nfs_data by mistake.
                result.rpc_data[key] = 0
        # calc nfs operations
        for key in set(Nfsv4ops + Nfsv3ops):
            if result.rpc_data.get(key):
                result.rpc_data[key] = self.deal_op_data(result.rpc_data[key])
        return result

    def _base_dimensions(self, result):
        """Dimension dict shared by every dump_* method for one mount."""
        return {
            'uuid': self.vm_uuid,
            'cfsvip': result.nfs_data['export'].split(':')[0],
            'mountdir': result.nfs_data['mountpoint'],
            'vmip': self.vm_ip,
            'appid': self.app_id,
            'vpcid': self.vpc_id
        }

    def _pop_as_output(self):
        """Wrap any accumulated metrics into the sender envelope, or []."""
        output = list()
        if self.handler.get_metrics():
            data = {
                'sender': 'cfs_sender',
                'datas': self.handler.pop_metrics()
            }
            output.append(data)
        return output

    def dump_bytes(self):
        """Emit the per-second NFS byte/page counters for every mount."""
        now = int(time.time())
        self.handler.namespace = 'qce/cfs'
        self.handler.dimensions = [
            'uuid', 'cfsvip', 'mountdir', 'vmip', 'appid', 'vpcid']
        for result in self.barad_data.values():
            if result is None:
                continue
            dimensions = self._base_dimensions(result)
            batch_metric = [
                {'name': key, 'value': float(result.nfs_data[key])}
                for key in NfsByteCounters
            ]
            self.handler.add_batch_metric(
                batch=batch_metric,
                dimensions=dimensions,
                timestamp=now)
        return self._pop_as_output()

    def dump_rpc_data(self):
        """Emit the transport (xprt) statistics for every mount.

        NOTE(review): reads TCP-only keys such as 'connect_count'; a UDP
        transport would raise KeyError here - confirm only TCP mounts occur.
        """
        now = int(time.time())
        self.handler.namespace = 'qce/cfs'
        self.handler.dimensions = [
            'uuid', 'cfsvip', 'mountdir', 'vmip', 'appid', 'vpcid']
        for result in self.barad_data.values():
            if result is None:
                continue
            dimensions = self._base_dimensions(result)
            batch_metric = [
                {'name': 'srcport', 'value': int(result.rpc_data['port'])},
                {'name': 'bind_count', 'value': int(result.rpc_data['bind_count'])},
                {'name': 'connect_count', 'value': int(result.rpc_data['connect_count'])},
                {'name': 'connect_time', 'value': int(result.rpc_data['connect_time'])},
                {'name': 'idle_time', 'value': int(result.rpc_data['idle_time'])},
                {'name': 'rpcsends', 'value': int(result.rpc_data['rpcsends'])},
                {'name': 'rpcrecvs', 'value': int(result.rpc_data['rpcreceives'])},
                {'name': 'badxids', 'value': int(result.rpc_data['badxids'])},
                {'name': 'req_u', 'value': float(result.rpc_data['backlogutil'])},
                {'name': 'bklog_u', 'value': float(result.rpc_data['inflightsends'])},
                {'name': 'max_slots', 'value': int(result.rpc_data['max_slots'])},
                {'name': 'sending_u', 'value': float(result.rpc_data['sending_queue'])},
                {'name': 'pending_u', 'value': float(result.rpc_data['pending_queue'])},
            ]
            self.handler.add_batch_metric(
                batch=batch_metric,
                dimensions=dimensions,
                timestamp=now)
        return self._pop_as_output()

    def dump_op_data(self):
        """Emit per-operation statistics, one dimension set per op type.

        Operations with a zero count in the interval are skipped.
        """
        now = int(time.time())
        self.handler.namespace = 'qce/cfs'
        self.handler.dimensions = [
            'uuid', 'cfsvip', 'mountdir', 'optype', 'vmip', 'appid', 'vpcid']
        for result in self.barad_data.values():
            if result is None:
                continue
            ops = result.rpc_data['ops']
            for optype in ops:
                op_statistic = result.rpc_data[optype]
                ops_cnt = int(op_statistic[0])
                # skip reporting when no operations happened
                if ops_cnt <= 0:
                    continue
                dimensions = self._base_dimensions(result)
                dimensions['optype'] = optype
                batch_metric = [
                    {'name': 'ops', 'value': int(op_statistic[0])},
                    {'name': 'trans', 'value': int(op_statistic[1])},
                    {'name': 'timeouts', 'value': int(op_statistic[2])},
                    {'name': 'bytes_sent', 'value': float(op_statistic[3])},
                    {'name': 'bytes_recv', 'value': float(op_statistic[4])},
                    {'name': 'queue', 'value': float(op_statistic[5])},
                    {'name': 'rtt', 'value': float(op_statistic[6])},
                    {'name': 'execute', 'value': float(op_statistic[7])}
                ]
                self.handler.add_batch_metric(
                    batch=batch_metric,
                    dimensions=dimensions,
                    timestamp=now)
        return self._pop_as_output()

    def dump_info_data(self):
        """Emit static mount information (options, caps, security flavor, age)."""
        now = int(time.time())
        self.handler.namespace = 'qce/cfs'
        self.handler.dimensions = [
            'uuid', 'cfsvip', 'mountdir', 'opts',
            'caps', 'sec', 'vmip', 'appid', 'vpcid']
        for result in self.barad_data.values():
            if result is None:
                continue
            dimensions = self._base_dimensions(result)
            dimensions['opts'] = "|".join(result.nfs_data['mountoptions']).replace('=', '-')
            dimensions['caps'] = "|".join(result.nfs_data['servercapabilities']).replace('=', '-')
            # BUG FIX: the original formatted the literal strings
            # 'flavor'/'pseudoflavor' instead of the values parsed from the
            # 'sec:' line; fall back to 0 when the line was absent.
            dimensions['sec'] = "flavor-{0}|pseudoflavor-{1}".format(
                result.nfs_data.get('flavor', 0),
                result.nfs_data.get('pseudoflavor', 0))
            batch_metric = [
                {'name': 'age', 'value': int(result.nfs_data['age'])}
            ]
            self.handler.add_batch_metric(
                batch=batch_metric,
                dimensions=dimensions,
                timestamp=now
            )
        return self._pop_as_output()

    def do_dump_data(self):
        """Concatenate the output of every dump_* method into one list."""
        data = list()
        for dumper in (self.dump_bytes, self.dump_rpc_data,
                       self.dump_op_data, self.dump_info_data):
            part = dumper()
            if part:
                data += part
        return data
class DeviceData(object):
    """DeviceData objects provide methods for parsing and displaying
    data for a single mount grabbed from /proc/self/mountstats.

    nfs_data holds values from the NFS-level lines (device/age/opts/caps/
    sec/events/bytes); rpc_data holds values from the RPC-level lines
    (xprt and per-op statistics).  Subtracting two DeviceData objects
    yields the counter deltas between two samples.
    """
    def __init__(self):
        self.nfs_data = dict()
        self.rpc_data = dict()
        # 'ops' lists the operation names seen in the per-op section,
        # in file order.
        self.rpc_data['ops'] = []
    def __sub__(self, rhs):
        """Return a new DeviceData whose cumulative counters are
        self's minus rhs's (rhs is the older sample).

        Non-counter entries keep self's values via deepcopy.
        """
        if not isinstance(rhs, DeviceData):
            raise TypeError("DeviceData only sub with DeviceData instance")
        new_ins = DeviceData()
        new_ins.nfs_data = deepcopy(self.nfs_data)
        new_ins.rpc_data = deepcopy(self.rpc_data)
        # calc events
        for key in NfsEventCounters:
            new_ins.nfs_data[key] = self.nfs_data[key] - rhs.nfs_data[key]
        # calc bytes
        for key in NfsByteCounters:
            new_ins.nfs_data[key] = self.nfs_data[key] - rhs.nfs_data[key]
        # calc xprt
        # NOTE: the truthiness guard skips keys that are absent OR zero on
        # either side; a zero value then keeps the deep-copied value instead
        # of a computed (possibly negative) delta.
        for key in CumulativeXprtCounters:
            if self.rpc_data.get(key) and rhs.rpc_data.get(key):
                new_ins.rpc_data[key] = self.rpc_data[key] - rhs.rpc_data[key]
        # calc NFS operation
        # NOTE: Python 2 map() returns a list here; downstream code indexes it.
        for key in set(Nfsv4ops + Nfsv3ops):
            if self.rpc_data.get(key) and rhs.rpc_data.get(key):
                new_ins.rpc_data[key] = map(difference, self.rpc_data[key], rhs.rpc_data[key])
        return new_ins
    def __parse_nfs_line(self, words):
        """Parse one whitespace-split NFS-level line into self.nfs_data.

        Recognises the 'device ... mounted on ... with fstype ...' header
        (with and without the leading 'device' token) plus the age:/opts:/
        caps:/nfsv4:/sec:/events:/bytes: lines; everything else is ignored.
        """
        if words[0] == 'device':
            # header form: device <export> mounted on <mountpoint> with fstype <type> [statvers=...]
            self.nfs_data['export'] = words[1]
            self.nfs_data['mountpoint'] = words[4]
            self.nfs_data['fstype'] = words[7]
            if words[7].find('nfs') != -1 and words[7] != 'nfsd':
                self.nfs_data['statvers'] = words[8]
        elif 'nfs' in words or 'nfs4' in words:
            # same header without the 'device' token: indices shift by one
            self.nfs_data['export'] = words[0]
            self.nfs_data['mountpoint'] = words[3]
            self.nfs_data['fstype'] = words[6]
            if words[6].find('nfs') != -1 and words[6] != 'nfsd':
                self.nfs_data['statvers'] = words[7]
        elif words[0] == 'age:':
            self.nfs_data['age'] = int(words[1])
        elif words[0] == 'opts:':
            self.nfs_data['mountoptions'] = ''.join(words[1:]).split(',')
        elif words[0] == 'caps:':
            self.nfs_data['servercapabilities'] = ''.join(words[1:]).split(',')
        elif words[0] == 'nfsv4:':
            self.nfs_data['nfsv4flags'] = ''.join(words[1:]).split(',')
        elif words[0] == 'sec:':
            # e.g. "sec: flavor=1,pseudoflavor=..."; pseudoflavor only
            # applies when flavor == 6 (RPCSEC_GSS per this parser's logic)
            keys = ''.join(words[1:]).split(',')
            self.nfs_data['flavor'] = int(keys[0].split('=')[1])
            self.nfs_data['pseudoflavor'] = 0
            if self.nfs_data['flavor'] == 6:
                self.nfs_data['pseudoflavor'] = int(keys[1].split('=')[1])
        elif words[0] == 'events:':
            # positional counters; older kernels expose fewer fields,
            # missing ones default to 0 via the IndexError handler
            i = 1
            for key in NfsEventCounters:
                try:
                    self.nfs_data[key] = int(words[i])
                except IndexError:
                    self.nfs_data[key] = 0
                i += 1
        elif words[0] == 'bytes:':
            i = 1
            for key in NfsByteCounters:
                self.nfs_data[key] = int(words[i])
                i += 1
    def __parse_rpc_line(self, words):
        """Parse one whitespace-split RPC-level line into self.rpc_data.

        Handles the 'RPC iostats' header, the 'xprt:' transport line
        (udp/tcp/rdma field layouts differ) and the per-op statistic lines
        ('<OP>: n n n ...').
        """
        if words[0] == 'RPC':
            self.rpc_data['statsvers'] = float(words[3])
            self.rpc_data['programversion'] = words[5]
        elif words[0] == 'xprt:':
            self.rpc_data['protocol'] = words[1]
            if words[1] == 'udp':
                i = 2
                for key in XprtUdpCounters:
                    self.rpc_data[key] = int(words[i])
                    i += 1
            elif words[1] == 'tcp':
                i = 2
                for key in XprtTcpCounters:
                    # new version kernel support more xprt counter
                    if i >= len(words):
                        break
                    self.rpc_data[key] = int(words[i])
                    i += 1
            elif words[1] == 'rdma':
                i = 2
                for key in XprtRdmaCounters:
                    self.rpc_data[key] = int(words[i])
                    i += 1
        elif words[0] == 'per-op':
            self.rpc_data['per-op'] = words
        else:
            # per-op statistic line, e.g. "GETATTR: 1 1 0 ...";
            # strip the trailing colon to get the operation name
            op = words[0][:-1]
            self.rpc_data['ops'] += [op]
            self.rpc_data[op] = [int(word) for word in words[1:]]
    def parse_stats(self, lines):
        """Turn a list of lines from a mount stat file into a
        dictionary full of stats, keyed by name.

        Lines before the 'RPC' header go to the NFS parser, the header and
        everything after it to the RPC parser.
        """
        found = False
        for line in lines:
            words = line.split()
            if not words:
                continue
            if not found and words[0] != 'RPC':
                self.__parse_nfs_line(words)
                continue
            found = True
            self.__parse_rpc_line(words)
    def is_nfs_mountpoint(self):
        """Return True if this is an NFS or NFSv4 mountpoint,
        otherwise return False.

        Raises KeyError if parse_stats never saw a device header line.
        """
        if self.nfs_data['fstype'] == 'nfs':
            return True
        elif self.nfs_data['fstype'] == 'nfs4':
            return True
        return False
    def nfs_version(self):
        """Return the NFS protocol version parsed from the RPC header,
        or None (implicitly) for non-NFS mounts."""
        if self.is_nfs_mountpoint():
            prog, vers = self.rpc_data['programversion'].split('/')
            return int(vers)
class CfsMonitorMountStats(object):
    """Split /proc/self/mountstats into per-mount sections and parse each
    section into a DeviceData object."""

    def __init__(self):
        # raw mountstats lines keyed by mount point
        self.ms_dict = dict()
        # every parsed mount, keyed by mount point
        self.device_dict = dict()
        # only the nfs/nfs4 mounts, keyed by mount point
        self.nfs_device_dict = dict()

    def parse_stats_from_path(self, path='/proc/self/mountstats'):
        """Read *path*, split it per mount point and parse every section."""
        with open(path) as f:
            if not self.parse_stats_file(f):
                # print() with one argument behaves identically on Python 2
                print("split stats file failed!")
        for mount_point, mount_info in self.ms_dict.items():
            device = DeviceData()
            device.parse_stats(mount_info)
            self.device_dict[mount_point] = device
            if device.is_nfs_mountpoint():
                self.nfs_device_dict[mount_point] = device

    def parse_stats_file(self, f):
        """Pop the contents of a mountstats file into a dictionary,
        keyed by mount point.  Each value is the list of stripped lines
        in the file corresponding to the mount point named in the key.
        Always returns True.
        """
        self.ms_dict = dict()
        key = ''
        # BUG FIX: initialise so a malformed file whose first content line is
        # not a device header cannot raise NameError below.
        new = []
        f.seek(0)
        for line in f.readlines():
            words = line.split()
            if not words:
                continue
            if words[0] == 'device':
                # header form: device <export> mounted on <mountpoint> ...
                key = words[4]
                new = [line.strip()]
            elif 'nfs' in words or 'nfs4' in words:
                # header without the leading 'device' token
                key = words[3]
                new = [line.strip()]
            else:
                new += [line.strip()]
            # re-assign every iteration so trailing lines are always captured
            self.ms_dict[key] = new
        return True
def main():
    """Entry point for one-shot invocation: run a single collect/dump cycle."""
    plugin = CfsShortFrequencyCollector()
    plugin.init()
    plugin.collect()
    plugin.dump_data()


if __name__ == '__main__':
    main()