File: //usr/local/qcloud/monitor/barad/plugin/collector/vm/ib_monitor.py
import resource
import re
import sys
import os, time
sys.path.append(os.getcwd() + '/../../../comm/')
import constant
import datetime, subprocess
from plugin_base import VmBaseCollector
from utils.metric_handler import MetricHandler
from pynvml.pynvml import *
import cutils
import json
import re
from Queue import Queue
# Upper bound, in seconds, that ExecuteTimeoutCmd waits for a shell command.
CMD_TIMEOUT_SEC = 3
# Sentinel stored for any value that could not be read or parsed.
VAL_ERR = -1
# Value returned by ExecuteTimeoutCmd when the command was killed on timeout.
CMD_TIMEOUT_VAL = "-1"
# NOTE(review): not referenced anywhere in the visible part of this file.
HPBW_NUM = 10
# NOTE(review): not referenced anywhere in the visible part of this file.
QUEUE_DEPTH=10
class NetworkCollect():
    """Reads InfiniBand/RDMA device inventory, PCIe facts and port counters.

    Devices are discovered once with ``ibdev2netdev``; only devices whose
    netdev name starts with ``ib`` are kept.  Three parallel lists describe
    them: ``ib_devs`` (e.g. ``mlx5_0``), ``rdma_netdevs`` (e.g. ``ib0``) and
    ``eth_bdfs`` (short PCI address such as ``5e:00.0``, or ``None`` when it
    could not be resolved).  Every value that cannot be read is reported as
    ``VAL_ERR``.
    """

    # (counter name, sysfs sub-directory below ports/1/) in reporting order.
    _COUNTER_SOURCES = (
        ('port_rcv_data', 'counters'),
        ('port_rcv_packets', 'counters'),
        ('np_ecn_marked_roce_packets', 'hw_counters'),
        ('rp_cnp_handled', 'hw_counters'),
        ('req_cqe_error', 'hw_counters'),
        ('req_remote_access_errors', 'hw_counters'),
        ('req_remote_invalid_request', 'hw_counters'),
        ('duplicate_request', 'hw_counters'),
        ('resp_cqe_error', 'hw_counters'),
        ('resp_local_length_error', 'hw_counters'),
        ('resp_remote_access_errors', 'hw_counters'),
        ('rx_icrc_encapsulated', 'hw_counters'),
        ('port_xmit_wait', 'counters'),
        ('rnr_nak_retry_err', 'hw_counters'),
        ('out_of_buffer', 'hw_counters'),
        ('out_of_sequence', 'hw_counters'),
        ('packet_seq_err', 'hw_counters'),
        ('local_ack_timeout_err', 'hw_counters'),
        ('port_xmit_data', 'counters'),
        ('port_xmit_packets', 'counters'),
    )

    # Token in `rdma resource show` output -> key in the monitor-info dict.
    _RESOURCE_FIELDS = {
        'qp': 'qp_num',
        'cq': 'cq_num',
        'mr': 'mr_num',
        'pd': 'pd_num',
        'cm_id': 'cmid_num',
        'ctx': 'ctx_num',
    }

    # bus:device.function; bus and device are hexadecimal (device goes up to 1f).
    _BDF_PATTERN = re.compile(r'[0-9a-fA-F]{2}:[0-9a-fA-F]{2}\.[0-7]')

    def __init__(self):
        dev_pairs = self.get_rdma_devices()
        self.ib_devs = dev_pairs[0]  # [mlx5_0]
        self.rdma_netdevs = dev_pairs[1]  # [ib0]
        self.eth_bdfs = dev_pairs[2]  # [5e:00.0]
        # alarm after system start 15 mins at least
        self.uptime_alarm_interval = 60 * 15
        self.uptime = self.read_uptime_secs()
        self.last_read_uptime = self.uptime

    def check_cmd_exist(self, cmd):
        """Return 1 if `cmd` is found in a directory listed in $PATH, else 0."""
        # A missing PATH or an unreadable directory means "not found", not a crash.
        for cmdpath in os.environ.get('PATH', '').split(':'):
            try:
                if os.path.isdir(cmdpath) and cmd in os.listdir(cmdpath):
                    return 1
            except OSError:
                continue
        return 0

    # different from utils because we need high precision
    def ExecuteTimeoutCmd(self, command):
        """Run `command` through the shell and return its stdout.

        The child is polled every 0.1 ms.  Once more than CMD_TIMEOUT_SEC
        seconds of wall-clock time have elapsed it is killed and
        CMD_TIMEOUT_VAL is returned instead of the output.
        """
        started = time.time()
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
        # NOTE(review): output is only read after exit, so a command that
        # fills the pipe buffer would stall until the timeout kills it.
        try:
            while process.poll() is None:
                time.sleep(0.0001)
                # Fractional elapsed time: the limit is honoured at 3 s
                # rather than at the next whole second.
                if time.time() - started > CMD_TIMEOUT_SEC:
                    process.kill()
                    process.wait()
                    return CMD_TIMEOUT_VAL  # timeout
            return process.stdout.read()
        finally:
            # Close both pipes explicitly so descriptors are not left to the GC.
            process.stdout.close()
            process.stderr.close()

    def valueCast(self, value):
        """Convert `value` to int, else to float rounded to 3 places, else VAL_ERR."""
        try:
            return int(value)
        except (ValueError, TypeError):
            try:
                return round(float(value), 3)
            except (ValueError, TypeError):
                return VAL_ERR

    def read_uptime_secs(self):
        """Return whole seconds of system uptime from /proc/uptime, or VAL_ERR."""
        try:
            with open('/proc/uptime') as fd:
                return int(float(fd.read().split()[0]))
        except (IOError, OSError, ValueError, IndexError):
            return VAL_ERR

    # support with only number existed in file!
    def read_file_contents(self, path):
        """Return the first line of `path` as a number, or VAL_ERR."""
        try:
            with open(path) as fd:
                first_line = fd.readline()
        except Exception:
            return VAL_ERR
        if not first_line:
            return VAL_ERR
        return self.valueCast(first_line)

    # support with only cmd result like:
    # name : val
    # so we can extract the val with this func
    def read_cmd_contents(self, cmd):
        """Run `cmd` and return the number after ':' on the first 'name : val' line."""
        output = self.ExecuteTimeoutCmd(cmd)
        if output == CMD_TIMEOUT_VAL:
            return VAL_ERR
        for line in output.splitlines():
            fields = line.split(":")
            if len(fields) > 1:
                return self.valueCast(fields[1])
        return VAL_ERR

    def check_pcie_width(self, bdf_list):
        """Return the entries of `bdf_list` whose current or max link width is not 16."""
        err_list = []
        for bdf in bdf_list:
            # One unreadable device must not hide the verdict for the others.
            try:
                max_width = cutils.FileQuickOp("/sys/bus/pci/devices/" + bdf + "/max_link_width").get_match_line(match="", single_match=True)
                cur_width = cutils.FileQuickOp("/sys/bus/pci/devices/" + bdf + "/current_link_width").get_match_line(match="", single_match=True)
            except Exception:
                continue
            if cur_width != '16' or max_width != '16':
                err_list.append(bdf)
        return err_list

    def check_pcie_speed(self, bdf_list):
        """Return the entries of `bdf_list` running below their maximum link speed.

        A current speed of '32' is always accepted; devices whose speed
        could not be read (empty string) are not reported.
        """
        err_list = []
        for bdf in bdf_list:
            try:
                max_speed = cutils.FileQuickOp("/sys/bus/pci/devices/" + bdf + "/max_link_speed").get_column_value(match="GT/s", split=" ", count=1)
                cur_speed = cutils.FileQuickOp("/sys/bus/pci/devices/" + bdf + "/current_link_speed").get_column_value(match="GT/s", split=" ", count=1)
            except Exception:
                continue
            if cur_speed == '32':
                continue
            if cur_speed != '' and max_speed != '' and cur_speed != max_speed:
                err_list.append(bdf)
        return err_list

    def check_pcie_aer(self, bdf_list):
        """Return the entries of `bdf_list` with a non-zero fatal AER counter (no duplicates)."""
        err_list = []
        for bdf in bdf_list:
            try:
                aer_dev_fatal = cutils.FileQuickOp("/sys/bus/pci/devices/" + bdf + "/aer_dev_fatal").get_raw_data()
                for item in aer_dev_fatal:
                    if item.strip().split(' ')[1] != '0':
                        err_list.append(bdf)
            except Exception:
                continue
        return list(set(err_list))

    def get_rdma_devices(self):
        """Discover RDMA devices and return [ib_devs, netdevs, bdfs].

        Parses ``ibdev2netdev`` lines of the form ``<ibdev> ... ==> <netdev> ...``
        and keeps those whose netdev starts with ``ib``.  The PCI address is
        taken from ``lspci``; it is ``None`` when no address is found.
        """
        ibdevs = []
        netdevs = []
        ethbdfs = []
        listing = self.ExecuteTimeoutCmd('ibdev2netdev')
        for line in listing.splitlines():
            if "==>" not in line:
                continue
            try:
                halves = line.split("==>")
                ibdev = halves[0].split()[0]
                netdev = halves[1].split()[0]
                if not netdev.startswith("ib"):
                    continue
                bdfcmd = 'lspci -s $(basename $(realpath /sys/class/infiniband/' + ibdev + '/device))'
                match = self._BDF_PATTERN.search(self.ExecuteTimeoutCmd(bdfcmd))
            except Exception:
                # Best effort: a malformed line or failed lookup skips the device.
                continue
            ibdevs.append(ibdev)
            netdevs.append(netdev)
            ethbdfs.append(match.group(0) if match else None)
        return [ibdevs, netdevs, ethbdfs]

    def _pci_device_dir(self, ib_dev):
        """Return the /sys/bus/pci/devices/<domain:bus:dev.fn> directory of `ib_dev`."""
        # /sys/class/infiniband/<dev>/device links to the PCI device; its
        # basename is the domain-qualified address used under /sys/bus/pci.
        full_addr = os.path.basename(os.path.realpath('/sys/class/infiniband/' + ib_dev + '/device'))
        return '/sys/bus/pci/devices/' + full_addr

    def _read_leading_number(self, path):
        """Return the first whitespace-separated token of `path` as a number, or VAL_ERR."""
        try:
            with open(path) as fd:
                return self.valueCast(fd.readline().split()[0])
        except Exception:
            return VAL_ERR

    def _read_aer_fatal(self, path):
        """Return 1 if any counter in the aer_dev_fatal file is non-zero, 0 if none, VAL_ERR if unreadable."""
        try:
            with open(path) as fd:
                lines = fd.readlines()
        except Exception:
            return VAL_ERR
        for line in lines:
            fields = line.strip().split(' ')
            if len(fields) > 1 and fields[1] != '0':
                return 1
        return 0

    def _upstream_ports(self, pci_dir):
        """Return the PCI path components between the root and the device, or VAL_ERR."""
        try:
            link = os.readlink(pci_dir)
        except OSError:
            return VAL_ERR
        own_addr = os.path.basename(pci_dir)
        return [part for part in link.split('/')
                if not (part == '..' or part == own_addr or 'pci' in part or 'devices' in part)]

    def _read_port_flag(self, ib_dev, attr, marker):
        """Return 1 if ports/1/<attr> of `ib_dev` contains `marker`, else 0.

        VAL_ERR is returned when the file is unreadable, and also instead of 0
        while uptime is below uptime_alarm_interval (start-up grace period).
        """
        path = '/sys/class/infiniband/' + ib_dev + '/ports/1/' + attr
        try:
            with open(path) as fd:
                text = fd.read()
        except Exception:
            return VAL_ERR
        if marker in text:
            return 1
        if self.uptime < self.uptime_alarm_interval:  # abnormal in start period
            return VAL_ERR
        return 0

    def _resource_counts(self, ib_dev, resource_lines):
        """Return the rdma resource counts found on the line describing `ib_dev`."""
        for line in resource_lines:
            tokens = line.split()
            # Exact token match so that mlx5_1 never matches the mlx5_10 line.
            if ib_dev not in [token.rstrip(':') for token in tokens]:
                continue
            counts = {}
            for pos in range(len(tokens) - 1):
                field = self._RESOURCE_FIELDS.get(tokens[pos])
                if field is not None:
                    counts[field] = self.valueCast(tokens[pos + 1])
            return counts
        return {}

    def get_monitor_info(self):
        """Return ``[list_of_dicts]`` with one monitor-info dict per device.

        Each dict holds PCIe link speed/width, fatal-AER flag, upstream
        ports, port state flags and rdma resource counts; unknown values are
        VAL_ERR.
        """
        info_list = []
        resource_lines = []
        if self.eth_bdfs:
            # Device independent, so run once instead of once per device.
            resource_dump = self.ExecuteTimeoutCmd('/opt/mellanox/iproute2/sbin/rdma -d resource show')
            if resource_dump != CMD_TIMEOUT_VAL:
                resource_lines = resource_dump.splitlines()
        for i in range(len(self.eth_bdfs)):
            ib_dev = self.ib_devs[i]
            pci_dir = self._pci_device_dir(ib_dev)
            info = {
                'max_link_speed': self._read_leading_number(pci_dir + '/max_link_speed'),
                'max_link_width': self.read_file_contents(pci_dir + '/max_link_width'),
                'current_link_speed': self._read_leading_number(pci_dir + '/current_link_speed'),
                'current_link_width': self.read_file_contents(pci_dir + '/current_link_width'),
                'aer_has_fatal': self._read_aer_fatal(pci_dir + '/aer_dev_fatal'),
                'switch_ports': self._upstream_ports(pci_dir),
                'link_state': self._read_port_flag(ib_dev, 'state', 'ACTIVE'),
                'qp_num': VAL_ERR,
                'cq_num': VAL_ERR,
                'mr_num': VAL_ERR,
                'pd_num': VAL_ERR,
                'cmid_num': VAL_ERR,
                'ctx_num': VAL_ERR,
                'link_detected': self._read_port_flag(ib_dev, 'phys_state', 'LinkUp'),
            }
            info.update(self._resource_counts(ib_dev, resource_lines))
            info_list.append(info)
        return [info_list]

    def _ofed_version(self):
        """Return the digits of `ofed_info -s` (e.g. X-5.4-3.1.0.0: -> 543100), or VAL_ERR."""
        version = VAL_ERR
        for line in self.ExecuteTimeoutCmd('ofed_info -s').splitlines():
            fields = line.split('-')
            if len(fields) < 3:
                continue
            version = self.valueCast(fields[1].replace('.', '') + fields[2].replace('.', '').replace(':', ''))
        return version

    def _peermem_info(self):
        """Return (state, version) of the nvidia_peermem kernel module."""
        state = VAL_ERR
        version = VAL_ERR
        modules = self.ExecuteTimeoutCmd('lsmod')
        if modules == "":
            state = 0
        elif any('nvidia_peermem' in line for line in modules.splitlines()):
            state = 1
        # NOTE(review): a non-empty lsmod without the module leaves state at
        # VAL_ERR (not 0), so "not loaded" is never reported - confirm intent.
        if state != VAL_ERR:
            for line in self.ExecuteTimeoutCmd('modinfo nvidia_peermem').splitlines():
                # Anchor on the field name so "srcversion:" is not mistaken for it.
                if not line.startswith('version'):
                    continue
                fields = line.split(':')
                if len(fields) > 1:
                    version = self.valueCast(fields[1].strip().replace('.', ''))
                    break
        return state, version

    def _pci_config(self, bdf):
        """Return ats_enabled, acs, serial and mrss for the PCI device `bdf`."""
        found = {'ats_enabled': VAL_ERR, 'acs': VAL_ERR, 'serial': VAL_ERR, 'mrss': VAL_ERR}
        if not bdf:
            # Address unknown: nothing can be queried for this device.
            return found
        output = self.ExecuteTimeoutCmd('lspci -s ' + bdf + ' -v')
        if output != CMD_TIMEOUT_VAL:
            found['ats_enabled'] = 0
            if any('Capabilities: [1b0]' in line for line in output.splitlines()):
                found['ats_enabled'] = 1
        output = self.ExecuteTimeoutCmd('setpci -v -s ' + bdf + ' f2a.w')
        if output != CMD_TIMEOUT_VAL:
            for line in output.splitlines():
                try:
                    # The register value is the last token, in hexadecimal.
                    found['acs'] = int('0x' + line.split(" ")[-1], 16)
                except ValueError:
                    pass
        output = self.ExecuteTimeoutCmd('lspci -vvv -s ' + bdf)
        if output != CMD_TIMEOUT_VAL:
            for line in output.splitlines():
                if "Serial number" in line and ":" in line:
                    found['serial'] = line.split(":")[1].strip()
                    break
        output = self.ExecuteTimeoutCmd('lspci -s ' + bdf + ' -vv')
        if output != CMD_TIMEOUT_VAL:
            for line in output.splitlines():
                if 'MaxReadReq' not in line:
                    continue
                fields = line.split(" ")
                if len(fields) > 4:
                    found['mrss'] = self.valueCast(fields[4])
                break
        return found

    def get_config_info(self):
        """Return ``[list_of_dicts]`` with one configuration dict per device.

        Keys: active_mtu, ofed_version, nv_peer_mem_state, nv_peer_mem_ver,
        fw_ver, ats_enabled, acs, serial, mrss; unknown values are VAL_ERR.
        """
        config_info_list = []
        if not self.ib_devs:
            return [config_info_list]
        # Host-wide facts are identical for every device: query them once.
        ofed_version = self._ofed_version()
        peermem_state, peermem_version = self._peermem_info()
        for i in range(len(self.ib_devs)):
            info = {
                'active_mtu': self.read_file_contents('/sys/class/net/' + self.rdma_netdevs[i] + '/mtu'),
                'ofed_version': ofed_version,
                'nv_peer_mem_state': peermem_state,
                'nv_peer_mem_ver': peermem_version,
                'fw_ver': VAL_ERR,
            }
            try:
                with open('/sys/class/infiniband/' + self.ib_devs[i] + '/fw_ver') as fd:
                    info['fw_ver'] = self.valueCast(fd.read().replace('.', ''))
            except Exception:
                pass
            info.update(self._pci_config(self.eth_bdfs[i]))
            config_info_list.append(info)
        return [config_info_list]

    def get_netdevflow_stats(self):
        """Return one dict of raw port-1 counter values per device."""
        netdev_stas = []
        for ib_dev in self.ib_devs:
            port_dir = '/sys/class/infiniband/' + ib_dev + '/ports/1/'
            netdev_stas.append(dict(
                (key, self.read_file_contents(port_dir + subdir + '/' + key))
                for key, subdir in self._COUNTER_SOURCES))
        return netdev_stas
