HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/plugin/collector/metal/metaldisk.py
# encoding=utf8

import re
import os
import sys
import time,datetime
import traceback
sys.path.append(os.getcwd() + '/../../../comm/')
import constant
import urllib2
from plugin_base import BaseCollector, VmBaseCollector
from utils.metric_handler import MetricHandler
from utils.pyiostat import IOStatus
from cutils import CommUtils,generate_config
import cutils
import pdb

class DiskInfoAndIOStat():
    """Sample per-device I/O statistics and merge them with disk information.

    :param logger: logger object; only its ``info`` method is used here.
    :param interval: reporting granularity in seconds (default 10), used to
        compute the timestamp label attached to each sample.
    """

    # (key in the emitted record, key in the dict returned by
    # IOStatus().get_iostat()), converted to float in this order.
    _IOSTAT_FIELDS = (
        ("rrqm", 'rrqm/s'),
        ("wrqm", 'wrqm/s'),
        ("rcnt", 'r/s'),
        ("wcnt", 'w/s'),
        ("rband", 'rMB/s'),
        ("wband", 'wMB/s'),
        ("avgrq", 'avgrq-sz'),
        ("avgqu", 'avgqu-sz'),
        ("await", 'await'),
        ("svctm", 'svctm'),
        ("util", 'util'),
    )

    def __init__(self, logger, interval=10):
        self.logger = logger
        self.interval = interval
        # Not read or written by any other method of this class.
        self.disk_config_map = {}

    def get_disk_iostat_map(self, start_time_stamp, duration):
        """Read ``duration`` iostat samples and group them by device.

        :param start_time_stamp: epoch seconds used to label the first sample;
            sample ``i`` is labelled ``start_time_stamp + i * interval``.
        :param duration: number of samples to read.
        :return: dict mapping "/dev/<name>" to a list of sample dicts holding
            "time", "devname" and the float fields listed in _IOSTAT_FIELDS.
            Failures are logged at info level and the affected sample or
            device is skipped.
        """
        samples_by_dev = {}
        for step in xrange(duration):
            try:
                label = time.strftime(
                    "%Y-%m-%d %H:%M:%S",
                    time.localtime(start_time_stamp + step * self.interval))
                self.logger.info("begin to read disk iostat at time: %s", label)

                # IOStatus reads /proc/diskstats. iostat values are computed
                # from the difference between two readings, so the module's
                # first run raises here and reports nothing.
                for dev, raw in IOStatus().get_iostat().items():
                    try:
                        devname = "/dev/" + dev
                        record = {"time": label, "devname": devname}
                        for out_key, src_key in self._IOSTAT_FIELDS:
                            record[out_key] = float(raw[src_key])
                        samples_by_dev.setdefault(devname, []).append(record)
                    except Exception as e:
                        self.logger.info("%s", str(e))
                        self.logger.info("%s", traceback.format_exc())
            except Exception as e:
                self.logger.info("%s", str(e))
                self.logger.info("%s", traceback.format_exc())
        return samples_by_dev

    def get_qemu_disk_info_and_iostat(self, disk_info_map, duration):
        """Merge fresh iostat samples with the matching entries of ``disk_info_map``.

        :param disk_info_map: dict mapping "/dev/<name>" to a dict of disk
            attributes; every sample of that device is updated with it.
        :param duration: number of samples to read.
        :return: dict mapping "/dev/<name>" to its list of merged samples, for
            devices present both in ``disk_info_map`` and in the iostat data.
        """
        now = int(time.time())
        # Align to the interval boundary, then step back so that the last of
        # the ``duration`` samples is labelled with that boundary.
        first_ts = (now - now % self.interval) - self.interval * (duration - 1)
        samples_by_dev = self.get_disk_iostat_map(first_ts, duration)

        merged = {}
        for devname, disk_info in disk_info_map.items():
            samples = samples_by_dev.get(devname)
            if samples is None:
                continue
            for sample in samples:
                sample.update(disk_info)
            merged[devname] = samples

        self.logger.info("DISK_NUMBER: %s", len(merged))
        return merged

