# File: //usr/local/qcloud/monitor/barad/lib/pydcmi/pydcmi.py
from ctypes import *
from ctypes.util import find_library
import sys
import os
import threading
import string
# DCMI return codes. DCMI_OK (0) is the only value that _dcmiCheckReturn()
# accepts as success; the DCMI_ERR_CODE_* values are the negative failure
# codes this module knows by name. The numbering is not contiguous (for
# example -8014, -8018 and -8019 have no constant here).
DCMI_OK = 0
DCMI_ERR_CODE_INVALID_PARAMETER = -8001
DCMI_ERR_CODE_OPER_NOT_PERMITTED = -8002
DCMI_ERR_CODE_MEM_OPERATE_FAIL = -8003
DCMI_ERR_CODE_SECURE_FUN_FAIL = -8004
DCMI_ERR_CODE_INNER_ERR = -8005
DCMI_ERR_CODE_TIME_OUT = -8006
DCMI_ERR_CODE_INVALID_DEVICE_ID = -8007
DCMI_ERR_CODE_DEVICE_NOT_EXIST = -8008
DCMI_ERR_CODE_IOCTL_FAIL = -8009
DCMI_ERR_CODE_SEND_MSG_FAIL = -8010
DCMI_ERR_CODE_RECV_MSG_FAIL = -8011
# (sic) spelled "REDAY" in this module; renaming it would change a public name.
DCMI_ERR_CODE_NOT_REDAY = -8012
DCMI_ERR_CODE_NOT_SUPPORT_IN_CONTAINER = -8013
DCMI_ERR_CODE_RESET_FAIL = -8015
DCMI_ERR_CODE_ABORT_OPERATE = -8016
DCMI_ERR_CODE_IS_UPGRADING = -8017
DCMI_ERR_CODE_RESOURCE_OCCUPIED = -8020
DCMI_ERR_CODE_NOT_SUPPORT = -8255
# ctypes type used to pass a device-type selector by value
# (see dcmiGetDeviceEccInfo).
_dcmiDeviceType_t = c_uint
DCMI_DEVICE_TYPE_DDR = 0
DCMI_DEVICE_TYPE_SRAM = 1
DCMI_DEVICE_TYPE_HBM = 2
DCMI_DEVICE_TYPE_NPU = 3
# NOTE(review): the two values below continue the numbering of the
# DCMI_DEVICE_TYPE_* group but carry a different prefix; confirm against the
# DCMI header that they belong to the same enum.
DCMI_HBM_RECORDED_SINGLE_ADDR = 4
DCMI_HBM_RECORDED_MULTI_ADDR = 5
# ctypes type used to pass a frequency selector by value
# (see dcmiGetDeviceFrequency). The selector values are not contiguous.
_dcmiFreqType_t = c_uint
DCMI_FREQ_TYPE_MEM = 1
DCMI_FREQ_TYPE_CPU = 2
DCMI_FREQ_TYPE_HBM = 6
DCMI_FREQ_TYPE_AI_CORE_CUR = 7
DCMI_FREQ_TYPE_AI_CORE_MAX = 9
DCMI_FREQ_TYPE_VECTOR_CORE_CUR = 12
# ctypes type used for the input_type argument of
# dcmi_get_device_utilization_rate (see dcmiGetDeviceUtilRateCtlCpu and
# dcmiGetDeviceUtilRateAICore). (sic) "TPYE" is the spelling of these public
# names.
_dcmiInputType_t = c_uint
DCMI_INPUT_TPYE_AI_CORE = 2
DCMI_INPUT_TPYE_AI_CPU = 3
DCMI_INPUT_TPYE_CTL_CPU = 4
class _PrintableStructure(Structure):
    """ctypes Structure base class with a readable ``str()``.

    ``str(obj)`` renders as ``ClassName(field: value, ...)`` with the fields
    in ``_fields_`` order.

    Subclasses may set ``_fmt_`` to a dict mapping a field name to a
    %-format for that field. The special key ``"<default>"`` supplies the
    format for every field without its own entry; when neither is present
    ``"%s"`` is used.
    """
    _fmt_ = {}

    def __str__(self):
        result = []
        for x in self._fields_:
            key = x[0]
            value = getattr(self, key)
            # Per-field format first, then the "<default>" entry, then "%s".
            fmt = self._fmt_.get(key, self._fmt_.get("<default>", "%s"))
            result.append(("%s: " + fmt) % (key, value))
        # str.join works on Python 2 and 3; the module-level string.join()
        # used previously no longer exists on Python 3.
        return self.__class__.__name__ + "(" + ", ".join(result) + ")"
