2: foriinsys.argv[1:]: _argv=i.split('=') if_argv[0]=='--source': _list=_argv[1].split('/') self.source_host=_list[0].split(':')[0] self.source_port=int(_list[0].split(':')[1]) self.source_db=_list[1].split(':')[0] self.source_tab=_list[1].split(':')[1] self.source_user=_list[2] self.source_password=_list[3] elif_argv[0]=='--dest': _list=_argv[1].split('/') self.dest_host=_list[0].split(':')[0] self.dest_port=int(_list[0].split(':')[1]) self.dest_db=_list[1].split(':')[0] self.dest_tab=_list[1].split(':')[1] self.dest_user=_list[2] self.dest_password=_list[3] elif_argv[0]=='--delete_strategy': self.deleteStrategy=_argv[1] ifself.deleteStrategynotin('delete','drop'): print(self.usage) sys.exit(1) elif_argv[0]=='--primary_key': self.pk=_argv[1] elif_argv[0]=='--date_col': self.date_col=_argv[1] elif_argv[0]=='--time_interval': self.interval=_argv[1] else: print(self.usage) sys.exit(1) def__init__(self): self._get_argv() ##-------------------------------------------------------------------- self.sourcedb_conn_str=MySQLdb.connect(host=self.source_host,port=self.source_port,user=self.source_user,passwd=self.source_password,db=self.source_db,charset='utf8') self.sourcedb_conn_str.autocommit(True) self.destdb_conn_str=MySQLdb.connect(host=self.dest_host,port=self.dest_port,user=self.dest_user,passwd=self.dest_password,db=self.dest_db,charset='utf8') self.destdb_conn_str.autocommit(True) ##-------------------------------------------------------------------- self.template_tab=self.source_tab+'_template' self.step_size=20000 ##-------------------------------------------------------------------- self._migCompleteState=False self._deleteCompleteState=False ##-------------------------------------------------------------------- self.source_cnt='' self.source_min_id='' self.source_max_id='' self.source_checksum='' self.dest_cn='' ##-------------------------------------------------------------------- self.today=time.strftime("%Y-%m-%d") #self.today='2016-05-3009:59:40' 
defsourcedb_query(self,sql,sql_type): try: cr=self.sourcedb_conn_str.cursor() cr.execute(sql) ifsql_type=='select': returncr.fetchall() elifsql_type=='dml': rows=self.sourcedb_conn_str.affected_rows() returnrows else: returnTrue exceptException,e: print(str(e)+"
") returnFalse finally: cr.close() defdestdb_query(self,sql,sql_type,values=''): try: cr=self.destdb_conn_str.cursor() ifsql_type=='select': cr.execute(sql) returncr.fetchall() elifsql_type=='insertmany': cr.executemany(sql,values) rows=self.destdb_conn_str.affected_rows() returnrows else: cr.execute(sql) returnTrue exceptException,e: print(str(e)+"
") returnFalse finally: cr.close() defcreate_table_from_source(self): '''''因为tab_name表的数据需要迁移到archive引擎表,所以不适合使用这种方式。预留作其他用途。''' try: sql="showcreatetable%s;"%self.source_tab create_str=self.sourcedb_query(sql,'select')[0][1] create_str=create_str.replace('CREATETABLE','CREATETABLEIFNOTEXISTS') self.destdb_query(create_str,'ddl') returnTrue exceptException,e: print(str(e)+"
") returnFalse defcreate_table_from_template(self): try: sql='CREATETABLEIFNOTEXISTS%slike%s;'%(self.dest_tab,self.template_tab) state=self.destdb_query(sql,'ddl') ifstate: returnTrue else: returnFalse exceptException,e: print(str(e+"
")+"
") returnFalse defget_min_max(self): """创建目标表、并获取源表需要迁移的总条数、最小id、最大id""" try: print("\nStartingMigrateat--%s
")%(datetime.datetime.now().__str__()) sql="""selectcount(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1)from%swhere%s>=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'00:00:00')\ and%s<=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'23:59:59')"""\ %(self.pk,self.pk,self.source_tab,self.date_col,self.today,self.interval,self.date_col,self.today,self.interval) q=self.sourcedb_query(sql,'select') self.source_cnt=q[0][0] self.source_min_id=q[0][1] self.source_max_id=q[0][2] self.source_checksum=str(self.source_cnt)+'_'+str(self.source_min_id)+'_'+str(self.source_max_id) ifself.source_cnt==0orself.source_min_id==-1orself.source_max_id==-1: print("Thereis0recordinsourcetablebeenmatched!
") returnFalse else: returnTrue exceptException,e: print(str(e)+"
") returnFalse defmigrate_2_destdb(self): try: get_min_max_id=self.get_min_max() ifget_min_max_id: k=self.source_min_id desc_sql="desc%s;"%self.source_tab #self.filed=[] cols=self.sourcedb_query(desc_sql,'select') #forjincols: #self.filed.append(j[0]) fileds="%s,"*len(cols)#源表有多少个字段,就拼凑多少个%s,拼接到insert语句 fileds=fileds.rstrip(',') whilek<=self.source_max_id: sql="""select*from%swhere%s>=%dand%s<%d\ and%s>=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'00:00:00')\ and%s<=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'23:59:59')"""\ %(self.source_tab,self.pk,k,self.pk,k+self.step_size,self.date_col,self.today,self.interval,self.date_col,self.today,self.interval) print("\n%s
")%sql starttime=datetime.datetime.now() results=self.sourcedb_query(sql,'select') insert_sql="insertinto"+self.dest_tab+"values(%s)"%fileds rows=self.destdb_query(insert_sql,'insertmany',results) ifrows==False: print("Insertfailed!!
") else: print("Inserted%srows.
")%rows endtime=datetime.datetime.now() timeinterval=endtime-starttime print("Elapsed:"+str(timeinterval.seconds)+'.'+str(timeinterval.microseconds)+"seconds
") k+=self.step_size print("\nInsertcompleteat--%s
")%(datetime.datetime.now().__str__()) returnTrue else: returnFalse exceptException,e: print(str(e)+"
") returnFalse defverify_total_cnt(self): try: sql="""selectcount(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1)from%swhere%s>=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'00:00:00')\ and%s<=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'23:59:59')"""\ %(self.pk,self.pk,self.dest_tab,self.date_col,self.today,self.interval,self.date_col,self.today,self.interval) dest_result=self.destdb_query(sql,'select') self.dest_cnt=dest_result[0][0] dest_checksum=str(self.dest_cnt)+'_'+str(dest_result[0][1])+'_'+str(dest_result[0][2]) print("source_checksum:%s,dest_checksum:%s
")%(self.source_checksum,dest_checksum) ifself.source_cnt==dest_result[0][0]anddest_result[0][0]!=0andself.source_checksum==dest_checksum: self._migCompleteState=True print("Verifysuccessfully!!
") else: print("Verifyfailed!!
") sys.exit(77) exceptException,e: print(str(e)+"
") defdrop_daily_partition(self): try: ifself._migCompleteState: sql="""explainpartitionsselect*from%swhere%s>=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'00:00:00') and%s<=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'23:59:59')"""\ %(self.source_tab,self.date_col,self.today,self.interval,self.date_col,self.today,self.interval) partition_name=self.sourcedb_query(sql,'select') partition_name=partition_name[0][3] sql="""selectcount(*),IFNULL(min(%s),-1),IFNULL(max(%s),-1)from%spartition(%s)"""\ %(self.pk,self.pk,self.source_tab,partition_name) q=self.sourcedb_query(sql,'select') source_cnt=q[0][0] source_min_id=q[0][1] source_max_id=q[0][2] checksum=str(source_cnt)+'_'+str(source_min_id)+'_'+str(source_max_id) ifsource_cnt==0orsource_min_id==-1orsource_max_id==-1: print("Thereis0recordinsourcePARTITIONbeenmatched!
") else: ifchecksum==self.source_checksum: drop_par_sql="altertable%sdroppartition%s;"%(self.source_tab,partition_name) droped=self.sourcedb_query(drop_par_sql,'ddl') ifdroped: print(drop_par_sql+"
") print("\nDroppartitioncompleteat--%s
")%(datetime.datetime.now().__str__()) self._deleteCompleteState=True else: print(drop_par_sql+"
") print("Droppartitionfailed..
") else: print("Thepartition%schecksumfailed!!Dropfailed!!")%partition_name sys.exit(77) exceptException,e: print(str(e)+"
") defdelete_data(self): try: ifself._migCompleteState: k=self.source_min_id whilek<=self.source_max_id: sql="""deletefrom%swhere%s>=%dand%s<%d\ and%s>=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'00:00:00')\ and%s<=CONCAT(DATE_FORMAT(DATE_ADD('%s',INTERVAL-%sday),'%%Y-%%m-%%d'),'23:59:59')"""\ %(self.source_tab,self.pk,k,self.pk,k+self.step_size,self.date_col,self.today,self.interval,self.date_col,self.today,self.interval) print("\n%s
")%sql starttime=datetime.datetime.now() rows=self.sourcedb_query(sql,'dml') ifrows==False: print("Deletefailed!!
") else: print("Deleted%srows.
")%rows endtime=datetime.datetime.now() timeinterval=endtime-starttime print("Elapsed:"+str(timeinterval.seconds)+'.'+str(timeinterval.microseconds)+"seconds
") time.sleep(1) k+=self.step_size print("\nDeletecompleteat--%s
")%(datetime.datetime.now().__str__()) self._deleteCompleteState=True exceptException,e: print(str(e)+"
") defdo(self): tab_create=self.create_table_from_template() iftab_create: migration=self.migrate_2_destdb() ifmigration: self.verify_total_cnt() ifself._migCompleteState: ifself.deleteStrategy=='drop': self.drop_daily_partition() else: self.delete_data() print("\n
") print("====="*5+'
') print("source_total_cnt:%s
")%self.source_cnt print("dest_total_cnt:%s
")%self.dest_cnt print("====="*5+'
') ifself._deleteCompleteState: print("\nFinalresult:Successfully!!
") sys.exit(88) else: print("\nFinalresult:Failed!!
") sys.exit(254) else: print("Createtablefailed!Exiting...") sys.exit(255) f=ClassMigrate() f.do()

以上就是本文的全部内容，希望对大家的学习有所帮助，也希望大家多多支持毛票票。

热门推荐

1 毛坯房验收经验和常识 看了之后再验房心里有底
2 二手房收房如何交接 二手房收房注意问题
3 专业验收毛坯房的价格 商品房验收合格的标准
4 精装房怎么验收 精装房请验房师有用吗
5 一般要到哪里找验房师 验房师有哪些作用呢
6 请人验房一般是多少钱 验房师费用是多少
7 怎样测量房子面积 建筑面积和使用面积怎么算
8 收房需要注意什么 仔细检查不松懈
9 收房时三书一证一表是什么 主要作用介绍
10 交房时交房税费有哪些 本文为你一一讲解
11 验房都需要验什么 要做哪些准备呢
12 毛坯房验房师有必要请吗 毛坯房装修完如何验收
13 地下室防水工程质量验收规范详解
14 水性涂料、油性涂料区别介绍
15 零基础布艺DIY工坊 教你做超萌猫头鹰钥匙包
16 三棵树漆怎么样?三棵树漆官方网站
17 家庭“装修套餐”中猫腻你知道吗?
18 小空间大浴望 卫浴间装修巧支招