使用SQLAlchemy操作数据库表过程解析
需求场景:
使用sqlalchmy从现有的表中获取数据(不是自己建表)。百度了一下,网上都是使用sqlalchemy自己先创建表,然后导入数据表的模型类进行增删改查;现在不是自己建表,该如何操作呢?
操作方案
通过sqlalchmey执行原生的sql语句,增删改查的原生语句携带表名,就不需要导入数据表的模型类了。
使用的包:
SQLAlchemy(1.3.10)+mysql-connector-python(8.0.19)
提供以下干货:
- 演示了向原生sql语句传递变量的用法即动态执行sql语句更加灵活
- 通过执行原生的sql语句实现操作已有的表
- 演示了sql语句根据多字段排序的方法等
DEMO
#-*-coding:utf-8-*- fromsqlalchemyimportcreate_engine,MetaData,Table,exists fromsqlalchemy.ormimportsessionmaker,scoped_session fromutil.LogimportLog fromconf.parseConfigimportparseConf #从配置文件中获取mysql的配置信息 host=parseConf.get_conf('MySQLInfo','host') port=parseConf.get_conf('MySQLInfo','port') dbname=parseConf.get_conf('MySQLInfo','dbname') usernm=parseConf.get_conf('MySQLInfo','usernm') passwd=parseConf.get_conf('MySQLInfo','passwd') engine_str="mysql+mysqlconnector://{0}:{1}@{2}:{3}/{4}".format(usernm,passwd,host,port,dbname) classOpsMysql(object): def__init__(self,log=Log(__file__).getlog()): self.log=log self.session=None try: self.engine=create_engine( engine_str, max_overflow=0,#超过连接池大小外最多创建的连接 pool_size=5,#连接池大小 pool_timeout=30,#池中没有线程最多等待的时间,否则报错 pool_recycle=-1,#多久之后对线程池中的线程进行一次连接的回收(重置) #echo=True,#显示相应执行的sql指令 encoding='utf-8' ) SessionFactory=sessionmaker(bind=self.engine) self.session=scoped_session(SessionFactory) exceptExceptionase: self.log.error(str(e)) self.log.error("Connect{0}@{1}:{2}failed!".format(dbname,host,port)) defget_session(self): returnself.session defgetEngine(self): returnself.engine definit_db(self,base): base.metadata.create_all(self.engine) defdrop_db(self,base): base.metadata.drop_all(self.engine) if__name__=="__main__": log=Log(__file__).getlog() tt=OpsMysql(log) session=tt.get_session() ifsession: #通过执行原生的sql语句实现操作已有的表 #此处演示了向原生sql语句传递变量的用法即动态执行sql语句更加灵活 mail_id=1 res=session.execute('select*fromtbl_mail_addrwheremail_id='"+mail_id+"'andmail_tp="c"') res01=res.fetchall()#结果是列表 print(res01[0]) #(1,'c',1,'XX@u163.com') mail_id='1' mail_type='c' #查询id为1,类型是c的邮箱信息,并按mail_tp降序,addr_no升序排列,限制查询数量100 sql="select*fromtbl_mail_addrwheretbl_mail_addr.oper_flag='1'andtbl_mail_addr.mail_id='"+mail_id+"'andtbl_mail_addr.mail_tp='"+mail_type+"'orderbytbl_mail_addr.mail_tp,tbl_mail_addr.addr_noASClimit100" result=session.execute(sql) value_list=result.fetchall() print(value_list) #[(1,'c',1,'XX@163.com'),(1,'c',2,'XX@qq.com),(1,'c',3,'XX@qq.com')] session.close()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。