class dcmiPciInfo_t(_PrintableStructure):
    """PCIe information record filled in by ``dcmi_get_device_pcie_info``.

    dcmiGetDevicePcieInfo() passes an instance by reference and returns only
    the ``bdf_*`` members. All seven members are unsigned ints.
    """
    # NOTE(review): order and types presumably mirror the C
    # ``struct dcmi_pcie_info`` named in the prototype comment further down;
    # verify against the DCMI header before changing anything here.
    _fields_ = [
        ('deviceid', c_uint),
        ('venderid', c_uint),
        ('subvenderid', c_uint),
        ('subdeviceid', c_uint),
        ('bdf_deviceid', c_uint),
        ('bdf_busid', c_uint),
        ('bdf_funcid', c_uint),
    ]
    # No per-field formats: str() renders every member with "%s".
    _fmt_ = {}
class dcmiEccInfo_t(_PrintableStructure):
    """ECC record filled in by ``dcmi_get_device_ecc_info``.

    dcmiGetDeviceEccInfo() returns ``enable_flag`` plus the ``total_*`` and
    ``isolated_*`` counters; ``sb_err_cnt`` and ``db_err_cnt`` are not
    returned by any wrapper in this module.
    """
    # NOTE(review): "sb"/"db" presumably stand for single-bit/double-bit
    # errors, and the layout presumably mirrors ``struct dcmi_ecc_info``;
    # confirm against the DCMI header.
    _fields_ = [
        ('enable_flag', c_int),
        ('sb_err_cnt', c_uint),
        ('db_err_cnt', c_uint),
        ('total_sb_err_cnt', c_uint),
        ('total_db_err_cnt', c_uint),
        ('isolated_sb_err_cnt', c_uint),
        ('isolated_db_err_cnt', c_uint),
    ]
    # No per-field formats: str() renders every member with "%s".
    _fmt_ = {}
class dcmiHbmInfo_t(_PrintableStructure):
    """HBM record filled in by ``dcmi_get_device_hbm_info``.

    dcmiGetDeviceHbmInfo() returns ``memory_size``, ``freq`` and
    ``memory_usage``; ``temp`` and ``bandwith_util_rate`` are filled in by
    the library but not returned by any wrapper in this module.
    """
    # NOTE(review): layout presumably mirrors ``struct dcmi_hbm_info``; units
    # of the members are not visible in this file - confirm against the DCMI
    # header. (sic) "bandwith" is the member name used here.
    _fields_ = [
        ('memory_size', c_ulonglong),
        ('freq', c_uint),
        ('memory_usage', c_ulonglong),
        ('temp', c_uint),
        ('bandwith_util_rate', c_uint),
    ]
    # No per-field formats: str() renders every member with "%s".
    _fmt_ = {}
# Size, in bytes, of each fixed-length char array in dcmiChipInfo_t.
MAX_CHIP_NAME_LEN = 32
class dcmiChipInfo_t(_PrintableStructure):
    """Chip description record filled in by ``dcmi_get_device_chip_info``.

    Three fixed-size char arrays of MAX_CHIP_NAME_LEN bytes followed by an
    unsigned AI-core count. dcmiGetDeviceChipInfoName() returns only
    ``chip_name``.
    """
    # NOTE(review): layout presumably mirrors ``struct dcmi_chip_info``;
    # confirm against the DCMI header.
    _fields_ = [
        ('chip_type' , c_char * MAX_CHIP_NAME_LEN),
        ('chip_name' , c_char * MAX_CHIP_NAME_LEN),
        ('chip_ver' , c_char * MAX_CHIP_NAME_LEN),
        ('aicore_cnt' , c_uint),
    ]
    # No per-field formats: str() renders every member with "%s".
    _fmt_ = {}
