HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/comm/dispatcher.py
import constant
import Queue
from base_process import BaseProcess
from module_loader import ModuleLoader
from plugin_base import BaseSender
import cutils
from collect_vm_msg import CollectVmMesg

class Dispatcher(BaseProcess):
    """Process that drains the dispatcher queue and hands payloads to sender modules.

    Each payload is expected to be a mapping with a ``'sender'`` key (name of
    the sender module to use) and a ``'datas'`` key (what is passed to that
    module's ``send_data``).  When the VM instance id starts with ``'hai-'``,
    payloads addressed to ``'nws_sender'`` are additionally re-sent through
    ``'hai_sender'`` after being rewritten by ``generate_hai_data``.
    """

    def __init__(self, config_path, router):
        """Bind the input queue, load the sender modules and create the metadata reader.

        Args:
            config_path: forwarded to ``BaseProcess.__init__`` together with
                the lower-cased class name.
            router: object whose ``get_queue(constant.QUEUE_TO_DISPATCHER)``
                returns the queue this process reads from.
        """
        super(Dispatcher, self).__init__(config_path, self.__class__.__name__.lower())
        self.__msg_queue = router.get_queue(constant.QUEUE_TO_DISPATCHER)

        module_path = self.get_config('module_path')
        self.__module_loader = ModuleLoader(module_path)
        self.__module_loader.load_by_class(BaseSender, self.logger())
        # Mapping looked up with the payload's 'sender' value in send_one_data.
        self.__module_dict = self.__module_loader.get_modules()
        self.metadata_instance = CollectVmMesg()

    def run(self):
        """Consume queue messages until none arrives for 120 seconds.

        A message is either one payload or a list of payloads.  Any exception
        raised while handling a message is logged and the loop continues with
        the next message; for a list this means the remaining entries of that
        list are abandoned.  ``Queue.Empty`` (the 120 s timeout) is logged and
        ends the loop.
        """
        while True:
            try:
                data = self.__msg_queue.get(timeout=120)
                # Wrap in a 1-tuple so a tuple-valued message cannot be
                # mistaken for a list of format arguments.
                self.logger().debug('get msg : %s from queue' % (data,))
                if isinstance(data, list):
                    for one_data in data:
                        self._send_with_hai_variant(one_data)
                else:
                    self._send_with_hai_variant(data)
            except Queue.Empty:
                self._log_current_exception()
                break
            except Exception:
                self._log_current_exception()

    def _send_with_hai_variant(self, payload):
        """Send ``payload`` as received, then its HAI rewrite when one is produced."""
        # Order matters: generate_hai_data rewrites the payload in place, so
        # the untouched payload has to go out first.
        self.send_one_data(payload)
        hai_data = self.generate_hai_data(payload)
        if hai_data != "":
            self.send_one_data(hai_data)

    def _log_current_exception(self):
        """Log the traceback of the exception being handled, at error level."""
        import traceback
        self.logger().error(traceback.format_exc())

    def send_one_data(self, data):
        """Pass ``data['datas']`` to the sender module named by ``data['sender']``.

        Logs an error (and sends nothing) when either key is missing or when
        no loaded module has that name.

        Args:
            data: payload mapping with ``'sender'`` and ``'datas'`` keys.
        """
        if 'sender' not in data or 'datas' not in data:
            self.logger().error("invalid data format , %s" % (data,))
            return

        module_name = data.get('sender')
        if module_name not in self.__module_dict:
            self.logger().error("no sender module named '%s'" % module_name)
            return

        sender = self.__module_dict.get(module_name)
        sender.send_data(data['datas'])

    def generate_hai_data(self, data):
        """Rewrite an ``nws_sender`` payload into its ``hai_sender`` form.

        The rewrite only happens when the VM instance id starts with ``'hai-'``
        and ``data`` has ``'sender' == "nws_sender"`` plus a non-empty
        ``'datas'``.  It is done IN PLACE: ``data['sender']`` becomes
        ``'hai_sender'`` and every entry of ``data['datas']`` that has both
        ``'dimension'`` and ``'namespace'`` gets namespace ``'qce/hai'`` and
        the VM uuid / app id / instance id / project id written into its
        dimension mapping.
        NOTE(review): if a sender keeps a reference to the ``datas`` it was
        given, this in-place rewrite is visible to it as well - confirm.

        Args:
            data: payload mapping as accepted by ``send_one_data``.

        Returns:
            The same (mutated) ``data`` object when the rewrite applied,
            otherwise the empty string ``""``.  Exceptions are logged and also
            yield ``""``.
        """
        try:
            metadata = self.metadata_instance
            vm_uuid = metadata.get_vm_uuid()
            instance_id = metadata.get_vm_instance_id()
            app_id = metadata.get_vm_app_id()
            project_id = metadata.get_vm_project_id()

            # A missing/empty instance id simply means "not a HAI instance";
            # guard it so it does not raise and log a traceback per message.
            if not (instance_id and instance_id.startswith('hai-')):
                return ""
            if not ('sender' in data and data['sender'] == "nws_sender"):
                return ""
            if not ('datas' in data and len(data['datas']) > 0):
                return ""

            data['sender'] = 'hai_sender'
            for entry in data['datas']:
                if 'dimension' in entry and 'namespace' in entry:
                    entry['namespace'] = 'qce/hai'
                    dimension = entry['dimension']
                    dimension['vm_uuid'] = vm_uuid
                    dimension['appid'] = app_id
                    dimension['instance_id'] = instance_id
                    dimension['projectid'] = project_id
            return data
        except Exception:
            self._log_current_exception()
        return ""