您好, 欢迎来到 !    登录 | 注册 | | 设为首页 | 收藏本站

python – sqlalchemy的Flask应用程序上下文

5b51 2022/1/14 8:22:28 python 字数 4552 阅读 516 来源 www.jb51.cc/python

我正在研究烧瓶中的小休息api. Api具有注册请求并生成单独线程以在后台运行的路由.这是代码: def dostuff(scriptname): new_thread = threading.Thread(target=executescript,args=(scriptname,)) new_thread.start() 线程启动但是当我尝试从executioncript函数插

概述

def dostuff(scriptname):
    new_thread = threading.Thread(target=executescript,args=(scriptname,))
    new_thread.start()

线程启动但是当我尝试从executioncript函数插入db时它会出错.它抱怨db对象没有在应用程序中注册.

我正在动态创建我的应用程序(使用api作为Blueprint).

这是应用程序的结构

-run.py ## runner script
-config
   -development.py
   -prod.py
-app
  -__init__.py
  - auth.py
  - api_v1
     - __init__.py
     - routes.py
     - models.py

这是我的跑步者脚本run.py:

from app import create_app,db

if __name__ == '__main__':
    app = create_app(os.environ.get('FLASK_CONfig','development'))
    with app.app_context():
        db.create_all()
    app.run()

以下是来自app / __ init__.py的代码,用于创建应用:

from flask import Flask,jsonify,g
from flask.ext.sqlalchemy import sqlAlchemy

db = sqlAlchemy()

def create_app(config_name):
    """Create an application instance."""
    app = Flask(__name__)

    # apply configuration
    cfg = os.path.join(os.getcwd(),'config',config_name + '.py')
    app.config.from_pyfile(cfg)

    # initialize extensions
    db.init_app(app)
    # register blueprints
    from .api_v1 import api as api_blueprint
    app.register_blueprint(api_blueprint,url_prefix='/api/')
    return app

我需要知道的是如何在routes.py中扩展应用程序上下文.我不能直接在那里导入应用程序,如果我执行以下操作,我会得到RuntimeError:在应用程序上下文之外工作

def executescript(scriptname):
    with current_app.app_context():
        test_run = Testrun(pid=989,exit_status=988,comments="Test TestStarted")
        db.session.add(test_run)
        db.session.commit()
from threading import Thread
from app import app

def send_async_email(app,msg):
    with app.app_context():
        mail.send(msg)

def send_email(subject,sender,recipients,text_body,html_body):
    msg = Message(subject,sender=sender,recipients=recipients)
    msg.body = text_body
    msg.html = html_body
    thr = Thread(target=send_async_email,args=[app,msg])
    thr.start()

或者(可能是最好的解决方案)实际上是set up a thread-local scoped SQLAlchemy session,而不是依赖于Flask-sqlAlchemy的请求上下文.

>>> from sqlalchemy.orm import scoped_session
>>> from sqlalchemy.orm import sessionmaker

>>> session_factory = sessionmaker(bind=some_engine)
>>> Session = scoped_session(session_factory)

总结

以上是编程之家为你收集整理的python – sqlalchemy的Flask应用程序上下文全部内容,希望文章能够帮你解决python – sqlalchemy的Flask应用程序上下文所遇到的程序开发问题。


如果您也喜欢它,动动您的小指点个赞吧

除非注明,文章均由 laddyq.com 整理发布,欢迎转载。

转载请注明:
链接:http://laddyq.com
来源:laddyq.com
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。


联系我
置顶