class BareMetalDiskCollector(VmBaseCollector):
    """Report per-disk I/O metrics of a bare-metal host (namespace 'qce/cvm').

    Each run joins iostat samples (via DiskInfoAndIOStat) with a cached
    per-disk configuration (size, device name, disk id) and queues one batch
    of metrics per disk, keyed by the disk id.
    """

    # Parses size strings such as "465G" into (number, unit). Compiled once
    # here instead of once per disk on every collection.
    _SIZE_PATTERN = re.compile("([0-9]+)([a-zA-Z]+)")

    def init(self):
        """Initialise collector state.

        NOTE(review): named ``init`` rather than ``__init__``; presumably the
        VmBaseCollector constructor calls it -- confirm in plugin_base.
        """
        self.set_frequency(10)
        self._metric_handler = MetricHandler()
        self._metric_handler.namespace = 'qce/cvm'
        # time.time() of the last disk-config refresh; 0 forces one on first use.
        self._last_disk_config_map_time = 0
        # Seconds between disk-config refreshes.
        self.disk_config_map_update_period = 60*10
        self.q = DiskInfoAndIOStat(self.logger(), interval=10)
        self.disk_config_map = {}       # "/dev/<name>" -> {"size", "dev", "disk_id"}
        self.metadata = ''              # MetaDataServer config, loaded lazily
        self.system_disk_id = ''        # CBS system disk id, cached once resolved

    def collect_all_disks_id(self, disks):
        """Resolve the disk id of every disk in ``disks``.

        :param disks: list of dicts, each with a ``"dev"`` key holding a
            kernel device name such as ``"sda"`` or ``"vdb"``.
        :return: dict mapping device name to disk id. Disks whose id cannot be
            resolved locally are left out, except the last one in ``disks``
            order. That disk is assumed to be the CBS system disk and is mapped
            to the result of collect_cbs_system_diskid (``''`` on failure).
        """
        disk_id_map = {}
        # Ids resolved locally; the system disk is the metadata volume not among them.
        disk_ids = []
        # Devices whose id could not be resolved locally, in input order.
        unresolved = []
        for disk in disks:
            dev = disk["dev"]
            disk_id = self.collect_diskid_for_diskname(dev)
            if disk_id != '':
                disk_id_map[dev] = disk_id
                disk_ids.append(disk_id)
            else:
                unresolved.append(dev)

        if unresolved:
            if len(unresolved) > 1:
                # Only one of them can be the system disk; the rest stay unresolved.
                self.logger().info("disks without disk_id: %s, treat %s as system disk",
                                   unresolved, unresolved[-1])
            disk_id_map[unresolved[-1]] = self.collect_cbs_system_diskid(disk_ids)

        self.logger().info("disk_id_map_len: %d , disk_id_map:%s", len(disk_id_map), disk_id_map)
        return disk_id_map

    def get_physical_disk_id(self, serial):
        """Ask the metadata service for the disk id of a physical disk.

        :param serial: kernel device name such as ``sda`` or ``nvme0n1``.
        :return: the raw response body. Returns ``''`` for ``vd*`` names,
            which are resolved through /sys/block by
            collect_diskid_for_diskname instead, or when the request fails.
        """
        if re.match("vd", serial) is not None:
            return ''
        if self.metadata == '':
            self.metadata = generate_config(constant.PLUGIN_CONFIG_PATH, "MetaDataServer")
        url = self.metadata['metadata_url'] + 'bare-metal/block_disk_mapping/%s' % serial
        diskid = ''
        try:
            response = urllib2.urlopen(url, timeout=2)
            try:
                diskid = response.read()
            finally:
                response.close()
        except Exception as e:
            self.logger().error("get_physical_disk_id from metadata fail,  %s " % e)
        return diskid

    def collect_diskid_for_diskname(self, disk_name):
        """Return the disk id for ``disk_name``, or ``''`` if it is unknown.

        Physical disks (names not starting with "vd", e.g. sda or nvme0n1) are
        looked up in the metadata service. CBS disks (vd*) use the first line
        of /sys/block/<name>/serial. The CBS system disk's id cannot be read
        this way; it is matched later by collect_cbs_system_diskid.
        """
        if re.match("vd", disk_name) is None:
            return self.get_physical_disk_id(disk_name)

        serial_path = '/sys/block/%s/serial' % disk_name
        disk_id = ''
        try:
            if os.path.exists(serial_path):
                with open(serial_path) as f:
                    disk_id = f.read().strip().split("\n")[0]
        except Exception as e:
            self.logger().error("get disk_id from /sys/block/%s/serial fail,  %s " % (disk_name, e))
        return disk_id

    def collect_cbs_system_diskid(self, diskids):
        """Infer the CBS system disk id from the metadata ``volumes`` listing.

        The system disk is the single listed volume that is not among the
        locally resolved ``diskids``. A successful result is cached in
        ``self.system_disk_id`` and returned on later calls.

        :param diskids: disk ids that were resolved locally.
        :return: the system disk id, or ``''`` when the request fails or the
            result is ambiguous.
        """
        if self.system_disk_id != '':
            self.logger().info("metal collect_cbs_system_diskid return: %s", self.system_disk_id)
            return self.system_disk_id

        if self.metadata == '':
            self.metadata = generate_config(constant.PLUGIN_CONFIG_PATH, "MetaDataServer")

        url = self.metadata['metadata_url'] + 'volumes'
        try:
            response = urllib2.urlopen(url, timeout=2)
            try:
                content = response.read()
            finally:
                response.close()
        except Exception as e:
            self.logger().error("get disk_id from metadata fail,  %s " % e)
            return ''

        # One volume per line, with '/' characters removed; blank lines are ignored.
        volume_ids = set(line.strip() for line in content.replace('/', '').splitlines()
                         if line.strip())
        # One-sided difference: ids found only locally (e.g. physical disks
        # that are not CBS volumes) must not be taken for the system disk.
        diff_set = volume_ids - set(diskids)
        if len(diff_set) == 1:
            self.system_disk_id = next(iter(diff_set))
            return self.system_disk_id
        self.logger().error("diff_set!=1  %s", diff_set)
        return ''

    def get_disk_info_map(self):
        """Return the per-disk configuration, refreshing it when stale.

        The map is refreshed from /proc/partitions at most once every
        ``disk_config_map_update_period`` seconds. Entries are added or
        overwritten but never removed.

        :return: dict mapping "/dev/<name>" to
            {"size": "<N>G", "dev": "<name>", "disk_id": "<id or ''>"}.
        """
        now = time.time()
        if (now - self._last_disk_config_map_time) < self.disk_config_map_update_period:
            return self.disk_config_map

        partition_cmd = 'cat /proc/partitions | sort -k 4'
        output = CommUtils.ExecuteTimeoutCommand(partition_cmd, 3)
        if not output:
            # Keep the previous map and retry on the next call rather than
            # waiting a full refresh period. NOTE(review): assumes an empty or
            # None result means the command failed or timed out.
            self.logger().error("read /proc/partitions fail, keep previous disk config")
            return self.disk_config_map

        def _native(value):
            # Encode unicode to a UTF-8 byte string (Python 2); str passes through.
            return value if isinstance(value, str) else value.encode("utf8")

        disks = []
        for line in output.split("\n"):
            fields = line.split()
            # Data rows are "major minor #blocks name"; skip the header and blank lines.
            if len(fields) < 4 or not fields[2].isdigit():
                continue
            disk_name = fields[3]
            # Keep names ending in a lowercase letter (sda, vdb) or of the
            # form nvmeXnY; other names ending in a digit (partitions) are skipped.
            if not re.match('((.*[a-z])|(nvme[0-9]+n[0-9]+))$', disk_name):
                continue
            # #blocks is in 1 KiB units. Floor division is deliberate: the
            # size is later parsed as a whole number of GB.
            disk_size = int(fields[2]) // (1024*1024)
            disks.append({'dev': disk_name, 'size': str(disk_size) + 'G'})

        all_disks_id = self.collect_all_disks_id(disks)
        for disk in disks:
            try:
                dev = _native(disk["dev"])
                self.disk_config_map["/dev/" + dev] = {
                    "size": _native(disk["size"]),
                    "dev": dev,
                    # '' when no id was resolved, rather than a KeyError that
                    # drops the disk; do_collect then synthesizes an id.
                    "disk_id": all_disks_id.get(dev, ''),
                }
            except Exception as e:
                self.logger().info("%s", str(e))
                self.logger().info("%s", traceback.format_exc())
        self._last_disk_config_map_time = now
        return self.disk_config_map

    def do_collect(self):
        """Collect one round of disk metrics and queue them for sending.

        Does nothing when ``cutils.is_minios()`` is true. Each disk with
        iostat data gets one batch with dimension {"diskid": <id>}: traffic in
        KB/s, IOPS, await/svctm in ms, util in %, and total size in GB.
        Negative readings are reported as 0.
        """
        if cutils.is_minios():
            return
        self.logger().info("BareMetalDiskCollector start to run")
        disk_map = self.get_disk_info_map()
        info = self.q.get_qemu_disk_info_and_iostat(disk_map, 1)

        def non_negative(value):
            return 0 if value < 0 else value

        for dev, iostat_list in info.items():
            for iostat in iostat_list:
                disk_total = 0
                try:
                    # "465G" -> 465
                    disk_total = int(self._SIZE_PATTERN.match(iostat["size"]).group(1))
                except Exception as e:
                    self.logger().info("%s", str(e))

                rcnt = non_negative(iostat["rcnt"])
                wcnt = non_negative(iostat["wcnt"])
                # rband/wband come from rMB/s and wMB/s; report them in KB/s.
                rband = non_negative(iostat["rband"]) * 1024
                wband = non_negative(iostat["wband"]) * 1024
                # Not named `await`: that is a reserved keyword in Python 3.7+.
                await_ms = non_negative(iostat["await"])
                svctm = non_negative(iostat["svctm"])
                util = non_negative(iostat["util"])

                disk_id = iostat.get("disk_id")
                if disk_id is None or len(disk_id.strip()) == 0:
                    # No usable id: build one from the host uuid and device name.
                    disk_id = 'disk-' + self.get_vm_uuid() + '-' + dev.split('/')[-1]

                batch_metric = [
                    {"name": "disk_read_traffic", "value": rband, "unit": "KB/s"},
                    {"name": "disk_write_traffic", "value": wband, "unit": "KB/s"},
                    {"name": "disk_read_iops", "value": rcnt, "unit": "count"},
                    {"name": "disk_write_iops", "value": wcnt, "unit": "count"},
                    {"name": "disk_await", "value": await_ms, "unit": "ms"},
                    {"name": "disk_svctm", "value": svctm, "unit": "ms"},
                    {"name": "disk_util", "value": util, "unit": "%"},
                    {"name": "disk_total", "value": disk_total, "unit": "GB"},
                ]
                self._metric_handler.add_batch_metric(dimensions={"diskid": disk_id},
                                                      timestamp=int(time.time()),
                                                      batch=batch_metric)

        if len(self._metric_handler.get_metrics()) > 0:
            data = {'sender': 'nws_sender', 'datas': self._metric_handler.pop_metrics()}
            self.put_data(data)

def main():
    """Debug entry point: collect, dump and pause for 3 seconds, forever."""
    pause_seconds = 3
    collector = BareMetalDiskCollector()
    while True:
        collector.do_collect()
        collector.dump_data()
        time.sleep(pause_seconds)

if __name__ == '__main__':
    # main() loops forever and returns nothing, so its result is not kept.
    main()