您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系

如何在python-rq中的预定作业和排队作业之间创建``depends_on''关系

我对此问题的解决方rq仅使用了(并且不再使用rq_scheduler):

升级到最新的python-rq软件包:

# requirements.txt

… rq==1.1.0

为轮询作业创建专用队列,并相应地使作业入队(具有depends_on关系):

with Connection(redis.from_url(current_app.config['REDIS_URL'])):
q = Queue('default')
p = Queue('pqueue')
job1 = q.enqueue(step1)
job2 = p.enqueue(step2, depends_on=job1)  # step2 enqueued in polling queue
job3 = q.enqueue(step3, depends_on=job2)

派遣专职工作人员进行轮询队列。它继承自标准Worker类:

class PWorker(rq.worker.Worker):
def execute_job(self, *args, **kwargs):
    seconds_between_polls = 65
    job = args[0]
    if 'lastpoll' in job.Meta:
        job_timedelta = (datetime.utcNow() - job.Meta["lastpoll"]).total_seconds()
        if job_timedelta < seconds_between_polls:
            sleep_period = seconds_between_polls - job_timedelta
            time.sleep(sleep_period)
    job.Meta['lastpoll'] = datetime.utcNow()
    job.save_Meta()

    super().execute_job(*args, **kwargs)

PWorker execute_job通过向作业的元数据添加时间戳来扩展该方法'lastpoll'

如果有lastpoll时间戳记的轮询作业进入,工作人员将检查此后的时间间隔lastpoll是否大于65秒。如果是,它将当前时间写入'lastpoll'并执行轮询。如果没有,它将一直hibernate直到65s结束,然后将当前时间写入'lastpoll'并执行轮询。没有lastpoll时间戳的进来的作业是第一次轮询,而工作人员创建时间戳并执行轮询。

        # exceptions.py

    class PACError(Exception):
        pass

    class PACJobRun(PACError):
        pass

    class PACJobExit(PACError):
        pass


        # exception_handlers.py

    def poll_exc_handler(job, exc_type, exc_value, traceback):
        if exc_type is PACJobRun:
            requeue_job(job.get_id(), connection=job.connection)
            return False  # no further exception handling
        else:
            return True  # further exception handling


        # tasks.py

    def step2():
        # GET request to remote compute job portal API for status
        # if response == "RUN":
        raise PACJobRun
        return True

当定制异常处理程序捕获到定制异常(这意味着远程计算作业仍在运行)时,它将在轮询队列中重新排队该作业。

        # manage.py

    @cli.command('run_pworker')
    def run_pworker():
        redis_url = app.config['REDIS_URL']
        redis_connection = redis.from_url(redis_url)
        with rq.connections.Connection(redis_connection):
            pworker = PWorker(app.config['PQUEUE'], exception_handlers=[poll_exc_handler])
            pworker.work()

解决方案的优点在于,它仅用几行额外的代码即可扩展python-rq的标准功能。另一方面,额外的队列和工作程序增加了复杂性……

python 2022/1/1 18:15:17 有633人围观

撰写回答


你尚未登录,登录后可以

和开发者交流问题的细节

关注并接收问题和回答的更新提醒

参与内容的编辑和改进,让解决方法与时俱进

请先登录

推荐问题


联系我
置顶