# File: //usr/local/qcloud/monitor/barad/plugin/collector/metal/metaldisk.py
# encoding=utf8
import re
import os
import sys
import time,datetime
import traceback
sys.path.append(os.getcwd() + '/../../../comm/')
import constant
import urllib2
from plugin_base import BaseCollector, VmBaseCollector
from utils.metric_handler import MetricHandler
from utils.pyiostat import IOStatus
from cutils import CommUtils,generate_config
import cutils
import pdb
class DiskInfoAndIOStat():
    """Pair static per-disk information with iostat samples, keyed by device path."""

    # (field name in the produced sample, key in the dict returned by
    # IOStatus.get_iostat()), listed in the order the values are converted.
    _IOSTAT_FIELDS = (
        ("rrqm", 'rrqm/s'),
        ("wrqm", 'wrqm/s'),
        ("rcnt", 'r/s'),
        ("wcnt", 'w/s'),
        ("rband", 'rMB/s'),
        ("wband", 'wMB/s'),
        ("avgrq", 'avgrq-sz'),
        ("avgqu", 'avgqu-sz'),
        ("await", 'await'),
        ("svctm", 'svctm'),
        ("util", 'util'),
    )

    def __init__(self, logger, interval=10):
        """Store the logger and the reporting granularity.

        :param logger: object exposing ``info(fmt, *args)``.
        :param interval: reporting granularity in seconds (default 10).
        """
        self.interval = interval
        self.logger = logger
        self.disk_config_map = {}

    def _log_failure(self, error):
        """Log an exception message and the current traceback at info level."""
        self.logger.info("%s", str(error))
        self.logger.info("%s", traceback.format_exc())

    def get_disk_iostat_map(self, start_time_stamp, duration):
        """Read ``duration`` rounds of iostat samples.

        :param start_time_stamp: epoch seconds used to label the first round;
            every later round is labelled ``self.interval`` seconds further on.
        :param duration: number of rounds to read.
        :return: dict mapping "/dev/<name>" to a list of sample dicts holding
            "time", "devname" and one float per entry of ``_IOSTAT_FIELDS``.

        Failures are logged and swallowed: a bad device loses only its own
        sample, a failing round loses only that round.
        """
        samples_by_dev = {}
        for step in range(duration):
            try:
                sample_stamp = start_time_stamp + step * self.interval
                sample_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(sample_stamp))
                self.logger.info("begin to read disk iostat at time: %s", sample_time)
                # Per the original author's note: the data comes from
                # /proc/diskstats and the very first run raises, because
                # iostat values are deltas between two readings; nothing is
                # reported for that first run.
                for dev, raw in IOStatus().get_iostat().items():
                    try:
                        devname = "/dev/" + dev
                        sample = {"time": sample_time, "devname": devname}
                        for field, source_key in self._IOSTAT_FIELDS:
                            sample[field] = float(raw[source_key])
                        samples_by_dev.setdefault(devname, []).append(sample)
                    except Exception as e:
                        self._log_failure(e)
            except Exception as e:
                self._log_failure(e)
        return samples_by_dev

    def get_qemu_disk_info_and_iostat(self, disk_info_map, duration):
        """Merge disk information into the iostat samples of each known disk.

        :param disk_info_map: dict keyed by "/dev/<name>", one info dict per disk.
        :param duration: number of sampling rounds to read.
        :return: dict mapping each device present in both ``disk_info_map``
            and the iostat readings to its list of samples, each sample
            updated in place with that disk's info entries.
        """
        # Use the current time, rounded down to an interval boundary, as the
        # label of the last round; earlier rounds are back-dated from it.
        aligned_now = int(time.time())
        aligned_now -= aligned_now % self.interval
        first_stamp = aligned_now - self.interval * (duration - 1)

        samples_by_dev = self.get_disk_iostat_map(first_stamp, duration)
        merged = {}
        for devname in disk_info_map:
            samples = samples_by_dev.get(devname)
            if samples is None:
                continue
            disk_info = disk_info_map[devname]
            for sample in samples:
                sample.update(disk_info)
            merged[devname] = samples
        self.logger.info("DISK_NUMBER: %s", len(merged))
        return merged
