HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/lib/pydcmi/pydcmi.py
from ctypes import *
from ctypes.util import find_library
import sys
import os
import threading
import string


# Status codes returned by libdcmi calls. DCMI_OK is the only value that
# _dcmiCheckReturn accepts as success; any other status makes it raise.
# NOTE(review): the numeric values and the spelling of the names (e.g.
# NOT_REDAY) presumably mirror the vendor's C header -- confirm before
# renaming anything, since callers may reference these names.
DCMI_OK = 0
DCMI_ERR_CODE_INVALID_PARAMETER =  -8001
DCMI_ERR_CODE_OPER_NOT_PERMITTED = -8002
DCMI_ERR_CODE_MEM_OPERATE_FAIL = -8003
DCMI_ERR_CODE_SECURE_FUN_FAIL = -8004
DCMI_ERR_CODE_INNER_ERR = -8005
DCMI_ERR_CODE_TIME_OUT = -8006
DCMI_ERR_CODE_INVALID_DEVICE_ID = -8007
DCMI_ERR_CODE_DEVICE_NOT_EXIST = -8008
DCMI_ERR_CODE_IOCTL_FAIL = -8009
DCMI_ERR_CODE_SEND_MSG_FAIL = -8010
DCMI_ERR_CODE_RECV_MSG_FAIL = -8011
DCMI_ERR_CODE_NOT_REDAY = -8012
DCMI_ERR_CODE_NOT_SUPPORT_IN_CONTAINER = -8013
DCMI_ERR_CODE_RESET_FAIL = -8015
DCMI_ERR_CODE_ABORT_OPERATE = -8016
DCMI_ERR_CODE_IS_UPGRADING = -8017
DCMI_ERR_CODE_RESOURCE_OCCUPIED = -8020
DCMI_ERR_CODE_NOT_SUPPORT = -8255


# ctypes type that dcmiGetDeviceEccInfo wraps its input_type argument in
# (the C prototype quoted there names it "enum dcmi_device_type").
_dcmiDeviceType_t = c_uint
DCMI_DEVICE_TYPE_DDR = 0
DCMI_DEVICE_TYPE_SRAM = 1
DCMI_DEVICE_TYPE_HBM = 2
DCMI_DEVICE_TYPE_NPU = 3
# NOTE(review): the two HBM_RECORDED values sit in the device-type group in
# this file; whether they are valid dcmi_device_type inputs is not visible here.
DCMI_HBM_RECORDED_SINGLE_ADDR = 4
DCMI_HBM_RECORDED_MULTI_ADDR = 5


# ctypes type that dcmiGetDeviceFrequency wraps its input_type argument in
# (the C prototype quoted there names it "enum dcmi_freq_type").
_dcmiFreqType_t = c_uint
DCMI_FREQ_TYPE_MEM = 1
DCMI_FREQ_TYPE_CPU = 2
DCMI_FREQ_TYPE_HBM = 6
DCMI_FREQ_TYPE_AI_CORE_CUR = 7
DCMI_FREQ_TYPE_AI_CORE_MAX = 9
DCMI_FREQ_TYPE_VECTOR_CORE_CUR = 12

# ctypes type used for the input_type argument of
# dcmi_get_device_utilization_rate. In the visible part of this file only
# CTL_CPU and AI_CORE are passed (see the dcmiGetDeviceUtilRate* wrappers).
# "TPYE" (sic) is the spelling used by the public names below.
_dcmiInputType_t = c_uint
DCMI_INPUT_TPYE_AI_CORE = 2
DCMI_INPUT_TPYE_AI_CPU  = 3
DCMI_INPUT_TPYE_CTL_CPU = 4

class _PrintableStructure(Structure):
    """ctypes Structure base class whose str() lists every declared field.

    The result looks like ``ClassName(field1: value1, field2: value2)``.
    Subclasses may override ``_fmt_`` to control how values are rendered:

    * ``_fmt_[field_name]`` is a %-format string used for that field;
    * ``_fmt_["<default>"]`` is used for fields without their own entry;
    * with neither present, ``"%s"`` is used.
    """
    _fmt_ = {}

    def __str__(self):
        """Return ``ClassName(name: value, ...)`` for all ``_fields_`` entries."""
        default_fmt = self._fmt_.get("<default>", "%s")
        result = []
        for field in self._fields_:
            key = field[0]
            value = getattr(self, key)
            fmt = self._fmt_.get(key, default_fmt)
            result.append(("%s: " + fmt) % (key, value))
        # str.join works on both Python 2 and 3; the module-level
        # string.join() used previously no longer exists on Python 3.
        return self.__class__.__name__ + "(" + ", ".join(result) + ")"


