HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/plugin/collector/vm/npu_rdma.py
import resource
import re
import sys
import os, time

sys.path.append(os.getcwd() + "/../../../comm/")
import constant
import datetime, subprocess
from plugin_base import VmBaseCollector
from utils.metric_handler import MetricHandler
from pynvml.pynvml import *
import cutils
import json
import re
from Queue import Queue

# Time limit, in seconds, that ExecuteCmdWithTimeout compares a command's
# running time against before killing it.
CMD_TIMEOUT_SEC = 3
# Value ExecuteCmdWithTimeout returns instead of stdout when it killed the command.
CMD_TIMEOUT_FLAG = "-1"
# Sentinel stored wherever a value could not be read or parsed.
VAL_ERR = -1


class AscendNPUCollect:
    def __init__(self):
        self.npu_id_list = self.get_npu_ids()
        self.npu_bus_id_list = self.get_npu_bus_ids()
        self.npu_serial_number_list = self.get_npu_serial_numbers()
        self.npu_fw_ver_list = self.get_fw_vers()
        self.ascend_support_stat_items = {
            "tx_prio5_bytes": "mac_tx_total_oct_num",
            "tx_prio5_packets": "mac_tx_total_pkt_num",
            "rx_prio5_bytes": "mac_rx_total_oct_num",
            "rx_prio5_packets": "mac_rx_total_oct_num",
            "tx_prio5_pause": "mac_tx_pfc_pkt_num",
            "rx_prio5_pause": "mac_rx_pfc_pkt_num",
            "rp_cnp_handled": "roce_rx_cnp_pkt_num",
            "out_of_sequence": "roce_out_of_order_num",
            "rx_crc_errors_phy": "roce_verification_err_num",
            "link_events_down_phy": "",
            "qp_num": "",
            "pd_num": "",
            "mr_num": "",
            "cq_num": "",
            "sample_time": "",
        }
        self.ascend_support_config_items = [
            "bonding_mode",
            "active_mtu",
            "bond_duplicate_ip_dect",
            "rdma_ip",
            "link_state",
            "bond_dev_order",
            "traffic_class",
            "dcqcn_enable"
        ]

    def ExecuteCmdWithTimeout(self, command):
        start = datetime.datetime.now()
        process = subprocess.Popen(
            command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True
        )
        while process.poll() is None:
            time.sleep(0.0001)
            now = datetime.datetime.now()
            if (now - start).seconds > CMD_TIMEOUT_SEC:
                process.kill()
                process.wait()
                return CMD_TIMEOUT_FLAG
        return process.stdout.read()

    def read_uptime_secs(self):
        output = self.ExecuteCmdWithTimeout('cut -d \'.\' -f 1 /proc/uptime').decode()
        try:
            secs = int(output)
        except Exception:
            secs = VAL_ERR
        return secs

    def value_cast(self, value):
        try:
            return int(value)
        except ValueError:
            try:
                return round(float(value), 3)
            except ValueError:
                return VAL_ERR

    def grep(self, pattern, lines):
        return [line for line in lines if re.search(pattern, line)]

    def get_npu_ids(self):
        npu_id_list = []
        cmd = "npu-smi info -l"
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        for line in lines:
            if line.find("NPU ID") == -1:
                continue
            try:
                npu_id = line.split(":")[1].strip()
                npu_id_list.append(npu_id)
            except Exception:
                pass
        return npu_id_list

    def get_npu_bus_ids(self):
        cmd = "npu-smi info"
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        bus_id_lines = self.grep("0000:", lines)
        npu_bus_id_list = [VAL_ERR] * len(bus_id_lines)
        for i in range(len(bus_id_lines)):
            line = bus_id_lines[i]
            try:
                bus_id = line.split("|")[2].split("0000:")[1].strip()
            except Exception:
                bus_id = VAL_ERR
            npu_bus_id_list[i] = bus_id
        return npu_bus_id_list

    def get_npu_serial_numbers(self):
        npu_serial_number_list = [VAL_ERR] * len(self.npu_bus_id_list)
        for i in range(len(self.npu_bus_id_list)):
            cmd = "lspci -vvv -s %s" % self.npu_bus_id_list[i]
            output = self.ExecuteCmdWithTimeout(cmd).decode()
            lines = output.splitlines()
            serial_lines = self.grep("Serial Number", lines)
            try:
                serial_number = serial_lines[0].split(" ")[-1].strip()
            except Exception:
                serial_number = VAL_ERR
            npu_serial_number_list[i] = serial_number
        return npu_serial_number_list

    def get_npu_mac(self, npu_id):
        cmd = "hccn_tool -i %s -mac -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        try:
            mac = output.split(":")[1].strip()
        except Exception:
            mac = VAL_ERR
        return mac

    def get_npu_ip(self, npu_id):
        cmd = "hccn_tool -i %s -ip -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        ip_line = self.grep("ipaddr", lines)
        try:
            ip = ip_line[0].split(":")[1].strip()
        except Exception:
            ip = VAL_ERR
        return ip

    def get_npu_link_state(self, npu_id):
        cmd = "hccn_tool -i %s -link -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        try:
            state = 1 if lines[0].split(":")[1] == " UP" else 0
        except Exception:
            state = VAL_ERR
        return state

    def get_npu_net_health(self, npu_id):
        cmd = "hccn_tool -i %s -net_health -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        state = VAL_ERR
        try:
            if lines[0].split(":")[1] == " Success":
                state = 0
            elif lines[0].split(":")[1] == " Socket fail":
                state = 1
            elif lines[0].split(":")[1] == " Receive timeout":
                state = 2
            elif lines[0].split(":")[1] == " Unreachable":
                state = 3
            elif lines[0].split(":")[1] == " Time exceeded":
                state = 4
            elif lines[0].split(":")[1] == " Fault":
                state = 5
            elif lines[0].split(":")[1] == " Init":
                state = 6
            elif lines[0].split(":")[1] == " Thread error":
                state = 7
            elif lines[0].split(":")[1] == " Detect ip set":
                state = 8
            else:
                state = 9
        except Exception:
            pass
        return state

    def get_npu_traffic_class(self):
        cmd = "env | grep HCCL_RDMA_TC"
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        try:
            tc = output.split("=")[1].strip()
        except Exception:
            tc = VAL_ERR
        return tc

    def get_udp_port(self, npu_id):
        cmd = "hccn_tool -i %s -udp -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        try:
            udp_port = int(lines[0].split(":")[1].strip())
        except Exception:
            udp_port = VAL_ERR
        return udp_port

    def get_mtu(self, npu_id):
        cmd = "hccn_tool -i %s -mtu -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        try:
            mtu = int(output.split(":")[1].strip())
        except Exception:
            mtu = VAL_ERR
        return mtu

    def get_health(self, npu_id):
        cmd = "hccn_tool -i %s -net_health -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        try:
            health = 0 if output.split(":")[1].strip() == "Success" else VAL_ERR
        except Exception:
            health = VAL_ERR
        return health

    def get_fw_vers(self):
        npu_fw_ver_list = [VAL_ERR] * len(self.npu_bus_id_list)
        for i in range(len(self.npu_bus_id_list)):
            cmd = "npu-smi info -t board -i %d" % i
            output = self.ExecuteCmdWithTimeout(cmd).decode()
            lines = output.splitlines()
            try:
                fw_line = self.grep("Firmware Version", lines)
                fw_ver_str = fw_line[0].split(":")[1].strip()
                fw_ver = int("".join(fw_ver_str.split(".")))
            except Exception:
                fw_ver = VAL_ERR
            npu_fw_ver_list[i] = fw_ver
        return npu_fw_ver_list

    def get_link_down_stat(self, npu_id):
        cmd = "hccn_tool -i %s -link_stat -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        try:
            link_stat = self.grep("link down count", lines)
            down_cnt = int(link_stat[0].split(":")[1].strip())
        except Exception:
            down_cnt = VAL_ERR
        return down_cnt

    def get_dcqcn_status(self, npu_id):
        cmd = "hccn_tool -i %s -dcqcn -g status" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        try:
            dcqcn_status = self.grep("dcqcn enable status", lines)
            status_str = dcqcn_status[0].split(":")[1].strip()
            if status_str == "disable":
                status = 0
            else:
                status = 1
        except Exception:
            status = VAL_ERR
        return status

    def get_rdma_resource(self, npu_id):
        cmd = "hccn_tool -i %s -hw_stats -g" % npu_id
        output = self.ExecuteCmdWithTimeout(cmd).decode()
        lines = output.splitlines()
        try:
            keys = {
                "qp_num": "qp_active", 
                "mr_num": "mr_active", 
                "cq_num": "cq_active", 
                "pd_num": "pd_active" 
            }
            values = {
                "qp_num": VAL_ERR, 
                "mr_num": VAL_ERR, 
                "cq_num": VAL_ERR, 
                "pd_num": VAL_ERR,
            }
            for i in range(len(keys.keys())):
                k = keys.keys()[i]
                v = keys[k]
                line = self.grep(v, lines)
                cnt = int(line[0].split(":")[1].strip())
                values[k] = cnt
        except Exception:
            pass
        return values

    def get_npu_config(self):
        npu_config = []
        keys = self.ascend_support_config_items
        values = [VAL_ERR] * len(keys)
        for i in range(len(self.npu_id_list)):
            npu_id = self.npu_id_list[i]
            for j in range(len(keys)):
                key = keys[j]
                try:
                    if key == "bonding_mode":
                        values[j] = self.get_udp_port(npu_id)
                    elif key == "active_mtu":
                        values[j] = self.get_mtu(npu_id)
                    elif key == "bond_duplicate_ip_dect":
                        values[j] = self.get_health(npu_id)
                    # elif key == "rdma_ip":
                    #     values[j] = self.get_npu_ip(npu_id)
                    elif key == "link_state":
                        values[j] = self.get_npu_link_state(npu_id)
                    elif key == "bond_dev_order":
                        values[j] = self.get_npu_net_health(npu_id)
                    elif key == "traffic_class":
                        values[j] = self.get_npu_traffic_class()
                    elif key == "link_events_down_phy":
                        values[j] = self.get_link_down_stat(npu_id)
                    elif key == "dcqcn_enable":
                        values[j] = self.get_dcqcn_status(npu_id)
                except Exception:
                    continue
            npu_config.append(dict(zip(keys, values)))
        return npu_config

    def get_npu_stat(self):
        npu_stat = []
        for i in range(len(self.npu_id_list)):
            keys = self.ascend_support_stat_items.keys()
            values = [VAL_ERR] * len(keys)
            cmd = "hccn_tool -i %s -stat -g" % i
            output = self.ExecuteCmdWithTimeout(cmd).decode()
            # this time is used as the denominator of the RDMA traffic
            sample_time = time.time()
            lines = output.splitlines()
            rdma_resource = self.get_rdma_resource(i)
            for j in range(len(keys)):
                key = keys[j]
                if key not in self.ascend_support_stat_items.keys():
                    continue
                if key == "link_events_down_phy":
                    values[j] = self.get_link_down_stat(i)
                    continue
                if key == "qp_num":
                    values[j] = rdma_resource["qp_num"]
                    continue
                if key == "cq_num":
                    values[j] = rdma_resource["cq_num"]
                    continue
                if key == "pd_num":
                    values[j] = rdma_resource["pd_num"]
                    continue
                if key == "mr_num":
                    values[j] = rdma_resource["mr_num"]
                    continue
                if key == "sample_time":
                    values[j] = sample_time
                    continue
                try:
                    key_line = self.grep(self.ascend_support_stat_items[key], lines)
                    values[j] = int(key_line[0].split(":")[1])
                except Exception:
                    continue
            npu_stat.append(dict(zip(keys, values)))
        return npu_stat


