使用SQLAlchemy操作数据库表过程解析
需求场景:
使用sqlalchmy从现有的表中获取数据(不是自己建表)。百度了一下,网上都是使用sqlalchemy自己先创建表,然后导入数据表的模型类进行增删改查;现在不是自己建表,该如何操作呢?
操作方案
通过sqlalchmey执行原生的sql语句,增删改查的原生语句携带表名,就不需要导入数据表的模型类了。
使用的包:
SQLAlchemy(1.3.10)+mysql-connector-python(8.0.19)
提供以下干货:
- 演示了向原生sql语句传递变量的用法即动态执行sql语句更加灵活
- 通过执行原生的sql语句实现操作已有的表
- 演示了sql语句根据多字段排序的方法等
DEMO
#-*-coding:utf-8-*-
fromsqlalchemyimportcreate_engine,MetaData,Table,exists
fromsqlalchemy.ormimportsessionmaker,scoped_session
fromutil.LogimportLog
fromconf.parseConfigimportparseConf
#从配置文件中获取mysql的配置信息
host=parseConf.get_conf('MySQLInfo','host')
port=parseConf.get_conf('MySQLInfo','port')
dbname=parseConf.get_conf('MySQLInfo','dbname')
usernm=parseConf.get_conf('MySQLInfo','usernm')
passwd=parseConf.get_conf('MySQLInfo','passwd')
engine_str="mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm,passwd,host,port,dbname)
classOpsMysql(object):
def__init__(self,log=Log(__file__).getlog()):
self.log=log
self.session=None
try:
self.engine=create_engine(
engine_str,
max_overflow=0,#超过连接池大小外最多创建的连接
pool_size=5,#连接池大小
pool_timeout=30,#池中没有线程最多等待的时间,否则报错
pool_recycle=-1,#多久之后对线程池中的线程进行一次连接的回收(重置)
#echo=True,#显示相应执行的sql指令
encoding='utf-8'
)
SessionFactory=sessionmaker(bind=self.engine)
self.session=scoped_session(SessionFactory)
exceptExceptionase:
self.log.error(str(e))
self.log.error("Connect{0}@{1}:{2}failed!".format(dbname,host,port))
defget_session(self):
returnself.session
defgetEngine(self):
returnself.engine
definit_db(self,base):
base.metadata.create_all(self.engine)
defdrop_db(self,base):
base.metadata.drop_all(self.engine)
if__name__=="__main__":
log=Log(__file__).getlog()
tt=OpsMysql(log)
session=tt.get_session()
ifsession:
#通过执行原生的sql语句实现操作已有的表
#此处演示了向原生sql语句传递变量的用法即动态执行sql语句更加灵活
mail_id=1
res=session.execute('select*fromtbl_mail_addrwheremail_id='"+mail_id+"'andmail_tp="c"')
res01=res.fetchall()#结果是列表
print(res01[0])
#(1,'c',1,'XX@u163.com')
mail_id='1'
mail_type='c'
#查询id为1,类型是c的邮箱信息,并按mail_tp降序,addr_no升序排列,限制查询数量100
sql="select*fromtbl_mail_addrwheretbl_mail_addr.oper_flag='1'andtbl_mail_addr.mail_id='"+mail_id+"'andtbl_mail_addr.mail_tp='"+mail_type+"'orderbytbl_mail_addr.mail_tp,tbl_mail_addr.addr_noASClimit100"
result=session.execute(sql)
value_list=result.fetchall()
print(value_list)
#[(1,'c',1,'XX@163.com'),(1,'c',2,'XX@qq.com),(1,'c',3,'XX@qq.com')]
session.close()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。