flask-sqlalchemy 同时连接多个数据库,动态获得不同的session
目前正在用Flask做一个小项目,持久层用的是flask-sqlalchemy, 这个和直接使用sqlalchemy还略有些不同。 项目中有个需求,要从另外一个mssqlserver数据库中读取数据,查到的很多资料都是需要再次在代码定义数据库的表结构,通过定义不同的bind,然后使用关键字”bind_key “把表映射的不同的数据库中。这样略有些麻烦,要对接的mssqlserver是另外一个C/S系统的数据库,主要是读取报表功能,已经找到了一个方法,可以通过在界面上操作,直接获取到所有对数据库的操作。
1 2 3 4 5 6 7 SELECT st.text as sql_statement, qs.creation_time as plan_last_compiled, qs.last_execution_time as plan_last_executed, qs.execution_count as plan_executed_count FROM sys.dm_exec_query_stats qsCROSS APPLY sys.dm_exec_sql_text(qs.plan_handle) storder by plan_last_compiled desc
执行这条sql就可以知道这个客户端对sqlserver最终执行了什么,所以简单的操作就是直接把sql语句拿到我们的系统里直接执行就可以了,所以就需要一个方法传入一个变量,就直接拿到特殊的session,而这个session又是ORM框架管理,查了很多资料,包括flask-sqlalchemy的官网后都没有找到直接答案,自己翻了翻源码,发现这样实现是可以的。
配置部分:
db_setting.py 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 from contextlib import contextmanagerfrom flask_migrate import Migratefrom flask_sqlalchemy import SQLAlchemyfrom greenwater_app import appapp.config['SQLALCHEMY_DATABASE_URI' ] = 'sqlite:///db/database1.db' app.config['SQLALCHEMY_BINDS' ] = { 'database2' : 'sqlite:///db/database2.db' } app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN' ] = True app.config['SQLALCHEMY_TRACK_MODIFICATIONS' ] = True app.config['SQLALCHEMY_ECHO' ] = True db = SQLAlchemy(app) migrate = Migrate(app, db, compare_type=True ) @contextmanager def session_scope (bind='default' ): """Provide a transactional scope around a series of operations.""" if bind != 'default' : s = db.create_scoped_session(options={'bind' : db.get_engine(db.get_app(), bind)}) else : s = db.session s.expire_on_commit = False try : yield s s.commit() except : s.rollback() raise finally : s.close()
正常的按照官网的逻辑是可以使用的。具体参考这里https://flask-sqlalchemy.palletsprojects.com/en/2.x/binds/
如果想直接拿到一个session去执行sql语句,可以这样:
1 2 3 4 5 6 7 8 sql = 'select * from user' with session_scope(bind='database2' ) as s: res = s.execute(sql).fetchall() print ([dict (r) for r in res]) with session_scope() as s: res = s.execute(sql).fetchall() print ([dict (r) for r in res])
通过给session_scope传入不同的变量,就可以获得一个针对不同的数据库的session。