class NPUCollector(VmBaseCollector):
    def init(self):
        """Create the NPU reader and metric handler and take baseline samples."""
        self.set_frequency(10)
        self.collector = AscendNPUCollect()

        metric_handler = MetricHandler()
        metric_handler.namespace = "qce/cvm"
        self.handler = metric_handler

        # The flag is cleared when uuid/vmip cannot be obtained; do_collect
        # retries the lookup for as long as it stays 0.
        self.is_dns_normal = 1
        try:
            self.uuid = self.get_vm_uuid()
            self.vmip = self.get_vmip()
        except Exception:
            self.is_dns_normal = 0

        # Time of the last run of each periodic task; 0 means "never".
        self.last_report_time = 0
        self.last_get_config_time = 0
        self.last_get_stat_time = 0
        self.last_get_fw_ver_time = 0

        # Periods of the periodic tasks, in seconds.
        self.report_interval = 30
        self.get_stat_interval = 29
        self.get_config_interval = 300
        self.get_fw_ver_interval = 12 * 60 * 60

        # Link alarms: repeated at most once per hour and NPU, and only once
        # the system has been up for more than 15 minutes.
        self.alarm_interval = 1 * 60 * 60
        self.uptime_alarm_interval = 60 * 15
        self.last_link_state_alarm_time = [0 for _ in self.collector.npu_id_list]

        # Baselines that the first do_collect call compares against.
        self.last_npu_config = self.collector.get_npu_config()
        self.last_npu_stat = self.collector.get_npu_stat()

    def is_npu_ready(self, old_count):
        """Tell whether ``npu-smi info -l`` still lists ``old_count`` NPUs.

        Returns 1 when the number of ``NPU ID`` lines is non-zero and equal
        to ``old_count``, otherwise 0.
        """
        listing = self.collector.ExecuteCmdWithTimeout("npu-smi info -l").decode()
        listed = sum(1 for text in listing.splitlines() if "NPU ID" in text)
        if listed != 0 and listed == old_count:
            return 1
        return 0

    def reinit(self):
        """Re-read the NPU inventory and restart the collection schedule.

        Called by do_collect when is_npu_ready reports that the NPU list no
        longer matches.  Every list that do_collect indexes by NPU position
        is refreshed here so that all of them agree with the new
        ``npu_id_list``.
        """
        collector = self.collector
        collector.npu_id_list = collector.get_npu_ids()
        collector.npu_bus_id_list = collector.get_npu_bus_ids()
        collector.npu_serial_number_list = collector.get_npu_serial_numbers()
        # Firmware versions are looked up per bus id; without this refresh
        # the list keeps its old length and newly listed NPUs fail the
        # lookup in do_collect, which silently drops their metrics.
        collector.npu_fw_ver_list = collector.get_fw_vers()

        # Grow (never shrink) the alarm timestamps: new NPUs start at
        # "never alarmed" while existing throttling state is kept.
        missing = len(collector.npu_id_list) - len(self.last_link_state_alarm_time)
        if missing > 0:
            self.last_link_state_alarm_time.extend([0] * missing)

        self.last_report_time = 0
        self.last_get_stat_time = 0
        self.last_get_config_time = 0
        self.report_interval = 30
        self.get_stat_interval = 29
        self.get_config_interval = 300
        self.last_npu_config = collector.get_npu_config()
        self.last_npu_stat = collector.get_npu_stat()

    def do_collect(self):
        """Run one collection cycle.

        Steps, in order:

        1. re-read the NPU inventory when is_npu_ready reports a mismatch;
        2. retry the uuid/vmip lookup if it failed before (the cycle is
           abandoned when it fails again);
        3. queue link-state alarms;
        4. every ``get_stat_interval`` seconds, sample the counters and
           queue one metric batch per NPU;
        5. refresh the cached configuration and firmware versions when
           their intervals have elapsed;
        6. hand any queued metrics to ``put_data``.
        """
        if not self.is_npu_ready(len(self.collector.npu_id_list)):
            self.reinit()
        if not self.is_dns_normal:
            try:
                self.uuid = self.get_vm_uuid()
                self.vmip = self.get_vmip()
                self.is_dns_normal = 1
            except Exception:
                return

        current_time = time.time()
        self._raise_link_state_alarms(current_time)

        if current_time - self.last_get_stat_time >= self.get_stat_interval:
            npu_stat = self.collector.get_npu_stat()
            for i in range(len(self.collector.npu_id_list)):
                try:
                    self._queue_npu_metrics(i, npu_stat, current_time)
                except Exception:
                    # Best effort: a bad sample for one NPU must not stop
                    # the reporting of the others.
                    continue
            self.last_npu_stat = npu_stat
            self.last_get_stat_time = current_time

        if current_time - self.last_get_config_time >= self.get_config_interval:
            self.last_npu_config = self.collector.get_npu_config()
            self.last_get_config_time = current_time

        if current_time - self.last_get_fw_ver_time >= self.get_fw_ver_interval:
            self.collector.npu_fw_ver_list = self.collector.get_fw_vers()
            self.last_get_fw_ver_time = current_time

        if len(self.handler.get_metrics()) > 0:
            data = {"sender": "nws_sender", "datas": self.handler.pop_metrics()}
            self.put_data(data)

    def _raise_link_state_alarms(self, current_time):
        """Queue an alarm for every NPU whose cached link_state is not 1.

        Nothing is sent until the system has been up for more than
        ``uptime_alarm_interval`` seconds, and an alarm for the same NPU is
        repeated at most once per ``alarm_interval`` seconds.
        """
        collector = self.collector
        if not collector.npu_id_list:
            return
        # The uptime is the same for every NPU, so it is read once.
        try:
            uptime = collector.read_uptime_secs()
        except Exception:
            return
        if uptime <= self.uptime_alarm_interval:
            return

        for i in range(len(collector.npu_id_list)):
            try:
                link_state = self.last_npu_config[i]["link_state"]
                if link_state == VAL_ERR:
                    continue
                if link_state == 1:
                    # Link is fine again: allow an immediate alarm next time.
                    self.last_link_state_alarm_time[i] = 0
                    continue
                last_alarm = self.last_link_state_alarm_time[i]
                if last_alarm != 0 and current_time - last_alarm <= self.alarm_interval:
                    continue

                # The RDMA IP is only queried once an alarm is really due.
                rdma_ip = collector.get_npu_ip(i)
                bus_id = collector.npu_bus_id_list[i]
                serial = collector.npu_serial_number_list[i]
                alarm_dimensions = [
                    {"Key": "Uuid", "Value": self.uuid},
                    {"Key": "BondMac", "Value": "0"},
                    {"Key": "EthMac", "Value": "0"},
                    {"Key": "RdmaIp", "Value": "0" if rdma_ip == VAL_ERR else rdma_ip},
                    {"Key": "Bdf", "Value": bus_id},
                    {"Key": "Sn", "Value": serial},
                ]
                # NOTE(review): the offset is hard-coded to +0800 while the
                # time itself comes from time.localtime(); confirm that the
                # hosts run in UTC+8.
                alarmtime = time.strftime(
                    "%Y-%m-%d %H:%M:%S+0800", time.localtime(current_time)
                )
                # NOTE(review): CallerKey is a credential embedded in the
                # source; consider loading it from configuration instead.
                alarmproxy_metric = {
                    "CallerName": "barad_agent",
                    "CallerKey": "PSbhht7wQLEbH6OjDXTayQ==",
                    "AlarmTime": alarmtime,
                    "Dimensions": alarm_dimensions,
                    "DeviceName": "bond" + collector.npu_id_list[i],
                    "DeviceId": "",
                    "Slot": bus_id,
                    "SN": serial,
                }
                alarmproxy_event = {
                    "AlarmId": 1118,
                    "EventName": "link_state",
                    "FaultType": "Hardware",
                    "FaultDesc": "rdma bond link_state",
                }
                data_alarmproxy = {
                    "sender": "alarmproxy_sender",
                    "datas": dict(alarmproxy_metric, **alarmproxy_event),
                }
                self.put_data(data_alarmproxy)
                self.last_link_state_alarm_time[i] = current_time
            except Exception:
                # Best effort: alarming for one NPU must not break the cycle.
                continue

    def _queue_npu_metrics(self, i, npu_stat, current_time):
        """Build the metric batch of NPU ``i`` and add it to the handler.

        Args:
            i: position of the NPU in the collector's lists.
            npu_stat: the fresh samples returned by get_npu_stat.
            current_time: timestamp of this cycle, used for the batch.

        Values equal to ``VAL_ERR`` are left out of the batch.  Exceptions
        (for example a missing key) propagate so that the caller can skip
        the NPU as a whole.
        """
        collector = self.collector
        npu_id = collector.npu_id_list[i]
        serial = collector.npu_serial_number_list[i]
        bus_id = collector.npu_bus_id_list[i]
        if VAL_ERR in (npu_id, serial, bus_id):
            return

        current = npu_stat[i]
        previous = self.last_npu_stat[i]
        config = self.last_npu_config[i]
        delta_time = current["sample_time"] - previous["sample_time"]

        rdma_ip = collector.get_npu_ip(i)
        dimensions = {
            "name": "bond" + npu_id,
            "vmip": self.vmip,
            "uuid": self.uuid,
            "rdma_ip": rdma_ip if rdma_ip != VAL_ERR else "0",
            "serial": serial,
            "rdma_bdf": bus_id,
        }

        batch_metric = []

        fw_ver = collector.npu_fw_ver_list[i]
        if fw_ver != VAL_ERR:
            batch_metric.append({"name": "fw_ver", "value": fw_ver})

        # Cached configuration values, reported as they are.
        for name in (
            "bonding_mode",
            "active_mtu",
            "bond_duplicate_ip_dect",
            "link_state",
            "bond_dev_order",
            "traffic_class",
            "dcqcn_enable",
        ):
            if config[name] != VAL_ERR:
                batch_metric.append({"name": name, "value": config[name]})

        # Counters reported as per-second rates: (counter key, multiplier,
        # metric names that receive the rate).  Byte counters are
        # multiplied by 8, i.e. reported in bits per second.
        rate_items = (
            ("tx_prio5_bytes", 8, ("rdma_outtraffic", "tx_prio5_bytes")),
            ("rx_prio5_bytes", 8, ("rdma_intraffic", "rx_prio5_bytes")),
            ("tx_prio5_packets", 1, ("rdma_outpkg", "tx_prio5_packets")),
            ("rx_prio5_packets", 1, ("rdma_inpkg", "rx_prio5_packets")),
            ("tx_prio5_pause", 1, ("tx_prio5_pause",)),
            ("rx_prio5_pause", 1, ("rx_prio5_pause",)),
            ("rx_crc_errors_phy", 1, ("rx_crc_errors_phy",)),
            ("out_of_sequence", 1, ("out_of_sequence",)),
            ("rp_cnp_handled", 1, ("rp_cnp_handled",)),
        )
        for key, multiplier, metric_names in rate_items:
            now_value = current[key]
            old_value = previous[key]
            # A rate needs two valid samples; mixing a reading with the
            # VAL_ERR sentinel would produce a bogus spike.
            if now_value == VAL_ERR or old_value == VAL_ERR:
                continue
            # A counter that went backwards was reset; no meaningful rate
            # exists for this interval.  The same goes for an empty interval.
            if now_value < old_value or delta_time <= 0:
                continue
            rate = format(
                float(now_value - old_value) * multiplier / delta_time, ".3f"
            )
            for metric_name in metric_names:
                batch_metric.append({"name": metric_name, "value": rate})

        # Values reported as absolute numbers.
        for name in ("link_events_down_phy", "mr_num", "cq_num", "pd_num", "qp_num"):
            if current[name] != VAL_ERR:
                batch_metric.append({"name": name, "value": current[name]})

        self.handler.add_batch_metric(
            batch=batch_metric,
            dimensions=dimensions,
            timestamp=collector.value_cast(current_time),
        )


if __name__ == "__main__":
    # Stand-alone mode: collect and dump once per report interval, forever.
    collector = NPUCollector()
    while True:
        now = time.time()
        elapsed = now - collector.last_report_time
        if elapsed < collector.report_interval:
            # Not due yet: sleep for the remainder of the interval.
            time.sleep(collector.report_interval - int(elapsed))
            continue
        collector.collect()
        collector.dump_data()
        collector.last_report_time = now