class dcmiPciInfo_t(_PrintableStructure):
    """ctypes record passed by reference to dcmi_get_device_pcie_info.

    dcmiGetDevicePcieInfo reads only bdf_busid, bdf_deviceid and bdf_funcid
    from it. The order and types of ``_fields_`` define the memory layout
    handed to the C library, so they must not be reordered or resized.
    NOTE(review): presumably mirrors ``struct dcmi_pcie_info`` from the vendor
    header -- confirm against the installed libdcmi version.
    """
    _fields_ = [
        ('deviceid', c_uint),
        ('venderid', c_uint),
        ('subvenderid', c_uint),
        ('subdeviceid', c_uint),
        ('bdf_deviceid', c_uint),
        ('bdf_busid', c_uint),
        ('bdf_funcid', c_uint),
    ]
    # No per-field format overrides: str() renders every field with "%s".
    _fmt_ = {}


class dcmiEccInfo_t(_PrintableStructure):
    """ctypes record passed by reference to dcmi_get_device_ecc_info.

    dcmiGetDeviceEccInfo returns enable_flag plus the total_* and isolated_*
    counters from it; sb_err_cnt and db_err_cnt are filled but not returned.
    The order and types of ``_fields_`` define the memory layout handed to the
    C library, so they must not be reordered or resized.
    NOTE(review): "sb"/"db" presumably mean single-bit/double-bit errors --
    confirm against the vendor documentation.
    """
    _fields_ = [
        ('enable_flag', c_int),
        ('sb_err_cnt', c_uint),
        ('db_err_cnt', c_uint),
        ('total_sb_err_cnt', c_uint),
        ('total_db_err_cnt', c_uint),
        ('isolated_sb_err_cnt', c_uint),
        ('isolated_db_err_cnt', c_uint),
    ]
    # No per-field format overrides: str() renders every field with "%s".
    _fmt_ = {}

class dcmiHbmInfo_t(_PrintableStructure):
    """ctypes record passed by reference to dcmi_get_device_hbm_info.

    dcmiGetDeviceHbmInfo returns memory_size, freq and memory_usage from it;
    temp and bandwith_util_rate are filled but not returned.
    The order and types of ``_fields_`` define the memory layout handed to the
    C library, so they must not be reordered or resized.
    NOTE(review): units of the size/frequency fields and the signedness of
    ``temp`` in the vendor header are not visible in this file -- confirm.
    """
    _fields_ = [
        ('memory_size', c_ulonglong),
        ('freq', c_uint),
        ('memory_usage', c_ulonglong),
        ('temp', c_uint),
        ('bandwith_util_rate', c_uint),
    ]
    # No per-field format overrides: str() renders every field with "%s".
    _fmt_ = {}

# Size in bytes of each fixed-length text field in dcmiChipInfo_t.
MAX_CHIP_NAME_LEN = 32
class dcmiChipInfo_t(_PrintableStructure):
    """ctypes record passed by reference to dcmi_get_device_chip_info.

    dcmiGetDeviceChipInfoName returns only chip_name from it.
    The three text fields are fixed-size char arrays of MAX_CHIP_NAME_LEN
    bytes; the order and types of ``_fields_`` define the memory layout handed
    to the C library, so they must not be reordered or resized.
    NOTE(review): presumably mirrors ``struct dcmi_chip_info`` -- confirm
    against the installed libdcmi version.
    """
    _fields_ = [
        ('chip_type' , c_char * MAX_CHIP_NAME_LEN),
        ('chip_name' , c_char * MAX_CHIP_NAME_LEN),
        ('chip_ver' , c_char * MAX_CHIP_NAME_LEN),
        ('aicore_cnt' , c_uint),
    ]
    # No per-field format overrides: str() renders every field with "%s".
    _fmt_ = {}


# Size in bytes of each fixed-length text field in dcmiElabelInfo_t.
# ("LENTH" (sic) is the spelling of the public name.)
MAX_LENTH = 256
class dcmiElabelInfo_t(_PrintableStructure):
    """ctypes record passed by reference to dcmi_get_device_elabel_info.

    dcmiGetDeviceSerial returns only serial_number from it.
    Every field is a fixed-size char array of MAX_LENTH bytes; the order and
    types of ``_fields_`` define the memory layout handed to the C library, so
    they must not be reordered or resized.
    NOTE(review): presumably mirrors ``struct dcmi_elabel_info`` -- confirm
    against the installed libdcmi version.
    """
    _fields_ = [
        ('product_name', c_char * MAX_LENTH),
        ('model', c_char * MAX_LENTH),
        ('manufacturer', c_char * MAX_LENTH),
        ('manufacturer_date', c_char * MAX_LENTH),
        ('serial_number', c_char * MAX_LENTH),
    ]
    # No per-field format overrides: str() renders every field with "%s".
    _fmt_ = {}