class BareMetalDiskCollector(VmBaseCollector):
    """Report per-disk I/O metrics of a bare-metal host.

    Each run joins the cached disk configuration (device name, size, disk id)
    with the iostat samples produced by DiskInfoAndIOStat and queues one
    metric batch per sample in the 'qce/cvm' namespace, with the disk id as
    the ``diskid`` dimension.
    """

    # Splits a size string such as "931G"; group(1) is the integer part.
    _SIZE_PATTERN = re.compile("([0-9]+)([a-zA-Z]+)")

    def init(self):
        """Set the 10-second schedule, the metric namespace and empty caches."""
        self.set_frequency(10)
        self._metric_handler = MetricHandler()
        self._metric_handler.namespace = 'qce/cvm'
        self._last_disk_config_map_time = 0
        # The disk configuration is re-read at most every ten minutes.
        self.disk_config_map_update_period = 60*10
        self.q = DiskInfoAndIOStat(self.logger(), interval=10)
        self.disk_config_map = {}   # "/dev/<name>" -> {"size", "dev", "disk_id"}
        self.metadata = ''          # MetaDataServer config, loaded on first use
        self.system_disk_id = ''    # cached id of the CBS system disk

    @staticmethod
    def _is_cbs_device(disk_name):
        """Return True when the device name starts with "vd" (treated as a CBS disk)."""
        return re.match("vd", disk_name) is not None

    @staticmethod
    def _to_utf8_str(text):
        """Return a Python 2 ``unicode`` value UTF-8 encoded; anything else unchanged."""
        if isinstance(text, unicode):
            return text.encode("utf8")
        return text

    @staticmethod
    def _clamp_non_negative(value):
        """Return 0 for a negative value, otherwise the value itself."""
        return 0 if value < 0 else value

    @staticmethod
    def _read_url(url):
        """Fetch ``url`` with a 2-second timeout and return the body.

        The response is always closed, also when reading fails.
        """
        response = urllib2.urlopen(url, timeout=2)
        try:
            return response.read()
        finally:
            response.close()

    def _metadata_base_url(self):
        """Return the metadata service base URL, loading the plugin config on first use."""
        if self.metadata == '':
            self.metadata = generate_config(constant.PLUGIN_CONFIG_PATH, "MetaDataServer")
        return self.metadata['metadata_url']

    def collect_all_disks_id(self, disks):
        """Resolve the disk id of every disk.

        :param disks: list of dicts, each with a "dev" entry (bare device name).
        :return: dict mapping every device name to its disk id; a disk whose
            id could not be resolved maps to '' so that callers can fall back
            to a synthesized id instead of dropping the disk.
        """
        disk_id_map = {}
        known_ids = []        # ids that could be resolved directly
        cbs_candidates = []   # "vd" disks without an id: possible system disk
        for disk in disks:
            name = disk["dev"]
            disk_id = self.collect_diskid_for_diskname(name)
            disk_id_map[name] = disk_id
            if disk_id != '':
                known_ids.append(disk_id)
            elif self._is_cbs_device(name):
                cbs_candidates.append(name)

        # The CBS system disk exposes no serial, so its id is derived from the
        # metadata service. Only a "vd" disk may receive it, and only when the
        # choice is unambiguous; a physical disk whose lookup failed must not
        # be labelled with the system disk id.
        if len(cbs_candidates) == 1:
            disk_id_map[cbs_candidates[0]] = self.collect_cbs_system_diskid(known_ids)
        elif cbs_candidates:
            self.logger().error("more than one cbs disk without disk id: %s", cbs_candidates)

        self.logger().info("disk_id_map_len: %d , disk_id_map:%s", len(disk_id_map), disk_id_map)
        return disk_id_map

    def get_physical_disk_id(self, serial):
        """Ask the metadata service for the disk id of a physical disk.

        :param serial: device name such as "sda" or "nvme0n1".
        :return: the stripped response body, or '' for a "vd" name (no request
            is made) and when the request fails (the failure is logged).
        """
        if self._is_cbs_device(serial):
            return ''
        url = self._metadata_base_url() + 'bare-metal/block_disk_mapping/%s' % serial
        try:
            # Strip so a trailing newline never ends up in the id.
            return self._read_url(url).strip()
        except Exception as e:
            self.logger().error("get_physical_disk_id from metadata fail, %s " % e)
            return ''

    def collect_diskid_for_diskname(self, disk_name):
        """Return the disk id of one disk, or '' when it cannot be determined.

        Physical disks (names not starting with "vd") are looked up in the
        metadata service. For "vd" disks the id is the first line of
        /sys/block/<name>/serial; per the original author's note the CBS
        system disk has no usable serial and is matched through metadata
        instead (see collect_cbs_system_diskid).
        """
        if not self._is_cbs_device(disk_name):
            return self.get_physical_disk_id(disk_name)

        disk_id = ''
        serial_path = '/sys/block/%s/serial' % disk_name
        try:
            if os.path.exists(serial_path):
                with open(serial_path) as f:
                    disk_id = f.read().strip().split("\n")[0]
        except Exception as e:
            self.logger().error("get disk_id from /sys/block/%s/serial fail, %s " % (disk_name, e))
        return disk_id

    def collect_cbs_system_diskid(self, diskids):
        """Derive the CBS system disk id from the metadata volume list.

        The 'volumes' endpoint is read, '/' characters are removed and every
        non-empty line is taken as a disk id. The system disk id is the single
        id in that list which is not among ``diskids``.

        :param diskids: ids already resolved for the other disks.
        :return: the system disk id (cached for later calls), or '' when the
            request fails or the remaining set is not exactly one id.
        """
        if self.system_disk_id != '':
            self.logger().info("metal collect_cbs_system_diskid return: %s", self.system_disk_id)
            return self.system_disk_id

        url = self._metadata_base_url() + 'volumes'
        try:
            content = self._read_url(url).replace('/', '').splitlines()
        except Exception as e:
            self.logger().error("get disk_id from metadata fail, %s " % e)
            return ''

        listed_ids = set(line.strip() for line in content if line.strip())
        # Plain difference, not symmetric difference: an id that is known
        # locally but missing from the metadata list belongs to another disk
        # and must never be returned as the system disk id.
        diff_set = listed_ids - set(diskids)
        if len(diff_set) != 1:
            self.logger().error("diff_set!=1 %s", diff_set)
            return ''
        self.system_disk_id = next(iter(diff_set))
        return self.system_disk_id

    def get_disk_info_map(self):
        """Return the disk configuration map, refreshing it when it is stale.

        The map is rebuilt from ``/proc/partitions`` when the last refresh is
        at least ``disk_config_map_update_period`` seconds old; otherwise the
        cached map is returned unchanged.

        :return: dict mapping "/dev/<name>" to {"size": "<n>G", "dev": name,
            "disk_id": id-or-''}.
        """
        now = time.time()
        if (now - self._last_disk_config_map_time) < self.disk_config_map_update_period:
            return self.disk_config_map

        partition_cmd = 'cat /proc/partitions | sort -k 4'
        output = CommUtils.ExecuteTimeoutCommand(partition_cmd, 3)
        # Runs of spaces are collapsed to one and rows are split on a single
        # space. The name is read from field 4 and the block count from field
        # 3, which presumably relies on every data row starting with padding
        # spaces (so field 0 is empty) - verify before changing the
        # tokenisation.
        partition_data = re.sub(" * ", " ", output).split("\n")
        disks = []
        for line in partition_data:
            line_data = line.split(' ')
            if len(line_data) < 5:
                continue
            # Keep names ending in a lower-case letter, or nvme<X>n<Y>.
            if not re.match('((.*[a-z])|(nvme[0-9]+n[0-9]+))$', line_data[4]):
                continue
            # Floor division on purpose: the size is parsed back as an
            # integer later, and "/" would yield a float on Python 3.
            disk_size = int(line_data[3]) // (1024 * 1024)
            disks.append({'dev': line_data[4], 'size': str(disk_size) + 'G'})

        all_disks_id = self.collect_all_disks_id(disks)
        for disk in disks:
            try:
                dev = self._to_utf8_str(disk["dev"])
                devname = "/dev/" + dev
                disk_id = all_disks_id.get(dev, '')
                if not disk_id:
                    # A failed lookup must not wipe an id resolved by an
                    # earlier refresh; keep the last known one if there is any.
                    disk_id = self.disk_config_map.get(devname, {}).get("disk_id", '')
                self.disk_config_map[devname] = {
                    "size": self._to_utf8_str(disk["size"]),
                    "dev": dev,
                    "disk_id": disk_id,
                }
            except Exception as e:
                self.logger().info("%s", str(e))
                self.logger().info("%s", traceback.format_exc())
        self._last_disk_config_map_time = now
        return self.disk_config_map

    def do_collect(self):
        """Collect one round of disk metrics and hand them to the sender.

        Does nothing when ``cutils.is_minios()`` is true. Negative iostat
        values are reported as 0; read/write bandwidth is multiplied by 1024
        and labelled KB/s. A disk without a disk id is reported under
        'disk-<vm uuid>-<device name>'.
        """
        if cutils.is_minios():
            return
        self.logger().info("BareMetalDiskCollector start to run")
        disk_map = self.get_disk_info_map()
        info = self.q.get_qemu_disk_info_and_iostat(disk_map, 1)
        clamp = self._clamp_non_negative
        for dev in info:
            for iostat in info[dev]:
                disk_total = 0
                try:
                    disk_total = int(self._SIZE_PATTERN.match(iostat["size"]).group(1))
                except Exception as e:
                    self.logger().info("%s", str(e))

                # "await" is a reserved word on Python 3.7+, hence await_ms.
                await_ms = clamp(iostat["await"])
                rband = clamp(iostat["rband"]) * 1024
                wband = clamp(iostat["wband"]) * 1024

                disk_id = iostat["disk_id"]
                if disk_id is None or len(disk_id.strip()) == 0:
                    disk_id = 'disk-' + self.get_vm_uuid() + '-' + dev.split('/')[-1]
                dimension = {"diskid": disk_id}
                time_stamp = int(time.time())
                batch_metric = [
                    {"name": "disk_read_traffic", "value": rband, "unit": "KB/s"},
                    {"name": "disk_write_traffic", "value": wband, "unit": "KB/s"},
                    {"name": "disk_read_iops", "value": clamp(iostat["rcnt"]), "unit": "count"},
                    {"name": "disk_write_iops", "value": clamp(iostat["wcnt"]), "unit": "count"},
                    {"name": "disk_await", "value": await_ms, "unit": "ms"},
                    {"name": "disk_svctm", "value": clamp(iostat["svctm"]), "unit": "ms"},
                    {"name": "disk_util", "value": clamp(iostat["util"]), "unit": "%"},
                    {"name": "disk_total", "value": disk_total, "unit": "GB"},
                ]
                self._metric_handler.add_batch_metric(dimensions=dimension, timestamp=time_stamp, batch=batch_metric)
        if len(self._metric_handler.get_metrics()) > 0:
            data = {'sender': 'nws_sender', 'datas': self._metric_handler.pop_metrics()}
            self.put_data(data)
def main():
    """Run the collector forever: collect, flush the data, pause three seconds."""
    disk_collector = BareMetalDiskCollector()
    while True:
        disk_collector.do_collect()
        disk_collector.dump_data()
        time.sleep(3)
# Script entry point. main() loops without returning, so `data` is only bound
# if that loop is ever left.
if __name__ == "__main__":
    data = main()