Python实现 多进程导入CSV数据到 MySQL
前段时间帮同事处理了一个把CSV数据导入到MySQL的需求。两个很大的CSV文件,分别有3GB、2100万条记录和7GB、3500万条记录。对于这个量级的数据,用简单的单进程/单线程导入会耗时很久,最终用了多进程的方式来实现。具体过程不赘述,记录一下几个要点:
- 批量插入而不是逐条插入
- 为了加快插入速度,先不要建索引
- 生产者和消费者模型,主进程读文件,多个worker进程执行插入
- 注意控制worker的数量,避免对MySQL造成太大的压力
- 注意处理脏数据导致的异常
- 原始数据是GBK编码,所以还要注意转换成UTF-8
- 用click封装命令行工具
具体的代码实现如下:
#!/usr/bin/envpython #-*-coding:utf-8-*- importcodecs importcsv importlogging importmultiprocessing importos importwarnings importclick importMySQLdb importsqlalchemy warnings.filterwarnings('ignore',category=MySQLdb.Warning) #批量插入的记录数量 BATCH=5000 DB_URI='mysql://root@localhost:3306/example?charset=utf8' engine=sqlalchemy.create_engine(DB_URI) defget_table_cols(table): sql='SELECT*FROM`{table}`LIMIT0'.format(table=table) res=engine.execute(sql) returnres.keys() definsert_many(table,cols,rows,cursor): sql='INSERTINTO`{table}`({cols})VALUES({marks})'.format( table=table, cols=','.join(cols), marks=','.join(['%s']*len(cols))) cursor.execute(sql,*rows) logging.info('process%sinserted%srowsintotable%s',os.getpid(),len(rows),table) definsert_worker(table,cols,queue): rows=[] #每个子进程创建自己的engine对象 cursor=sqlalchemy.create_engine(DB_URI) whileTrue: row=queue.get() ifrowisNone: ifrows: insert_many(table,cols,rows,cursor) break rows.append(row) iflen(rows)==BATCH: insert_many(table,cols,rows,cursor) rows=[] definsert_parallel(table,reader,w=10): cols=get_table_cols(table) #数据队列,主进程读文件并往里写数据,worker进程从队列读数据 #注意一下控制队列的大小,避免消费太慢导致堆积太多数据,占用过多内存 queue=multiprocessing.Queue(maxsize=w*BATCH*2) workers=[] foriinrange(w): p=multiprocessing.Process(target=insert_worker,args=(table,cols,queue)) p.start() workers.append(p) logging.info('starting#%sworkerprocess,pid:%s...',i+1,p.pid) dirty_data_file='./{}_dirty_rows.csv'.format(table) xf=open(dirty_data_file,'w') writer=csv.writer(xf,delimiter=reader.dialect.delimiter) forlineinreader: #记录并跳过脏数据:键值数量不一致 iflen(line)!=len(cols): writer.writerow(line) continue #把None值替换为'NULL' clean_line=[Noneifx=='NULL'elsexforxinline] #往队列里写数据 queue.put(tuple(clean_line)) ifreader.line_num%500000==0: logging.info('put%stasksintoqueue.',reader.line_num) xf.close() #给每个worker发送任务结束的信号 logging.info('sendclosesignaltoworkerprocesses') foriinrange(w): queue.put(None) forpinworkers: p.join() defconvert_file_to_utf8(f,rv_file=None): ifnotrv_file: name,ext=os.path.splitext(f) ifisinstance(name,unicode): name=name.encode('utf8') rv_file='{}_utf8{}'.format(name,ext) logging.info('starttoprocessfile%s',f) withopen(f)asinfd: withopen(rv_file,'w')asoutfd: lines=[] loop=0 chunck=200000 first_line=infd.readline().strip(codecs.BOM_UTF8).strip()+'\n' lines.append(first_line) forlineininfd: clean_line=line.decode('gb18030').encode('utf8') clean_line=clean_line.rstrip()+'\n' lines.append(clean_line) iflen(lines)==chunck: outfd.writelines(lines) lines=[] loop+=1 logging.info('processed%slines.',loop*chunck) outfd.writelines(lines) logging.info('processed%slines.',loop*chunck+len(lines)) @click.group() defcli(): logging.basicConfig(level=logging.INFO, format='%(asctime)s-%(levelname)s-%(name)s-%(message)s') @cli.command('gbk_to_utf8') @click.argument('f') defconvert_gbk_to_utf8(f): convert_file_to_utf8(f) @cli.command('load') @click.option('-t','--table',required=True,help='表名') @click.option('-i','--filename',required=True,help='输入文件') @click.option('-w','--workers',default=10,help='worker数量,默认10') defload_fac_day_pro_nos_sal_table(table,filename,workers): withopen(filename)asfd: fd.readline()#skipheader reader=csv.reader(fd) insert_parallel(table,reader,w=workers) if__name__=='__main__': cli()
以上就是本文给大家分享的全部没人了,希望大家能够喜欢