HEX
Server: Apache/2.4.52 (Ubuntu)
System: Linux WebLive 5.15.0-79-generic #86-Ubuntu SMP Mon Jul 10 16:07:21 UTC 2023 x86_64
User: ubuntu (1000)
PHP: 7.4.33
Disabled: pcntl_alarm,pcntl_fork,pcntl_waitpid,pcntl_wait,pcntl_wifexited,pcntl_wifstopped,pcntl_wifsignaled,pcntl_wifcontinued,pcntl_wexitstatus,pcntl_wtermsig,pcntl_wstopsig,pcntl_signal,pcntl_signal_get_handler,pcntl_signal_dispatch,pcntl_get_last_error,pcntl_strerror,pcntl_sigprocmask,pcntl_sigwaitinfo,pcntl_sigtimedwait,pcntl_exec,pcntl_getpriority,pcntl_setpriority,pcntl_async_signals,pcntl_unshare,
Upload Files
File: //usr/local/qcloud/monitor/barad/lib/apscheduler/jobstores/redis_store.py
"""
Stores jobs in a Redis database.
"""
from uuid import uuid4
from datetime import datetime
import logging

from apscheduler.jobstores.base import JobStore
from apscheduler.job import Job

try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    from redis import StrictRedis
except ImportError:  # pragma: nocover
    raise ImportError('RedisJobStore requires redis installed')

# Python 2/3 compatibility shim: keep the built-in ``long`` where the name
# exists, and fall back to ``int`` where looking it up raises NameError.
try:
    long = long
except NameError:
    long = int

logger = logging.getLogger(__name__)


class RedisJobStore(JobStore):
    """
    Job store that keeps every job in its own Redis hash.

    A job is stored under the key ``key_prefix + job.id`` with three fields:

    * ``job_state`` -- the pickled result of ``job.__getstate__()``
    * ``runs`` -- the run counter
    * ``next_run_time`` -- the next run time as ``datetime.isoformat()`` text

    The jobs currently known to the store are mirrored in ``self.jobs``.
    """

    # ``datetime.isoformat()`` only emits the fractional part when the
    # microsecond component is non-zero, so both layouts can show up in Redis.
    _DATE_FORMAT = '%Y-%m-%dT%H:%M:%S'
    _DATE_FORMAT_MICRO = '%Y-%m-%dT%H:%M:%S.%f'

    def __init__(self, db=0, key_prefix='jobs.',
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        """
        Validate the settings and create the Redis client.

        :param db: database number handed to ``StrictRedis``; must not be
            ``None``
        :param key_prefix: string prepended to every job id to form the Redis
            key; must not be empty
        :param pickle_protocol: protocol passed to ``pickle.dumps``
        :param connect_args: extra keyword arguments forwarded unchanged to
            ``StrictRedis``
        :raises ValueError: if ``db`` is ``None`` or ``key_prefix`` is empty
        """
        # Validate first so that a rejected configuration leaves no
        # half-initialised state behind.
        if db is None:
            raise ValueError('The "db" parameter must not be empty')
        if not key_prefix:
            raise ValueError('The "key_prefix" parameter must not be empty')

        self.jobs = []
        self.pickle_protocol = pickle_protocol
        self.key_prefix = key_prefix
        self.redis = StrictRedis(db=db, **connect_args)

    @classmethod
    def _parse_next_run_time(cls, dateval):
        """
        Convert the stored ISO 8601 text back into a ``datetime``.

        Accepts both ``YYYY-MM-DDTHH:MM:SS`` and the
        ``YYYY-MM-DDTHH:MM:SS.ffffff`` variant that ``isoformat()`` produces
        when the microsecond component is non-zero.

        :raises ValueError: if ``dateval`` matches neither layout
        """
        if '.' in dateval:
            return datetime.strptime(dateval, cls._DATE_FORMAT_MICRO)
        return datetime.strptime(dateval, cls._DATE_FORMAT)

    def add_job(self, job):
        """
        Assign a fresh UUID to ``job``, write it to Redis and track it locally.

        ``job.id`` is overwritten with the newly generated id.
        """
        job.id = str(uuid4())
        job_state = job.__getstate__()
        # The state is pickled as a whole (including ``next_run_time``); the
        # separate hash field holds the same value in readable form so that
        # ``update_job`` can refresh it without re-pickling.
        pickled_state = pickle.dumps(job_state, self.pickle_protocol)
        job_dict = {
            'job_state': pickled_state,
            'runs': '0',
            'next_run_time': job_state['next_run_time'].isoformat()}
        # NOTE(review): ``hmset`` is deprecated in newer redis-py releases in
        # favour of ``hset(name, mapping=...)``; kept because the installed
        # client version is not visible from here.
        self.redis.hmset(self.key_prefix + job.id, job_dict)
        self.jobs.append(job)

    def remove_job(self, job):
        """
        Delete the job's hash from Redis and drop it from ``self.jobs``.

        :raises ValueError: if ``job`` is not in ``self.jobs``
        """
        self.redis.delete(self.key_prefix + job.id)
        self.jobs.remove(job)

    def load_jobs(self):
        """
        Replace ``self.jobs`` with the jobs currently stored in Redis.

        All keys starting with ``key_prefix`` are fetched in one pipeline.
        An entry that cannot be restored is logged and skipped, so one bad
        entry does not prevent the remaining jobs from loading.
        """
        jobs = []
        keys = self.redis.keys(self.key_prefix + '*')
        pipeline = self.redis.pipeline()
        for key in keys:
            pipeline.hgetall(key)
        results = pipeline.execute()

        for job_dict in results:
            # Initialised before the ``try`` so the handler can still report
            # a name when unpickling itself is what failed.
            job_state = {}
            try:
                job = Job.__new__(Job)
                # SECURITY: ``pickle.loads`` executes whatever the payload
                # dictates.  This is only safe while nobody untrusted can
                # write to the Redis database backing this store.
                job_state = pickle.loads(job_dict[b'job_state'])
                job_state['runs'] = long(job_dict[b'runs'])
                dateval = job_dict[b'next_run_time'].decode()
                job_state['next_run_time'] = \
                    self._parse_next_run_time(dateval)
                job.__setstate__(job_state)
                jobs.append(job)
            except Exception:
                job_name = job_state.get('name', '(unknown)')
                logger.exception('Unable to restore job "%s"', job_name)
        self.jobs = jobs

    def update_job(self, job):
        """
        Write the job's current ``next_run_time`` and ``runs`` to its hash.

        The pickled ``job_state`` field is left untouched.
        """
        attrs = {
            'next_run_time': job.next_run_time.isoformat(),
            'runs': job.runs}
        self.redis.hmset(self.key_prefix + job.id, attrs)

    def close(self):
        """Disconnect the connection pool used by the Redis client."""
        self.redis.connection_pool.disconnect()

    def __repr__(self):
        return '<%s>' % self.__class__.__name__