class dcmiMemoryInfo_t(_PrintableStructure):
    """ctypes record passed by reference to dcmi_get_device_memory_info_v3.

    dcmiGetDeviceMemoryInfo returns only freq and utiliza from it.
    The order and types of ``_fields_`` define the memory layout handed to the
    C library, so they must not be reordered or resized.
    NOTE(review): presumably mirrors ``struct dcmi_get_memory_info_stru``;
    units of the size fields are not visible in this file -- confirm.
    """
    _fields_ = [
        ('memory_size', c_ulonglong),
        ('memory_available',c_ulonglong),
        ('freq', c_uint),
        ('hugepagesize', c_ulong),
        ('hugepages_total', c_ulong),
        ('hugepages_free', c_ulong),
        ('utiliza', c_uint),
        # 60 bytes of padding; the wrapper never reads it.
        ('reserve', c_char * 60),
    ]
    # No per-field format overrides: str() renders every field with "%s".
    _fmt_ = {}


## Lib loading ##
# Handle of the loaded libdcmi shared object; None until it has been loaded.
dcmiLib = None
# Serialises library loading, reference counting and symbol resolution.
libLoadLock = threading.Lock()
# Incremented by dcmiInit(), decremented (never below zero) by dcmiShutdown().
_dcmiLib_refcount = 0


class DcmiError(Exception):
    """Raised when a DCMI call reports failure or the library is unusable.

    ``value`` holds the status code: one of the DCMI_ERR_CODE_* constants when
    it came from libdcmi, or -1 when the wrapper itself could not reach the
    library or the requested symbol.
    """

    def __init__(self, value):
        Exception.__init__(self, value)
        self.value = value

    def __str__(self):
        return "DCMI call failed with status %s" % (self.value,)


def _dcmiCheckReturn(ret):
    """Return *ret* unchanged if it is DCMI_OK, otherwise raise DcmiError.

    Previously the integer status itself was raised, which is not a valid
    exception and surfaced as an unrelated TypeError that hid the status code.
    """
    if ret != DCMI_OK:
        raise DcmiError(ret)
    return ret


## Function access ##
# Maps symbol name -> resolved ctypes function so each lookup happens once.
_dcmiGetFunctionPointer_cache = dict()


def _dcmiGetFunctionPointer(name):
    """Return the ctypes function called *name* from the loaded library.

    Resolved functions are cached, so repeated calls for the same name do not
    take the lock again.

    Raises:
        DcmiError: with value -1 when the library has not been loaded yet or
            does not export *name*. -1 is the wrapper's own marker and is
            distinct from the -8xxx codes defined for libdcmi.
    """
    # Fast path: already resolved, no locking needed.
    try:
        return _dcmiGetFunctionPointer_cache[name]
    except KeyError:
        pass

    # "with" guarantees the lock is released on every exit path.
    with libLoadLock:
        # ensure library was loaded
        if dcmiLib is None:
            raise DcmiError(-1)
        try:
            func = getattr(dcmiLib, name)
        except AttributeError:
            raise DcmiError(-1)
        _dcmiGetFunctionPointer_cache[name] = func
        return func


## C function wrappers ##
def dcmiInit():
    """Load libdcmi if necessary, call ``dcmi_init`` and count the caller.

    The reference count is only incremented after ``dcmi_init`` has been
    accepted by _dcmiCheckReturn; any failure before that point propagates to
    the caller and leaves the count untouched.

    Returns:
        None.
    """
    global _dcmiLib_refcount

    _LoadDcmiibrary()

    # Initialize the library; a non-OK status is rejected by _dcmiCheckReturn.
    init_fn = _dcmiGetFunctionPointer("dcmi_init")
    _dcmiCheckReturn(init_fn())

    # Update the refcount under the same lock used for library loading.
    with libLoadLock:
        _dcmiLib_refcount += 1
    return None