# Size, in bytes, of each fixed-length char array in dcmiElabelInfo_t.
# (sic) "LENTH" is the spelling of this public name.
MAX_LENTH = 256
class dcmiElabelInfo_t(_PrintableStructure):
    """Electronic-label record filled in by ``dcmi_get_device_elabel_info``.

    Five fixed-size char arrays of MAX_LENTH bytes each.
    dcmiGetDeviceSerial() returns only ``serial_number``.
    """
    # NOTE(review): layout presumably mirrors ``struct dcmi_elabel_info``;
    # confirm against the DCMI header.
    _fields_ = [
        ('product_name', c_char * MAX_LENTH),
        ('model', c_char * MAX_LENTH),
        ('manufacturer', c_char * MAX_LENTH),
        ('manufacturer_date', c_char * MAX_LENTH),
        ('serial_number', c_char * MAX_LENTH),
    ]
    # No per-field formats: str() renders every member with "%s".
    _fmt_ = {}
class dcmiMemoryInfo_t(_PrintableStructure):
    """Memory record filled in by ``dcmi_get_device_memory_info_v3``.

    dcmiGetDeviceMemoryInfo() returns only ``freq`` and ``utiliza``; the
    remaining members are filled in by the library but not returned by any
    wrapper in this module. ``reserve`` is a 60-byte char array.
    """
    # NOTE(review): layout presumably mirrors
    # ``struct dcmi_get_memory_info_stru``; units of the members are not
    # visible in this file - confirm against the DCMI header.
    _fields_ = [
        ('memory_size', c_ulonglong),
        ('memory_available',c_ulonglong),
        ('freq', c_uint),
        ('hugepagesize', c_ulong),
        ('hugepages_total', c_ulong),
        ('hugepages_free', c_ulong),
        ('utiliza', c_uint),
        ('reserve', c_char * 60),
    ]
    # No per-field formats: str() renders every member with "%s".
    _fmt_ = {}
## Lib loading ##
# CDLL handle for libdcmi.so; None until _LoadDcmiibrary() succeeds.
dcmiLib = None
# Held while loading the library, resolving symbols and updating the refcount.
libLoadLock = threading.Lock()
# Incremented by each successful dcmiInit(), decremented by dcmiShutdown().
_dcmiLib_refcount = 0

## Error reporting ##
# Code reported when the library or one of its symbols is unavailable. It is
# the value the previous code attempted to raise and lies outside the -8xxx
# range used by the DCMI_ERR_CODE_* constants.
_DCMI_ERR_LIBRARY = -1


class DCMIError(TypeError):
    """Raised when libdcmi cannot be used or a DCMI call reports a failure.

    Attributes:
        value: the DCMI return code (one of the DCMI_ERR_CODE_* values), or
            -1 when the shared library or a symbol could not be found.

    The class derives from TypeError on purpose. The code used to execute
    ``raise <int>``, which Python turns into a TypeError, so callers that
    already catch TypeError (or Exception) keep working while now receiving
    the actual return code.
    """

    def __init__(self, value, message=None):
        self.value = value
        if message is None:
            message = "DCMI call failed with return code %s" % (value,)
        TypeError.__init__(self, message)


def _dcmiCheckReturn(ret):
    """Return *ret* unchanged if it equals DCMI_OK, otherwise raise.

    Raises:
        DCMIError: *ret* is not DCMI_OK; the code is available as ``.value``.
    """
    if ret != DCMI_OK:
        raise DCMIError(ret)
    return ret


## Function access ##
# Symbol name -> ctypes function object, filled lazily.
_dcmiGetFunctionPointer_cache = dict()


