实例分析python3实现并发访问水平切分表
场景说明
假设有一个mysql表被水平切分,分散到多个host中,每个host拥有n个切分表。
如果需要并发去访问这些表,快速得到查询结果,应该怎么做呢?
这里提供一种方案,利用python3的asyncio异步io库及aiomysql异步库去实现这个需求。
代码演示
importlogging importrandom importasynciofromaiomysql importcreate_pool #假设mysql表分散在8个host,每个host有16张子表 TBLES={"192.168.1.01":"table_000-015", #000-015表示该ip下的表明从table_000一直连续到table_015 "192.168.1.02":"table_016-031", "192.168.1.03":"table_032-047", "192.168.1.04":"table_048-063", "192.168.1.05":"table_064-079", "192.168.1.06":"table_080-095", "192.168.1.07":"table_096-0111", "192.168.1.08":"table_112-0127", } USER="xxx"PASSWD="xxxx"#wrapper函数,用于捕捉异常defquery_wrapper(func): asyncdefwrapper(*args,**kwargs): try: awaitfunc(*args,**kwargs)exceptExceptionase: print(e)returnwrapper #实际的sql访问处理函数,通过aiomysql实现异步非阻塞请求@ query_wrapperasyncdefquery_do_something(ip,db,table): asyncwithcreate_pool(host=ip,db=db,user=USER,password=PASSWD)aspool: asyncwithpool.get()asconn: asyncwithconn.cursor()ascur: sql=("selectxxxfrom{}wherexxxx") awaitcur.execute(sql.format(table)) res=awaitcur.fetchall() #thendosomething...#生成sql访问队列,队列的每个元素包含要对某个表进行访问的函数及参数defgen_tasks(): tasks=[]forip,tblsinTBLES.items(): cols=re.split('_|-',tbls) tblpre="_".join(cols[:-2]) min_num=int(cols[-2]) max_num=int(cols[-1]) fornuminrange(min_num,max_num+1): tasks.append( (query_do_something,ip,'your_dbname','{}_{}'.format(tblpre,num)) ) random.shuffle(tasks) returntasks#按批量运行sql访问请求队列defrun_tasks(tasks,batch_len): try: foridxinrange(0,len(tasks),batch_len): batch_tasks=tasks[idx:idx+batch_len] logging.info("currentbatch,start_idx:%slen:%s"%(idx,len(batch_tasks))) foriinrange(0,len(batch_tasks)): l=batch_tasks[i] batch_tasks[i]=asyncio.ensure_future( l[0](*l[1:]) ) loop.run_until_complete(asyncio.gather(*batch_tasks)) exceptExceptionase: logging.warn(e)#main方法,通过asyncio实现函数异步调用defmain(): loop=asyncio.get_event_loop() tasks=gen_tasks() batch_len=len(TBLES.keys())*5#alluptoyou run_tasks(tasks,batch_len) loop.close()
以上就是本次相关内容的全部实例代码,大家可以本地测试以下,感谢你对毛票票的支持。