Flask 使用定时任务


发布于

|

分类

最近整了个活:公司内IM,每天早上10点定时把头像换成彩色版,每天下午7点定时把头像换成灰色版。蛮简单的,但是里面稍微有点坑。

找了一下,APScheduler(Advanced Python Scheduler)看起来确实强大,简单些了个demo,效果也不错。但是一用到Flask里面就傻了:执行的函数里面需要获取一些 current_app 里面的东西,但是Flask一直在疯狂地报「不能获取current app」。

这个事情可谓经典:从写Flask第一天起就开始遇到这个事情。查了下源码,发现我正在用的是基于线程池的Executor。这下子问题就清晰了。

class ThreadPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures thread pool.

    Plugin alias: ``threadpool``

    :param max_workers: the maximum number of spawned threads.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ThreadPoolExecutor constructor
    """

    def __init__(self, max_workers=10, pool_kwargs=None):
        pool_kwargs = pool_kwargs or {}
        pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs)
        super(ThreadPoolExecutor, self).__init__(pool)


class ProcessPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures process pool.

    Plugin alias: ``processpool``

    :param max_workers: the maximum number of spawned processes.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ProcessPoolExecutor constructor
    """

    def __init__(self, max_workers=10, pool_kwargs=None):
        pool_kwargs = pool_kwargs or {}
        pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
        super(ProcessPoolExecutor, self).__init__(pool)

官方推荐使用flask-APScheduler来解这个问题,但是使用了这个库后,依然出现这个错误,查了下源码发现,flask-APscheduler其实也没做这个事儿:它直接用了Python的线程池,并没有将current_app注入到当前线程。

那么,解法就非常明确了:使用一个「支持自动注入current_app的线程池」,替换APScheduler使用的原生线程池。这里正好可以用到前面博客里面提到的 FlaskThread

from apscheduler.executors.pool import BasePoolExecutor
from flaskthreads import ThreadPoolWithAppContextExecutor


class FlaskThreadPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures thread pool.

    Plugin alias: ``threadpool``

    :param max_workers: the maximum number of spawned threads.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ThreadPoolExecutor constructor
    """

    def __init__(self, max_workers=10, pool_kwargs=None):
        pool_kwargs = pool_kwargs or {}
        pool = ThreadPoolWithAppContextExecutor(int(max_workers), **pool_kwargs)
        super(FlaskThreadPoolExecutor, self).__init__(pool)

然后这样用:

scheduler = BackgroundScheduler()
with app.app_context():
    scheduler.add_executor(FlaskThreadPoolExecutor())
    scheduler.start()
app.scheduler = scheduler

嗯,这下子,就能在需要执行的地方,放心大胆地使用current_app了:

from flask import current_app
def callback_fn(*args, **kwargs):
    some_global_variable = current_app.some_variable

哦对了,进程池同理哦~


评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注