最近整了个活:公司内IM,每天早上10点定时把头像换成彩色版,每天下午7点定时把头像换成灰色版。蛮简单的,但是里面稍微有点坑。
找了一下,APScheduler(Advanced Python Scheduler)看起来确实强大,简单些了个demo,效果也不错。但是一用到Flask里面就傻了:执行的函数里面需要获取一些 current_app
里面的东西,但是Flask一直在疯狂地报「不能获取current app」。
这个事情可谓经典:从写Flask第一天起就开始遇到这个事情。查了下源码,发现我正在用的是基于线程池的Executor。这下子问题就清晰了。
class ThreadPoolExecutor(BasePoolExecutor):
"""
An executor that runs jobs in a concurrent.futures thread pool.
Plugin alias: ``threadpool``
:param max_workers: the maximum number of spawned threads.
:param pool_kwargs: dict of keyword arguments to pass to the underlying
ThreadPoolExecutor constructor
"""
def __init__(self, max_workers=10, pool_kwargs=None):
pool_kwargs = pool_kwargs or {}
pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs)
super(ThreadPoolExecutor, self).__init__(pool)
class ProcessPoolExecutor(BasePoolExecutor):
"""
An executor that runs jobs in a concurrent.futures process pool.
Plugin alias: ``processpool``
:param max_workers: the maximum number of spawned processes.
:param pool_kwargs: dict of keyword arguments to pass to the underlying
ProcessPoolExecutor constructor
"""
def __init__(self, max_workers=10, pool_kwargs=None):
pool_kwargs = pool_kwargs or {}
pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
super(ProcessPoolExecutor, self).__init__(pool)
官方推荐使用flask-APScheduler来解这个问题,但是使用了这个库后,依然出现这个错误,查了下源码发现,flask-APscheduler其实也没做这个事儿:它直接用了Python的线程池,并没有将current_app注入到当前线程。
那么,解法就非常明确了:使用一个「支持自动注入current_app的线程池」,替换APScheduler使用的原生线程池。这里正好可以用到前面博客里面提到的 FlaskThread
:
from apscheduler.executors.pool import BasePoolExecutor
from flaskthreads import ThreadPoolWithAppContextExecutor
class FlaskThreadPoolExecutor(BasePoolExecutor):
"""
An executor that runs jobs in a concurrent.futures thread pool.
Plugin alias: ``threadpool``
:param max_workers: the maximum number of spawned threads.
:param pool_kwargs: dict of keyword arguments to pass to the underlying
ThreadPoolExecutor constructor
"""
def __init__(self, max_workers=10, pool_kwargs=None):
pool_kwargs = pool_kwargs or {}
pool = ThreadPoolWithAppContextExecutor(int(max_workers), **pool_kwargs)
super(FlaskThreadPoolExecutor, self).__init__(pool)
然后这样用:
scheduler = BackgroundScheduler()
with app.app_context():
scheduler.add_executor(FlaskThreadPoolExecutor())
scheduler.start()
app.scheduler = scheduler
嗯,这下子,就能在需要执行的地方,放心大胆地使用current_app
了:
from flask import current_app
def callback_fn(*args, **kwargs):
some_global_variable = current_app.some_variable
哦对了,进程池同理哦~
发表回复