def _LoadDcmiibrary():
    '''
    Load the library if it isn't loaded already.

    The module-level ``dcmiLib`` handle is set at most once; the check is
    repeated under ``libLoadLock`` so that concurrent callers do not load the
    library twice.

    Raises:
        OSError: if "libdcmi.so" cannot be loaded. The failure used to be
            swallowed here (the old error path called _dcmiCheckReturn(0),
            which accepts 0 as success) and only showed up later as an
            unrelated error when the first symbol was looked up.
    '''
    global dcmiLib

    if dcmiLib is not None:
        return

    # lock to ensure only one caller loads the library; "with" always frees it
    with libLoadLock:
        # ensure the library still isn't loaded
        if dcmiLib is None:
            # CDLL either returns a handle or raises OSError, so no further
            # None check is needed after this line.
            dcmiLib = CDLL("libdcmi.so")

def dcmiShutdown():
    """Drop one reference taken by dcmiInit().

    Only the module-level reference count is changed, and it never goes below
    zero; this function makes no call into the library.

    Returns:
        None.
    """
    global _dcmiLib_refcount

    # Update the refcount under the lock shared with dcmiInit().
    with libLoadLock:
        if _dcmiLib_refcount > 0:
            _dcmiLib_refcount = _dcmiLib_refcount - 1
    return None



# C prototype: int dcmi_get_driver_version(char *driver_ver, unsigned int len)
def dcmiGetDriverVersion():
    """Return the driver version text written by ``dcmi_get_driver_version``.

    A 64-byte buffer is handed to the library together with its length. The
    status is validated by _dcmiCheckReturn, which raises on anything other
    than DCMI_OK.

    Returns:
        The buffer contents up to the first NUL byte (``.value`` of a ctypes
        string buffer).
    """
    buf_len = 64
    len_arg = c_uint(buf_len)
    version_buf = create_string_buffer(buf_len)
    query = _dcmiGetFunctionPointer("dcmi_get_driver_version")
    _dcmiCheckReturn(query(version_buf, len_arg))
    return version_buf.value

# C prototype: int dcmi_get_card_list(int *card_num, int *card_list, int list_len)
def dcmiGetCardList():
    """Query the card count and card id list via ``dcmi_get_card_list``.

    The status is validated by _dcmiCheckReturn, which raises on anything
    other than DCMI_OK.

    Returns:
        A ``(card_num, card_list)`` tuple. ``card_list`` always contains all
        16 slots of the buffer given to the library, not just ``card_num`` of
        them.
        NOTE(review): presumably only the first ``card_num`` entries are
        meaningful -- confirm against the DCMI documentation.
    """
    capacity = 16
    card_count = c_uint()
    capacity_arg = c_uint(capacity)
    card_ids = (c_uint * capacity)()
    query = _dcmiGetFunctionPointer("dcmi_get_card_list")
    _dcmiCheckReturn(query(byref(card_count), card_ids, capacity_arg))
    return card_count.value, list(card_ids)

# C prototype: int dcmi_get_device_num_in_card(int card_id, int *device_num)
def dcmiGetDeviceNumInCard(card_id):
    """Return the device count reported for *card_id*.

    Calls ``dcmi_get_device_num_in_card``; the status is validated by
    _dcmiCheckReturn, which raises on anything other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
    """
    card_arg = c_uint(card_id)
    device_count = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_num_in_card")
    _dcmiCheckReturn(query(card_arg, byref(device_count)))
    return device_count.value

# C prototype: int dcmi_get_device_pcie_info(int card_id, int device_id, struct dcmi_pcie_info *pcie_info)
def dcmiGetDevicePcieInfo(card_id, device_id):
    """Return the PCIe bus/device/function numbers of a device.

    Calls ``dcmi_get_device_pcie_info`` with a dcmiPciInfo_t record; the
    status is validated by _dcmiCheckReturn, which raises on anything other
    than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        A ``(bdf_busid, bdf_deviceid, bdf_funcid)`` tuple.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    pcie = dcmiPciInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_pcie_info")
    _dcmiCheckReturn(query(card_arg, device_arg, byref(pcie)))
    return (pcie.bdf_busid, pcie.bdf_deviceid, pcie.bdf_funcid)

# C prototype: int dcmi_get_device_chip_info(int card_id, int device_id, struct dcmi_chip_info *chip_info)
def dcmiGetDeviceChipInfoName(card_id, device_id):
    """Return the ``chip_name`` field reported for a device.

    Calls ``dcmi_get_device_chip_info`` with a dcmiChipInfo_t record; the
    status is validated by _dcmiCheckReturn, which raises on anything other
    than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    chip = dcmiChipInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_chip_info")
    _dcmiCheckReturn(query(card_arg, device_arg, byref(chip)))
    return chip.chip_name

