Flask 使用定时任务

最近整了个活:公司内 IM,每天早上 10 点定时把头像换成彩色版,每天下午 7 点定时把头像换成灰色版。蛮简单的,但是里面稍微有点坑。


找了一下,APScheduler(Advanced Python Scheduler)看起来确实强大,简单些了个 demo,效果也不错。但是一用到 Flask 里面就傻了:执行的函数里面需要获取一些 current_app 里面的东西,但是 Flask 一直在疯狂地报「不能获取 current app」。

这个事情可谓经典:从写 Flask 第一天起就开始遇到这个事情。查了下源码,发现我正在用的是基于线程池的 Executor。这下子问题就清晰了。

class ThreadPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures thread pool.

    Plugin alias: ``threadpool``

    :param max_workers: the maximum number of spawned threads.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ThreadPoolExecutor constructor
    """

    def __init__(self, max_workers=10, pool_kwargs=None):
        pool_kwargs = pool_kwargs or {}
        pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs)
        super(ThreadPoolExecutor, self).__init__(pool)


class ProcessPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures process pool.

    Plugin alias: ``processpool``

    :param max_workers: the maximum number of spawned processes.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ProcessPoolExecutor constructor
    """

    def __init__(self, max_workers=10, pool_kwargs=None):
        pool_kwargs = pool_kwargs or {}
        pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
        super(ProcessPoolExecutor, self).__init__(pool)

官方推荐使用 flask-APScheduler 来解这个问题,但是使用了这个库后,依然出现这个错误,查了下源码发现,flask-APscheduler 其实也没做这个事儿:它直接用了 Python 的线程池,并没有将 current_app 注入到当前线程。

那么,解法就非常明确了:使用一个「支持自动注入 current_app 的线程池」,替换 APScheduler 使用的原生线程池。这里正好可以用到前面博客里面提到的 FlaskThread

from apscheduler.executors.pool import BasePoolExecutor
from flaskthreads import ThreadPoolWithAppContextExecutor


class FlaskThreadPoolExecutor(BasePoolExecutor):
    """
    An executor that runs jobs in a concurrent.futures thread pool.

    Plugin alias: ``threadpool``

    :param max_workers: the maximum number of spawned threads.
    :param pool_kwargs: dict of keyword arguments to pass to the underlying
        ThreadPoolExecutor constructor
    """

    def __init__(self, max_workers=10, pool_kwargs=None):
        pool_kwargs = pool_kwargs or {}
        pool = ThreadPoolWithAppContextExecutor(int(max_workers), **pool_kwargs)
        super(FlaskThreadPoolExecutor, self).__init__(pool)

然后这样用:

scheduler = BackgroundScheduler()
with app.app_context():
    scheduler.add_executor(FlaskThreadPoolExecutor())
    scheduler.start()
app.scheduler = scheduler

嗯,这下子,就能在需要执行的地方,放心大胆地使用 current_app 了:

from flask import current_app
def callback_fn(*args, **kwargs):
    some_global_variable = current_app.some_variable

哦对了,进程池同理哦~

发表评论