def _dcmiGetFunctionPointer(name):
    """Return the libdcmi function called *name*, caching the lookup.

    Raises:
        DCMIError: the library has not been loaded yet (dcmiInit() was not
            called or failed), or it does not export *name*.
    """
    # Fast path: entries are only ever added, so a hit needs no lock.
    if name in _dcmiGetFunctionPointer_cache:
        return _dcmiGetFunctionPointer_cache[name]

    with libLoadLock:
        # ensure library was loaded
        if dcmiLib is None:
            raise DCMIError(
                _DCMI_ERR_LIBRARY,
                "libdcmi.so is not loaded; call dcmiInit() first")
        try:
            fn = getattr(dcmiLib, name)
        except AttributeError:
            raise DCMIError(
                _DCMI_ERR_LIBRARY,
                "function %r not found in libdcmi.so" % (name,))
        _dcmiGetFunctionPointer_cache[name] = fn
        return fn


## C function wrappers ##
def dcmiInit():
    """Load libdcmi.so if necessary and call ``dcmi_init``.

    On success the module reference count is incremented and None is
    returned.

    Raises:
        DCMIError: the library cannot be loaded, ``dcmi_init`` is missing,
            or ``dcmi_init`` returned something other than DCMI_OK. The
            reference count is left untouched in all of these cases.
    """
    global _dcmiLib_refcount

    _LoadDcmiibrary()

    # Initialize the library
    fn = _dcmiGetFunctionPointer("dcmi_init")
    ret = fn()
    _dcmiCheckReturn(ret)

    # Atomically update refcount
    with libLoadLock:
        _dcmiLib_refcount += 1
    return None


def _LoadDcmiibrary():
    """Load libdcmi.so into the module-level ``dcmiLib`` if not loaded yet.

    Raises:
        DCMIError: ``CDLL("libdcmi.so")`` failed. The load error used to be
            discarded here, leaving ``dcmiLib`` as None and surfacing later
            as an unrelated failure.
    """
    global dcmiLib
    if dcmiLib is not None:
        return

    # lock to ensure only one caller loads the library
    with libLoadLock:
        # another thread may have finished loading while we waited
        if dcmiLib is not None:
            return
        try:
            dcmiLib = CDLL("libdcmi.so")
        except OSError as ose:
            raise DCMIError(
                _DCMI_ERR_LIBRARY,
                "failed to load libdcmi.so: %s" % (ose,))
def dcmiShutdown():
    """Give back one reference taken by dcmiInit().

    The module-level reference count is decremented under ``libLoadLock``
    and never drops below zero. Only the counter is touched: no libdcmi
    function is called from here. Always returns None.
    """
    global _dcmiLib_refcount
    with libLoadLock:
        if _dcmiLib_refcount > 0:
            _dcmiLib_refcount -= 1
# C prototype: int dcmi_get_driver_version(char *driver_ver, unsigned int len)
def dcmiGetDriverVersion():
    """Return the driver version written by ``dcmi_get_driver_version``.

    A 64-byte buffer is handed to the library together with its length; the
    buffer content up to the first NUL byte is returned.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    capacity = 64
    length_arg = c_uint(capacity)
    version_buf = create_string_buffer(capacity)
    query = _dcmiGetFunctionPointer("dcmi_get_driver_version")
    _dcmiCheckReturn(query(version_buf, length_arg))
    return version_buf.value
# C prototype: int dcmi_get_card_list(int *card_num, int *card_list, int list_len)
def dcmiGetCardList():
    """Return ``(card_num, card_list)`` as reported by ``dcmi_get_card_list``.

    ``card_list`` is the complete 16-slot buffer handed to the library, as a
    Python list; it is NOT truncated to ``card_num`` entries, so callers
    should look only at the first ``card_num`` items.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    capacity = 16
    card_count = c_uint()
    list_len = c_uint(capacity)
    card_ids = (c_uint * capacity)()
    query = _dcmiGetFunctionPointer("dcmi_get_card_list")
    _dcmiCheckReturn(query(byref(card_count), card_ids, list_len))
    return card_count.value, list(card_ids)