# C prototype: int dcmi_get_device_elabel_info(int card_id, int device_id, struct dcmi_elabel_info *elabel_info)
def dcmiGetDeviceSerial(card_id, device_id):
    """Return the ``serial_number`` field of a device's electronic label.

    Calls ``dcmi_get_device_elabel_info`` with a dcmiElabelInfo_t record; the
    status is validated by _dcmiCheckReturn, which raises on anything other
    than DCMI_OK.
    NOTE(review): an earlier debug remark at this call read "not supported",
    so some devices may reject this query -- confirm.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    elabel = dcmiElabelInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_elabel_info")
    _dcmiCheckReturn(query(card_arg, device_arg, byref(elabel)))
    return elabel.serial_number


# C prototype: int dcmi_get_device_power_info(int card_id, int device_id, int *power)
def dcmiGetDevicePowerInfo(card_id, device_id):
    """Return the power reading reported for a device.

    Calls ``dcmi_get_device_power_info``; the status is validated by
    _dcmiCheckReturn, which raises on anything other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        The integer written to ``*power``.
        NOTE(review): the unit is not visible in this file -- confirm against
        the DCMI documentation.
    """
    c_card_id = c_uint(card_id)
    c_device_id = c_uint(device_id)
    # The out-parameter is declared "int *" in the C prototype, so it is read
    # back as a signed value; non-negative readings are unaffected.
    c_power = c_int()
    fn = _dcmiGetFunctionPointer("dcmi_get_device_power_info")
    ret = fn(c_card_id, c_device_id, byref(c_power))
    _dcmiCheckReturn(ret)
    return c_power.value

# C prototype: int dcmi_get_device_temperature(int card_id, int device_id, int *temperature)
def dcmiGetDeviceTemp(card_id, device_id):
    """Return the temperature reading reported for a device.

    Calls ``dcmi_get_device_temperature``; the status is validated by
    _dcmiCheckReturn, which raises on anything other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        The signed integer written to ``*temperature``.
        NOTE(review): the unit is not visible in this file -- confirm against
        the DCMI documentation.
    """
    c_card_id = c_uint(card_id)
    c_device_id = c_uint(device_id)
    # The out-parameter is declared "int *" in the C prototype. Reading it as
    # unsigned turned a negative temperature into a value around 4.29e9, so a
    # signed c_int is used; non-negative readings are unaffected.
    c_temp = c_int()
    fn = _dcmiGetFunctionPointer("dcmi_get_device_temperature")
    ret = fn(c_card_id, c_device_id, byref(c_temp))
    _dcmiCheckReturn(ret)
    return c_temp.value

# C prototype: int dcmi_get_device_ecc_info(int card_id, int device_id, enum dcmi_device_type input_type, struct dcmi_ecc_info *device_ecc_info)
def dcmiGetDeviceEccInfo(card_id, device_id, input_type):
    """Return ECC state and error counters for one component of a device.

    Calls ``dcmi_get_device_ecc_info`` with a dcmiEccInfo_t record; the status
    is validated by _dcmiCheckReturn, which raises on anything other than
    DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.
        input_type: integer wrapped in _dcmiDeviceType_t; the
            DCMI_DEVICE_TYPE_* constants are the values defined for it here.

    Returns:
        A 5-tuple ``(enable_flag, total_sb_err_cnt, total_db_err_cnt,
        isolated_sb_err_cnt, isolated_db_err_cnt)``.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    ecc = dcmiEccInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_ecc_info")
    status = query(card_arg, device_arg, _dcmiDeviceType_t(input_type), byref(ecc))
    _dcmiCheckReturn(status)
    return (
        ecc.enable_flag,
        ecc.total_sb_err_cnt,
        ecc.total_db_err_cnt,
        ecc.isolated_sb_err_cnt,
        ecc.isolated_db_err_cnt,
    )

