File: //usr/local/qcloud/monitor/barad/bin/agent.py
#!../../python26/bin/python
'''
barad_agent v2
written by cloudyang
2014.05.14
'''
import sys
import os
import time
sys.path.append(os.getcwd()+"/../comm/")
import constant
import datetime
from executor import Executor
from dispatcher import Dispatcher
from router import QueueRouter
from multiprocessing import Process
from multiprocessing import Queue
from daemon import daemon_init
from phealth import PluginHealth
import cutils
import getopt
# Capacity (maxsize) of each multiprocessing Queue built by create_router().
QUEUE_SIZE = 1024
# Relative path (so it depends on the working directory) of the agent config
# file; read by cutils.get_agent_version() and handed to every worker.
CONFIG_PATH = '../etc/config.ini'
# Name used for the single-instance lock file, the process title, the
# version string and the PluginHealth registration.
agent_name = 'barad_agent'
def run_process(func, router, conf_path):
    """Child-process entry point: build one worker and run it.

    ``func`` is called as ``func(conf_path, router)`` and ``run()`` is then
    invoked on the object it returns; that return value is discarded.
    """
    func(conf_path, router).run()
def create_router():
    """Return a new QueueRouter with one bounded Queue per worker direction.

    Two queues are registered, in this order: ``constant.QUEUE_TO_DISPATCHER``
    and ``constant.QUEUE_TO_EXECUTOR``, each created as ``Queue(QUEUE_SIZE)``.
    """
    queue_router = QueueRouter()
    for queue_name in (constant.QUEUE_TO_DISPATCHER, constant.QUEUE_TO_EXECUTOR):
        queue_router.add_queue(queue_name, Queue(QUEUE_SIZE))
    return queue_router
