FlaskThread

遇到一个奇葩需求:某个 API 的回调,需要业务方在 1s 内返回 200 结果。但是我们的处理非常重,5 分钟都不一定能处理完毕。这时候需要先返回一个假的 200 response,再将实际的处理过程放在单独的线程里面慢慢做。

这里一个重点是获取 app context,或者说 current app。Google 找到了解决办法,有两个实现。第一个实现是 StackOverflow 版本的,一直会有问题;第二个实现是 GitHub 里面 flask-threads 包,感觉不错。

from threading import Thread

from flask import _app_ctx_stack
from flask import has_app_context

class FlaskThread(Thread):
    """Implements Thread with flask AppContext."""

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if not has_app_context():
            raise RuntimeError('Running outside of Flask AppContext.')
        self.app_ctx = _app_ctx_stack.top

    def run(self):
        try:
            self.app_ctx.push()
            super().run()
        finally:
            self.app_ctx.pop()

嗯,就这样~

发表评论