# C prototype: int dcmi_get_device_frequency(int card_id, int device_id, enum dcmi_freq_type input_type, unsigned int *frequency)
def dcmiGetDeviceFrequency(card_id, device_id, input_type):
    """Return the frequency reported for one clock domain of a device.

    Calls ``dcmi_get_device_frequency``; the status is validated by
    _dcmiCheckReturn, which raises on anything other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.
        input_type: integer wrapped in _dcmiFreqType_t; the DCMI_FREQ_TYPE_*
            constants are the values defined for it here.

    Returns:
        The unsigned integer written to ``*frequency``.
        NOTE(review): the unit is not visible in this file -- confirm.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    frequency = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_frequency")
    status = query(card_arg, device_arg, _dcmiFreqType_t(input_type), byref(frequency))
    _dcmiCheckReturn(status)
    return frequency.value

# C prototype: int dcmi_get_device_hbm_info(int card_id, int device_id, struct dcmi_hbm_info *hbm_info)
def dcmiGetDeviceHbmInfo(card_id, device_id):
    """Return HBM size, frequency and usage for a device.

    Calls ``dcmi_get_device_hbm_info`` with a dcmiHbmInfo_t record; the status
    is validated by _dcmiCheckReturn, which raises on anything other than
    DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        A ``(memory_size, freq, memory_usage)`` tuple.
        NOTE(review): units are not visible in this file -- confirm.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    hbm = dcmiHbmInfo_t()
    query = _dcmiGetFunctionPointer("dcmi_get_device_hbm_info")
    _dcmiCheckReturn(query(card_arg, device_arg, byref(hbm)))
    return (hbm.memory_size, hbm.freq, hbm.memory_usage)

# C prototype: int dcmi_get_device_memory_info_v3(int card_id, int device_id, struct dcmi_get_memory_info_stru *memory_info)
def dcmiGetDeviceMemoryInfo(card_id, device_id):
    """Return memory frequency and utilisation for a device.

    Calls ``dcmi_get_device_memory_info_v3`` with a dcmiMemoryInfo_t record;
    the status is validated by _dcmiCheckReturn, which raises on anything
    other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        A ``(freq, utiliza)`` tuple.
        NOTE(review): units are not visible in this file -- confirm.
    """
    c_card_id = c_uint(card_id)
    c_device_id = c_uint(device_id)
    memoryinfo = dcmiMemoryInfo_t()
    fn = _dcmiGetFunctionPointer("dcmi_get_device_memory_info_v3")
    ret = fn(c_card_id, c_device_id, byref(memoryinfo))
    # A leftover debug print of the status was removed here: it wrote to
    # stdout on every call. Failures are still reported by _dcmiCheckReturn.
    _dcmiCheckReturn(ret)
    return memoryinfo.freq, memoryinfo.utiliza


# C prototype: int dcmi_get_device_utilization_rate(int card_id, int device_id, int input_type, unsigned int *utilization_rate)
def dcmiGetDeviceUtilRateCtlCpu(card_id, device_id):
    """Return the utilisation rate queried with DCMI_INPUT_TPYE_CTL_CPU.

    Calls ``dcmi_get_device_utilization_rate``; the status is validated by
    _dcmiCheckReturn, which raises on anything other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        The unsigned integer written to ``*utilization_rate``.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    rate = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_utilization_rate")
    selector = _dcmiInputType_t(DCMI_INPUT_TPYE_CTL_CPU)
    _dcmiCheckReturn(query(card_arg, device_arg, selector, byref(rate)))
    return rate.value

def dcmiGetDeviceUtilRateAICore(card_id, device_id):
    """Return the utilisation rate queried with DCMI_INPUT_TPYE_AI_CORE.

    Calls ``dcmi_get_device_utilization_rate``; the status is validated by
    _dcmiCheckReturn, which raises on anything other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        The unsigned integer written to the utilisation out-parameter.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    rate = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_utilization_rate")
    selector = _dcmiInputType_t(DCMI_INPUT_TPYE_AI_CORE)
    _dcmiCheckReturn(query(card_arg, device_arg, selector, byref(rate)))
    return rate.value

# C prototype: int dcmi_get_device_health(int card_id, int device_id, unsigned int *health)
def dcmiGetDeviceHealthStatus(card_id, device_id):
    """Return the health value reported for a device.

    Calls ``dcmi_get_device_health``; the status is validated by
    _dcmiCheckReturn, which raises on anything other than DCMI_OK.

    Args:
        card_id: integer card identifier, passed to the library as c_uint.
        device_id: integer device identifier, passed as c_uint.

    Returns:
        The unsigned integer written to ``*health``.
        NOTE(review): the meaning of each health value is not visible in this
        file -- confirm against the DCMI documentation.
    """
    card_arg = c_uint(card_id)
    device_arg = c_uint(device_id)
    health = c_uint()
    query = _dcmiGetFunctionPointer("dcmi_get_device_health")
    _dcmiCheckReturn(query(card_arg, device_arg, byref(health)))
    return health.value