def run_check(pid_name, lock_dir="/var/run"):
    """Try to take the single-instance lock for *pid_name*.

    Opens ``<lock_dir>/<pid_name>.lock`` and requests a non-blocking
    exclusive ``flock`` on it. On success the open file object is stored in
    the module-global ``_pid_fp`` so it stays open, and the lock stays held,
    for as long as this process keeps that reference.

    :param pid_name: base name of the lock file.
    :param lock_dir: directory holding the lock file (default ``/var/run``).
    :returns: True if the lock was acquired; False if it is already held
        (flock failed with EAGAIN/EACCES).
    :raises IOError: on any other open/flock failure, instead of silently
        reporting the lock as acquired.
    """
    import fcntl, errno
    global _pid_fp
    pid_file = os.path.join(lock_dir, "%s.lock" % pid_name)
    fp = open(pid_file, "w")
    try:
        fcntl.flock(fp, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except IOError as e:
        # The lock was not obtained, so do not keep the descriptor around.
        fp.close()
        if e.errno in (errno.EAGAIN, errno.EACCES):
            return False
        raise
    # Publish only after success: rebinding _pid_fp before locking would drop
    # the previous file object and release a lock this process already held.
    _pid_fp = fp
    return True
def start_process(cls, router, config, process_list):
    """Start one child Process per worker class.

    Each child runs ``run_process(worker_cls, router, config)`` and is named
    after the worker class.

    :param cls: a worker class, or a list/tuple of worker classes.
    :param router: router object passed to every child.
    :param config: configuration path handed to every worker. Earlier code
        ignored this argument and always used the global CONFIG_PATH.
    :param process_list: list extended in place with each started Process.
    :returns: ``(process_list, pids)``, i.e. the same list object and the
        pids of the processes started by this call.
    """
    if isinstance(cls, (list, tuple)):
        classes = cls
    else:
        classes = [cls]
    process_pids = []
    for worker_cls in classes:
        process = Process(target=run_process,
                          args=(worker_cls, router, config),
                          name=worker_cls.__name__)
        process.start()
        # Record the process only once it is running: a never-started
        # Process cannot be terminated/joined by terminate_all().
        process_list.append(process)
        process_pids.append(process.pid)
    return process_list, process_pids
def terminate_all(process_list):
    """Stop every worker in *process_list*, wait for them, then empty the list.

    SIGTERM is sent to all started processes first, and only afterwards are
    they joined. The shutdown waits therefore overlap instead of adding up.
    Entries that were never started (``pid is None``) are skipped, because
    ``terminate()``/``join()`` fail on them. The list is cleared in place so
    callers holding a reference see it emptied.
    """
    started = [p for p in process_list if p.pid is not None]
    for p in started:
        p.terminate()
    for p in started:
        p.join()
    del process_list[:]
def main_log(log_str):
    """Append one timestamped line to the barad main-process log.

    The line has the form ``[YYYY-mm-dd HH:MM:SS] <log_str>`` (local time)
    and is appended to ``../log/main.log``, which is resolved against the
    current working directory.

    :todo: logging rewrite.
    """
    stamp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
    with open("../log/main.log", "a+") as log_file:
        log_file.write("[%s] %s" % (stamp, log_str) + "\n")
def _parse_args(argv, version):
    """Parse the agent's command-line flags; return True if -d was given.

    -v / -V print *version* and exit(0) wherever they appear on the command
    line, so ``-d -v`` no longer silently ignores -v. An unrecognised option
    prints the getopt error and a usage line to stderr and exits with
    status 2, rather than escaping as an uncaught traceback.
    """
    try:
        opts, _ = getopt.getopt(argv, 'vVd', [])
    except getopt.GetoptError as err:
        sys.stderr.write("%s\nusage: %s [-v|-V] [-d]\n"
                         % (err, os.path.basename(sys.argv[0])))
        sys.exit(2)
    flags = set(opt for opt, _ in opts)
    if flags & set(['-v', '-V']):
        print(version)
        sys.exit(0)
    return '-d' in flags


def _set_process_title():
    """Rename this process to ``agent_name``; log and exit(111) on failure.

    Failure covers an unimportable/broken ``setproctitle`` module as well as
    a title that does not start with ``agent_name`` after being set.
    """
    try:
        import setproctitle
        setproctitle.setproctitle(agent_name)
        # Check that the rename actually took effect.
        if not setproctitle.getproctitle().startswith(agent_name):
            main_log("set process name to %s failed" % agent_name)
            sys.exit(111)
    except Exception:
        import traceback
        main_log(traceback.format_exc())
        sys.exit(111)


def main():
    """Start the barad agent and supervise its worker processes.

    Steps:
      1. Parse flags (-v/-V print the version, -d daemonizes).
      2. Take the single-instance lock; exit(111) if another agent holds it.
      3. Set the process title.
      4. Optionally daemonize.
      5. Start one child per worker class (Dispatcher, Executor) and write
         their pids plus this process's pid via PluginHealth.write_pidfile().

    Every 5 seconds the children are checked. When one is found dead, all
    children are stopped and a fresh set is started on a new router. Any
    unexpected exception is logged to main.log, every child is terminated,
    and the process exits with status 111.
    """
    agent_ver = cutils.get_agent_version(CONFIG_PATH)
    version = '%s_%s' % (agent_name, agent_ver)
    daemon_flag = _parse_args(sys.argv[1:], version)

    if not run_check(agent_name):
        main_log("agent already run!")
        sys.exit(111)

    _set_process_title()

    if daemon_flag:
        daemon_init()

    class_dict = {"Dispatcher": Dispatcher, "Executor": Executor}
    # Materialize once as a list: start_process() iterates lists, and a
    # Python 3 dict view would otherwise be treated as a single class.
    worker_classes = list(class_dict.values())
    process_list = []
    try:
        router = create_router()
        plugin_hc = PluginHealth(agent_name, [agent_name])
        _, process_pids = start_process(worker_classes, router, CONFIG_PATH, process_list)
        process_pids.append(os.getpid())
        plugin_hc.write_pidfile(process_pids)
        while True:
            time.sleep(5)
            # First dead child (or None). The scan stops at the first hit and
            # never mutates process_list while iterating over it.
            dead = next((p for p in process_list if not p.is_alive()), None)
            if dead is None:
                continue
            main_log('process : %s, pid : %d is dead with exit code : %d'
                     % (dead.name, dead.pid, dead.exitcode))
            dead.join()
            process_list.remove(dead)
            main_log('restart in 3 second..')
            # Restart the whole set on a fresh router; the surviving children
            # are still attached to the old router's queues.
            terminate_all(process_list)
            time.sleep(3)
            del router
            router = create_router()
            _, process_pids = start_process(worker_classes, router, CONFIG_PATH, process_list)
            process_pids.append(os.getpid())
            plugin_hc.write_pidfile(process_pids)
    except Exception:
        import traceback
        main_log(traceback.format_exc())
        terminate_all(process_list)
        sys.exit(111)
# Script entry point: run the agent supervisor only when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    main()