# C prototype: int dcmi_get_device_num_in_card(int card_id, int *device_num)
def dcmiGetDeviceNumInCard(card_id):
    """Return the device count ``dcmi_get_device_num_in_card`` reports for *card_id*.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device_count = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_num_in_card")
    _dcmiCheckReturn(query(card, byref(device_count)))
    return device_count.value
# C prototype: int dcmi_get_device_pcie_info(int card_id, int device_id, struct dcmi_pcie_info *pcie_info)
def dcmiGetDevicePcieInfo(card_id, device_id):
    """Return ``(bdf_busid, bdf_deviceid, bdf_funcid)`` for one device.

    The values come from the dcmiPciInfo_t record filled in by
    ``dcmi_get_device_pcie_info``; its other members are discarded.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    pcie = dcmiPciInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_pcie_info")
    _dcmiCheckReturn(query(card, device, byref(pcie)))
    return (pcie.bdf_busid, pcie.bdf_deviceid, pcie.bdf_funcid)
# C prototype: int dcmi_get_device_chip_info(int card_id, int device_id, struct dcmi_chip_info *chip_info)
def dcmiGetDeviceChipInfoName(card_id, device_id):
    """Return the ``chip_name`` member reported by ``dcmi_get_device_chip_info``.

    The chip type, chip version and AI-core count in the same record are
    discarded.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    chip = dcmiChipInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_chip_info")
    _dcmiCheckReturn(query(card, device, byref(chip)))
    return chip.chip_name
# C prototype: int dcmi_get_device_elabel_info(int card_id, int device_id, struct dcmi_elabel_info *elabel_info)
def dcmiGetDeviceSerial(card_id, device_id):
    """Return the ``serial_number`` member reported by ``dcmi_get_device_elabel_info``.

    The other electronic-label members are discarded.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    elabel = dcmiElabelInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_elabel_info")
    # NOTE(review): an earlier debugging remark here read "not supported";
    # presumably this query fails on some devices - confirm with the caller.
    _dcmiCheckReturn(query(card, device, byref(elabel)))
    return elabel.serial_number
# C prototype: int dcmi_get_device_power_info(int card_id, int device_id, int *power)
def dcmiGetDevicePowerInfo(card_id, device_id):
    """Return the power value written by ``dcmi_get_device_power_info``.

    The raw number is returned as-is, read as an unsigned int; its unit is
    not visible in this file.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    power = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_power_info")
    _dcmiCheckReturn(query(card, device, byref(power)))
    return power.value
#int dcmi_get_device_temperature(int card_id, int device_id, int *temperature)
def dcmiGetDeviceTemp(card_id, device_id):
    """Return the temperature written by ``dcmi_get_device_temperature``.

    The out-parameter is a signed int, matching the ``int *temperature``
    prototype above, so a below-zero reading comes back as a negative number
    instead of wrapping to a huge unsigned one. Non-negative readings are
    unchanged. The unit is not visible in this file.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    c_card_id = c_uint(card_id)
    c_device_id = c_uint(device_id)
    # Signed on purpose: c_uint would misreport negative temperatures.
    c_temp = c_int()
    fn = _dcmiGetFunctionPointer("dcmi_get_device_temperature")
    ret = fn(c_card_id, c_device_id, byref(c_temp))
    _dcmiCheckReturn(ret)
    return c_temp.value
# C prototype: int dcmi_get_device_ecc_info(int card_id, int device_id, enum dcmi_device_type input_type, struct dcmi_ecc_info *device_ecc_info)
def dcmiGetDeviceEccInfo(card_id, device_id, input_type):
    """Return ECC status and counters for one device and device type.

    *input_type* is passed to the library as a ``_dcmiDeviceType_t``
    (presumably one of the DCMI_DEVICE_TYPE_* values).

    Returns the 5-tuple ``(enable_flag, total_sb_err_cnt, total_db_err_cnt,
    isolated_sb_err_cnt, isolated_db_err_cnt)``; the ``sb_err_cnt`` and
    ``db_err_cnt`` members of the record are discarded.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    ecc = dcmiEccInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_ecc_info")
    _dcmiCheckReturn(
        query(card, device, _dcmiDeviceType_t(input_type), byref(ecc)))
    return (
        ecc.enable_flag,
        ecc.total_sb_err_cnt,
        ecc.total_db_err_cnt,
        ecc.isolated_sb_err_cnt,
        ecc.isolated_db_err_cnt,
    )
# C prototype: int dcmi_get_device_frequency(int card_id, int device_id, enum dcmi_freq_type input_type, unsigned int *frequency)
def dcmiGetDeviceFrequency(card_id, device_id, input_type):
    """Return the frequency written by ``dcmi_get_device_frequency``.

    *input_type* is passed to the library as a ``_dcmiFreqType_t``
    (presumably one of the DCMI_FREQ_TYPE_* values). The unit of the result
    is not visible in this file.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    frequency = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_frequency")
    _dcmiCheckReturn(
        query(card, device, _dcmiFreqType_t(input_type), byref(frequency)))
    return frequency.value