class RDMACollector(VmBaseCollector):
    """Periodic collector that reports RDMA metrics and link alarms.

    Counter rates and cached configuration/monitor values are queued through
    the MetricHandler to 'nws_sender'; link-down events are queued to
    'alarmproxy_sender', at most once per `alarm_interval` per device.
    """

    # (source dict, key, value expected on a healthy host or None).
    # While uptime is below the collector's uptime_alarm_interval, a metric
    # whose value differs from its expected value is withheld.
    _STATE_METRICS = (
        ('config', 'fw_ver', None),
        ('config', 'ats_enabled', 0),
        ('config', 'acs', 0),
        ('config', 'mrss', 4096),
        ('monitor', 'link_state', 1),
        ('monitor', 'link_detected', 1),
        ('config', 'active_mtu', 9100),
        ('config', 'ofed_version', None),
        ('config', 'nv_peer_mem_state', 1),
        ('config', 'nv_peer_mem_ver', None),
        ('monitor', 'qp_num', None),
        ('monitor', 'cq_num', None),
        ('monitor', 'mr_num', None),
        ('monitor', 'pd_num', None),
        ('monitor', 'cmid_num', None),
        ('monitor', 'ctx_num', None),
    )

    # (counter key, multiplier applied to the delta, reported metric names).
    _RATE_METRICS = (
        ('port_rcv_data', 8 * 4, ('rx_prio5_bytes', 'rdma_intraffic')),
        ('port_rcv_packets', 1, ('rx_prio5_packets', 'rdma_inpkg')),
        ('port_xmit_data', 8 * 4, ('tx_prio5_bytes', 'rdma_outtraffic')),
        ('port_xmit_packets', 1, ('tx_prio5_packets', 'rdma_outpkg')),
        ('np_ecn_marked_roce_packets', 1, ('np_ecn_marked_roce_packets',)),
        ('rp_cnp_handled', 1, ('rp_cnp_handled',)),
        ('req_cqe_error', 1, ('req_cqe_error',)),
        ('req_remote_access_errors', 1, ('req_remote_access_errors',)),
        ('req_remote_invalid_request', 1, ('req_remote_invalid_request',)),
        ('duplicate_request', 1, ('duplicate_request',)),
        ('resp_cqe_error', 1, ('resp_cqe_error',)),
        ('resp_local_length_error', 1, ('resp_local_length_error',)),
        ('resp_remote_access_errors', 1, ('resp_remote_access_errors',)),
        ('rx_icrc_encapsulated', 1, ('rx_icrc_encapsulated',)),
        ('port_xmit_wait', 1, ('port_xmit_wait',)),
        ('rnr_nak_retry_err', 1, ('rnr_nak_retry_err',)),
        ('out_of_buffer', 1, ('out_of_buffer',)),
        ('out_of_sequence', 1, ('out_of_sequence',)),
        ('packet_seq_err', 1, ('packet_seq_err',)),
        ('local_ack_timeout_err', 1, ('local_ack_timeout_err',)),
    )

    def init(self):
        """Create the device reader and take the first counter/config snapshot."""
        self.set_frequency(10)
        self.collector = NetworkCollect()
        self.handler = MetricHandler()
        self.handler.namespace = 'qce/cvm'
        self._refresh_identity()
        # last report time to barad, only hpbw can report per 10s, others are 1 min
        self.report_interval = 50  # Error elimination
        self.last_netdev_stats = self.collector.get_netdevflow_stats()
        # Time of the baseline sample, so the first rates are divided by the
        # real elapsed time rather than by seconds since the epoch.
        self.last_report_time = time.time()
        self.config_info = self.collector.get_config_info()
        self.monitor_info = self.collector.get_monitor_info()
        # set monitor info sample interval to 60s
        self.monitor_info_interval = 60
        self.last_monitor_info_time = 0
        # set config info sample interval to 5 min
        self.config_info_interval = 300
        self.last_config_info_time = 0
        # alarm interval 1 hour
        self.alarm_interval = 1 * 60 * 60
        self._reset_alarm_times()

    def _refresh_identity(self):
        """Fetch uuid and vmip; is_dns_normal becomes 0 when either lookup fails."""
        self.is_dns_normal = 1
        try:
            self.uuid = self.get_vm_uuid()
            self.vmip = self.get_vmip()
        except Exception:
            self.is_dns_normal = 0

    def _reset_alarm_times(self):
        """Size the per-device alarm throttling lists to the current device count."""
        device_count = len(self.collector.rdma_netdevs)
        self.last_link_state_alarm_time = [0] * device_count
        self.last_link_detected_alarm_time = [0] * device_count

    def is_rdma_ready(self):
        """Return 1 if `ibdev2netdev` still lists exactly the known ib devices, else 0."""
        listing = self.collector.ExecuteTimeoutCmd("ibdev2netdev")
        ibdevs = []
        netdevs = []
        for line in listing.splitlines():
            if "==>" not in line:
                continue
            try:
                halves = line.split("==>")
                ibdev = halves[0].split()[0]
                netdev = halves[1].split()[0]
            except IndexError:
                continue
            if not netdev.startswith("ib"):
                continue
            ibdevs.append(ibdev)
            netdevs.append(netdev)
        if ibdevs != self.collector.ib_devs or netdevs != self.collector.rdma_netdevs:
            return 0
        return 1

    def rdma_dev_reinit(self):
        """Rediscover devices and rebuild every per-device cache."""
        dev_pairs = self.collector.get_rdma_devices()
        self.collector.ib_devs = dev_pairs[0]  # [mlx5_0]
        self.collector.rdma_netdevs = dev_pairs[1]  # [ib0]
        self.collector.eth_bdfs = dev_pairs[2]  # [5e:00.0]
        self.last_netdev_stats = self.collector.get_netdevflow_stats()
        self.last_report_time = time.time()
        self.config_info = self.collector.get_config_info()
        self.monitor_info = self.collector.get_monitor_info()
        # The device list changed, so the throttling lists must be re-sized;
        # otherwise indexing them for a new device fails and no alarm is sent.
        self._reset_alarm_times()

    def _check_link_alarm(self, index, field, last_alarm_times, alarm_id,
                          event_name, fault_desc, current_time, dimensions, serial):
        """Queue one alarm if monitor value `field` of device `index` is down.

        Nothing is sent when the value is VAL_ERR or when an alarm for this
        device was sent within `alarm_interval`; a healthy value (1) re-arms
        the alarm.
        """
        try:
            value = self.monitor_info[0][index][field]
            if value == VAL_ERR:
                return
            if value == 1:
                last_alarm_times[index] = 0
                return
            last_sent = last_alarm_times[index]
            if last_sent != 0 and current_time - last_sent <= self.alarm_interval:
                return
            alarmtime = time.strftime("%Y-%m-%d %H:%M:%S+0800", time.localtime(current_time))
            alarmproxy_metric = {"CallerName": "barad_agent", "CallerKey": "PSbhht7wQLEbH6OjDXTayQ==", "AlarmTime": alarmtime, "Dimensions": dimensions, "DeviceName": self.collector.rdma_netdevs[index], "DeviceId": "", 'Slot': self.collector.eth_bdfs[index], 'SN': serial}
            alarmproxy_event = {"AlarmId": alarm_id, "EventName": event_name, "FaultType": "Hardware", "FaultDesc": fault_desc}
            self.put_data({'sender': 'alarmproxy_sender', 'datas': dict(alarmproxy_metric, **alarmproxy_event)})
            last_alarm_times[index] = current_time
        except Exception:
            # Best effort: a failure for one alarm must not stop collection.
            pass

    def _send_link_alarms(self, current_time):
        """Evaluate the link_state and link_detected alarms of every device."""
        for i in range(len(self.collector.rdma_netdevs)):
            serial = self.config_info[0][i]['serial']
            alarm_dimensions = [{"Key": "Uuid", "Value": self.uuid}, {"Key": "BondMac", "Value": "0"}, {"Key": "EthMac", "Value": "0"}, {"Key": "RdmaIp", "Value": "0"}, {"Key": "Bdf", "Value": "0" if len(self.collector.eth_bdfs) == 0 else self.collector.eth_bdfs[i]}, {"Key": "Sn", "Value": "0" if serial == VAL_ERR else serial}]
            # The two checks are independent: an unknown link_state no longer
            # suppresses the link_detected alarm.
            self._check_link_alarm(i, 'link_state', self.last_link_state_alarm_time, 1138,
                                   "ib_link_state", "ib dgx link_state",
                                   current_time, alarm_dimensions, serial)
            self._check_link_alarm(i, 'link_detected', self.last_link_detected_alarm_time, 1117,
                                   "link_detected", "rdma eth interface link detected",
                                   current_time, alarm_dimensions, serial)

    def _queue_device_metrics(self, index, netdev_stats, delta_time, current_time):
        """Add the state metrics and counter rates of device `index` to the handler."""
        config = self.config_info[0][index]
        monitor = self.monitor_info[0][index]
        bdf = self.collector.eth_bdfs[index]
        # A device without serial or PCI address cannot be identified: skip it.
        if config['serial'] == VAL_ERR or bdf in (VAL_ERR, None):
            return
        dimensions = {
            'name': self.collector.ib_devs[index].replace("mlx5_", "bond"),
            'vmip': self.vmip,
            'uuid': self.uuid,
            'rdma_ip': "0",
            'serial': config['serial'],
            'rdma_bdf': bdf,
        }
        in_startup = self.collector.uptime < self.collector.uptime_alarm_interval
        sources = {'config': config, 'monitor': monitor}
        batch_metric_part = []
        for source, key, expected in self._STATE_METRICS:
            value = sources[source][key]
            if value == VAL_ERR:
                continue
            if in_startup and expected is not None and value != expected:
                continue
            batch_metric_part.append({'name': key, 'value': value})
        current = netdev_stats[index]
        previous = self.last_netdev_stats[index]
        for key, scale, names in self._RATE_METRICS:
            # A rate needs two valid samples and a counter that has not been
            # reset in between; otherwise the value would be meaningless.
            if delta_time <= 0 or current[key] == VAL_ERR or previous[key] == VAL_ERR:
                continue
            if current[key] < previous[key]:
                continue
            rate = format(float(current[key] - previous[key]) * scale / delta_time, '.3f')
            for name in names:
                batch_metric_part.append({'name': name, 'value': rate})
        self.handler.add_batch_metric(batch=batch_metric_part, dimensions=dimensions, timestamp=self.collector.valueCast(current_time))

    def do_collect(self):
        """Run one collection cycle: alarms, cache refresh, metric report."""
        # not rdma device or bonding device
        rdma_ready = self.is_rdma_ready()
        if rdma_ready == 0:
            self.rdma_dev_reinit()  # reinit if rdma kernel module does not probe when barad start
        if not rdma_ready or not self.is_dns_normal:
            self._refresh_identity()
            return
        if self.collector.uptime < self.collector.uptime_alarm_interval:
            self.collector.uptime = self.collector.read_uptime_secs()
        current_time = time.time()
        self._send_link_alarms(current_time)
        if current_time - self.last_monitor_info_time > self.monitor_info_interval:
            self.monitor_info = self.collector.get_monitor_info()
            self.last_monitor_info_time = current_time
        if current_time - self.last_config_info_time > self.config_info_interval:
            self.config_info = self.collector.get_config_info()
            self.last_config_info_time = current_time
        if current_time - self.last_report_time >= self.report_interval:
            sample_time = time.time()
            netdev_stats = self.collector.get_netdevflow_stats()
            delta_time = sample_time - self.last_report_time
            for i in range(len(self.collector.rdma_netdevs)):
                try:
                    self._queue_device_metrics(i, netdev_stats, delta_time, current_time)
                except Exception:
                    # Best effort: one broken device must not drop the others.
                    continue
            # Reuse the sample just reported as the next baseline, so no
            # events are lost between two separate counter reads.
            self.last_netdev_stats = netdev_stats
            self.last_report_time = sample_time
        if len(self.handler.get_metrics()) > 0:
            data = {'sender': 'nws_sender', 'datas': self.handler.pop_metrics()}
            self.put_data(data)
def main():
    """Stand-alone entry point: collect and dump RDMA data once a minute, forever."""
    rdma_collector = RDMACollector()
    while True:
        rdma_collector.collect()
        rdma_collector.dump_data()
        time.sleep(60)


if __name__ == '__main__':
    main()