Python实现mysql数据库更新表数据接口的功能
前言
昨天,因为项目需求要添加表的更新接口,来存储预测模型训练的数据,所以自己写了一段代码实现了该功能,在开始之前,给大家分享python操作mysql数据库基础:
#coding=utf-8 importMySQLdb conn=MySQLdb.connect( host='localhost', port=3306, user='root', passwd='123456', db='test', ) cur=conn.cursor() #创建数据表 #cur.execute("createtablestudent(idint,namevarchar(20),classvarchar(30),agevarchar(10))") #插入一条数据 #cur.execute("insertintostudentvalues('2','Tom','3year2class','9')") #修改查询条件的数据 #cur.execute("updatestudentsetclass='3year1class'wherename='Tom'") #删除查询条件的数据 #cur.execute("deletefromstudentwhereage='9'") cur.close() conn.commit() conn.close()
>>>conn=MySQLdb.connect(host='localhost',port=3306,user='root',passwd='123456',db='test',)
Connect()方法用于创建数据库的连接,里面可以指定参数:用户名,密码,主机等信息。
这只是连接到了数据库,要想操作数据库需要创建游标。
>>>cur=conn.cursor()
通过获取到的数据库连接conn下的cursor()方法来创建游标。
>>>cur.execute("createtablestudent(idint,namevarchar(20),classvarchar(30),agevarchar(10))")
通过游标cur操作execute()方法可以写入纯sql语句。通过execute()方法中写如sql语句来对数据进行操作。
>>>cur.close()
cur.close()关闭游标
>>>conn.commit()
conn.commit()方法在提交事物,在向数据库插入一条数据时必须要有这个方法,否则数据不会被真正的插入。
>>>conn.close()
Conn.close()关闭数据库连接
下面开始本文的正文:
Python实现mysql更新表数据接口
示例代码
#-*-coding:utf-8-*- importpymysql importsettings classmysql(object): def__init__(self): self.db=None defconnect(self): self.db=pymysql.connect(host=settings.ip,port=settings.port,user=settings.mysql_user,passwd=settings.mysql_passwd,db=settings.database,) #print("connectisok") #return1 defdisconnect(self): self.db.close() #return-1 defcreate_table(self,tablename,columns,spec='time'): """ :paramtablename: :paramspec: :paramcolumns:列表[] :return: """ type_data=['int','double(10,3)'] cursor=self.db.cursor() sql="createtable%s("%(tablename,) sqls=[] forcolincolumns: #判断是否time_num ifcol==spec: sqls.append('%s%sprimarykey'%(col,type_data[0])) else: sqls.append('%s%s'%(col,type_data[1])) sqlStr=','.join(sqls) sql+=sqlStr+')' try: cursor.execute(sql) print("Table%siscreated"%tablename) except: self.db.rollback() defis_table_exist(self,tablename,dbname): cursor=self.db.cursor() sql="selecttable_namefrominformation_schema.TABLESwheretable_schema='%s'andtable_name='%s'"%(dbname,tablename) #results="error:Thietableisnotexit" try: cursor.execute(sql) results=cursor.fetchall()#接受全部返回行 except: #不存在这张表返回错误提示 raiseException('Thistabledoesnotexist') ifnotresults: returnNone else: returnresults #printdatas definsert_mysql_with_json(self,tablename,datas): """ :paramtablename: :paramdatas:字典{(key:value),.....} :return: """ #keys=datas[0] keys=datas[0].keys() keys=str(tuple(keys)) keys=''.join(keys.split("'"))#用'隔开 print(keys) ret=[] fordtindatas: values=dt.values()##‘str'objecthasnoattribute# sql="insertinto%s"%tablename+keys sql=sql+"values"+str(tuple(values)) ret.append(sql) #print("1") #printkeysinsertinto%tablenamedat[i]valuesstr[i] self.insert_into_sql(ret) print("1") definsert_into_sql(self,sqls): cursor=self.db.cursor() forsqlinsqls: #执行sql语句 try: cursor.execute(sql) self.db.commit() #print("insert%s"%sql,"success.") except: #Rollbackincasethereisanyerror self.db.rollback() #找列名 deffind_columns(self,tablename): sql="selectCOLUMN_NAMEfrominformation_schema.columnswheretable_name='%s'"%tablename cursor=self.db.cursor() try: cursor.execute(sql) results=cursor.fetchall() except: raiseException('hello') returntuple(map(lambdax:x[0],results)) deffind(self,tablename,start_time,end_time,fieldName=None): """ :paramtablename:test_scale1015 :paramfieldName:Noneor(columns1010,columns1011,columns1012,columns1013,time) :return: """ cursor=self.db.cursor() sql='' iffieldName==None: fieldName=self.find_columns(tablename) sql="select*from%swheretimebetween%sand%s"%(tablename,str(start_time),str(end_time)) #print('None') else: fieldNameStr=','.join(fieldName) sql="select%sfrom%swheretimebetween%sand%s"%( fieldNameStr,tablename,str(start_time),str(end_time)) #print('sm') try: cursor.execute(sql) results=cursor.fetchall() except: raiseException('hello') returnfieldName,results, #样例data=[{'time':123321,'predict':1.222},{'time':123322,'predict':1.223},{'time':123324,'predict':1.213}] defupdata(self,datas,tablename): cursor=self.db.cursor() columns=[] fordataindatas: foriindata.keys(): columns.append(i) #print(columns) break #columns_2=columns[:] db.connect() ifdb.is_table_exist(settings.tablename_2,settings.database): #exists #pass forcolincolumns: ifcol!='time': sql="altertable%saddcolumn%sdouble(10,3);"%(settings.tablename_2,col) try: cursor.execute(sql) print("%sisalteredok"%(col)) except: print("alterisfailed") ret=[] foriindatas: col=[] foriiini.keys(): col.append(ii) #time=col[0]andpredict=col[1] time_data=i[col[0]] predic_data=i[col[1]] sql="update%sset%s='%s'where%s=%s"%(settings.tablename_2,col[1],predic_data,col[0],time_data) ret.append(sql) self.insert_into_sql(ret) #db.insert_mysql_with_json(tablename,datas) else: #noexists db.create_table(settings.tablename_2,columns) db.insert_mysql_with_json(settings.tablename_2,datas) db=mysql()
其中update()函数,是新添加的接口:
传入的data的样例data=[{'time':123321,'predict':1.222},{'time':123322,'predict':1.223},{'time':123324,'predict':1.213}]这样子的。
一个列表里有多个字典,每个字典有time和predict。如果需要存predict_2,predict_3的时候,则实现更新操作,否则,只进行创表和插入数据的操作~~~~~~
看起来是不是很简单~~~~~~
这个接口还没有进行优化等操作,很冗余~~~~
毕竟项目还在测试阶段,等先跑通了,在考虑优化吧~~~~~~
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。