HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/comm/executor.py
import time
from base_process import BaseProcess
import constant
from module_loader import ModuleLoader
from apscheduler.scheduler import Scheduler
from plugin_base import BaseCollector
import copy
from datetime import datetime, timedelta
from Queue import Empty
import cutils

class Executor(BaseProcess):
    def __init__(self, config_path, router):
        super(Executor, self).__init__(config_path, self.__class__.__name__.lower())
        self.__router = router
        self.__msg_queue = router.get_queue(constant.QUEUE_TO_EXECUTOR)
        module_path = self.get_config('module_path')
        load_dir = self.get_config('module_dir').replace(' ','').split(',')
        if not cutils.is_metal():
            load_dir.remove('metal')
        self.__module_loader = ModuleLoader(module_path, load_dir)
        #self.__module_loader.load('do_collect', self)
        self.__jobs = { }
        self.load_modules()
      
    def load_modules(self):
        try:
            self.__module_loader.load_by_class(BaseCollector, self)
        except Exception,e:
            self.logger().error(e)
        self.__module_dict = self.__module_loader.get_modules()

    def add_jobs(self):
        curr_time = datetime.now()
        index = 0
        for collector_name in self.__module_dict.keys():
            if not self.__jobs.has_key(collector_name):
                collector = self.__module_dict[collector_name]
                start_date = curr_time + timedelta(seconds = collector.frequency() + index)
                index += 1
                job = self.__sched.add_interval_job(collector.collect, seconds = collector.frequency(), start_date = start_date)
                self.__jobs[collector_name] = job
                self.logger().info("%s add to scheduler." % collector_name)

    def run_scheduler(self):
        self.__sched = Scheduler()
        self.add_jobs()
        self.__sched.start()
    
    def msg_wait(self):
        try:
            msg =  self.__msg_queue.get(timeout = 5)
            if msg.has_key('collector') and msg.has_key('method'):
                collector_name = msg.get('collector')
                if self.__module_dict.has_key(collector_name):
                    collector = self.__module_dict.get(collector_name)
                    method_name = msg.get('method')
                    
                    if hasattr(collector, method_name):
                        method = getattr(collector, method_name)
                        data = method()
                        try:
                            from mr_sender import MRSender
                            sender = MRSender(self.logger())
                            sender.init()
                            ret = sender.send_one_data(8, data, True)
                            self.logger().warn('execute collector : %s , ret : %d' % (collector_name, ret))
                        except Exception,e:
                            import traceback
                            self.logger().error('execute collector failed, %s' % traceback.format_exc().replace('\n','\\n'))
                    else:
                        self.logger().error('collector %s has no method : %s' % (collector_name, method_name))
                else:
                    self.logger().error("collector %s isn't loaded" % collector_name)
        except Empty as e:
            pass

    def run(self):
        self.run_scheduler()
        while True:
            try:
                #self.msg_wait()
                # reload new modules every 10 minute
                self.load_modules()
                self.add_jobs()
                time.sleep(600)
            except Exception,e:
                import traceback
                self.logger().error(traceback.format_exc());
                                    
    def put_data(self, msg):
        self.logger().info('put msg : %s to dispatcher' % msg)
        self.__router.route(constant.QUEUE_TO_DISPATCHER, copy.deepcopy(msg))