# C prototype: int dcmi_get_device_hbm_info(int card_id, int device_id, struct dcmi_hbm_info *hbm_info)
def dcmiGetDeviceHbmInfo(card_id, device_id):
    """Return ``(memory_size, freq, memory_usage)`` of the device's HBM.

    The values come from the dcmiHbmInfo_t record filled in by
    ``dcmi_get_device_hbm_info``; its ``temp`` and ``bandwith_util_rate``
    members are discarded.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    hbm = dcmiHbmInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_hbm_info")
    _dcmiCheckReturn(query(card, device, byref(hbm)))
    return (hbm.memory_size, hbm.freq, hbm.memory_usage)
#int dcmi_get_device_memory_info_v3(int card_id, int device_id, struct dcmi_get_memory_info_stru *memory_info)
def dcmiGetDeviceMemoryInfo(card_id, device_id):
    """Return ``(freq, utiliza)`` reported by ``dcmi_get_device_memory_info_v3``.

    The remaining members of the dcmiMemoryInfo_t record are discarded.

    The return code is no longer printed to stdout: that was leftover debug
    output. A failing code is still reported through _dcmiCheckReturn().

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    c_card_id = c_uint(card_id)
    c_device_id = c_uint(device_id)
    memoryinfo = dcmiMemoryInfo_t()
    fn = _dcmiGetFunctionPointer("dcmi_get_device_memory_info_v3")
    ret = fn(c_card_id, c_device_id, byref(memoryinfo))
    _dcmiCheckReturn(ret)
    return memoryinfo.freq, memoryinfo.utiliza
# C prototype: int dcmi_get_device_utilization_rate(int card_id, int device_id, int input_type, unsigned int *utilization_rate)
def dcmiGetDeviceUtilRateCtlCpu(card_id, device_id):
    """Return the utilization rate for input type DCMI_INPUT_TPYE_CTL_CPU.

    Calls ``dcmi_get_device_utilization_rate`` with that selector and
    returns the raw unsigned value written by the library.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    rate = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_utilization_rate")
    _dcmiCheckReturn(
        query(card, device, _dcmiInputType_t(DCMI_INPUT_TPYE_CTL_CPU),
              byref(rate)))
    return rate.value
def dcmiGetDeviceUtilRateAICore(card_id, device_id):
    """Return the utilization rate for input type DCMI_INPUT_TPYE_AI_CORE.

    Calls ``dcmi_get_device_utilization_rate`` with that selector and
    returns the raw unsigned value written by the library.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    rate = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_utilization_rate")
    _dcmiCheckReturn(
        query(card, device, _dcmiInputType_t(DCMI_INPUT_TPYE_AI_CORE),
              byref(rate)))
    return rate.value
# C prototype: int dcmi_get_device_health(int card_id, int device_id, unsigned int *health)
def dcmiGetDeviceHealthStatus(card_id, device_id):
    """Return the health value written by ``dcmi_get_device_health``.

    The raw unsigned number is returned; the meaning of individual values is
    not visible in this file.

    Raises via _dcmiCheckReturn() when the call does not return DCMI_OK.
    """
    card = c_uint(card_id)
    device = c_uint(device_id)
    health = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_health")
    _dcmiCheckReturn(query(card, device, byref(health)))
    return health.value