java使用多线程读取超大文件
接上次写的“JAVA读取超大文件”。在读取超过10G的文件时会发现一次读一行的速度实在是不能接受,想到使用多线程+FileChannel来做一个使用多线程版本。
基本思路如下:
1.计算出文件总大小
2.分段处理,计算出每个线程读取文件的开始与结束位置
(文件大小/线程数)*N,N是指第几个线程,这样能得到每个线程在读该文件的大概起始位置
使用"大概起始位置",作为读文件的开始偏移量(fileChannel.position("大概起始位置")),来读取该文件,直到读到第一个换行符,记录下这个换行符的位置,作为该线程的准确起始位置.同时它也是上一个线程的结束位置.最后一个线程的结束位置也直接设置为-1
3.启动线程,每个线程从开始位置读取到结束位置为止
代码如下:
读文件工具类
importjava.io.*;
importjava.nio.ByteBuffer;
importjava.nio.channels.FileChannel;
importjava.util.Observable;
/**
*CreatedwithIntelliJIDEA.
*User:okey
*Date:14-4-2
*Time:下午3:12
*读取文件
*/
publicclassReadFileextendsObservable{
privateintbufSize=1024;
//换行符
privatebytekey="\n".getBytes()[0];
//当前行数
privatelonglineNum=0;
//文件编码,默认为gb2312
privateStringencode="gb2312";
//具体业务逻辑监听器
privateReaderFileListenerreaderListener;
publicvoidsetEncode(Stringencode){
this.encode=encode;
}
publicvoidsetReaderListener(ReaderFileListenerreaderListener){
this.readerListener=readerListener;
}
/**
*获取准确开始位置
*@paramfile
*@paramposition
*@return
*@throwsException
*/
publiclonggetStartNum(Filefile,longposition)throwsException{
longstartNum=position;
FileChannelfcin=newRandomAccessFile(file,"r").getChannel();
fcin.position(position);
try{
intcache=1024;
ByteBufferrBuffer=ByteBuffer.allocate(cache);
//每次读取的内容
byte[]bs=newbyte[cache];
//缓存
byte[]tempBs=newbyte[0];
Stringline="";
while(fcin.read(rBuffer)!=-1){
intrSize=rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[]newStrByte=bs;
//如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
if(null!=tempBs){
inttL=tempBs.length;
newStrByte=newbyte[rSize+tL];
System.arraycopy(tempBs,0,newStrByte,0,tL);
System.arraycopy(bs,0,newStrByte,tL,rSize);
}
//获取开始位置之后的第一个换行符
intendIndex=indexOf(newStrByte,0);
if(endIndex!=-1){
returnstartNum+endIndex;
}
tempBs=substring(newStrByte,0,newStrByte.length);
startNum+=1024;
}
}catch(Exceptione){
e.printStackTrace();
}finally{
fcin.close();
}
returnposition;
}
/**
*从设置的开始位置读取文件,一直到结束为止。如果end设置为负数,刚读取到文件末尾
*@paramfullPath
*@paramstart
*@paramend
*@throwsException
*/
publicvoidreadFileByLine(StringfullPath,longstart,longend)throwsException{
Filefin=newFile(fullPath);
if(fin.exists()){
FileChannelfcin=newRandomAccessFile(fin,"r").getChannel();
fcin.position(start);
try{
ByteBufferrBuffer=ByteBuffer.allocate(bufSize);
//每次读取的内容
byte[]bs=newbyte[bufSize];
//缓存
byte[]tempBs=newbyte[0];
Stringline="";
//当前读取文件位置
longnowCur=start;
while(fcin.read(rBuffer)!=-1){
nowCur+=bufSize;
intrSize=rBuffer.position();
rBuffer.rewind();
rBuffer.get(bs);
rBuffer.clear();
byte[]newStrByte=bs;
//如果发现有上次未读完的缓存,则将它加到当前读取的内容前面
if(null!=tempBs){
inttL=tempBs.length;
newStrByte=newbyte[rSize+tL];
System.arraycopy(tempBs,0,newStrByte,0,tL);
System.arraycopy(bs,0,newStrByte,tL,rSize);
}
//是否已经读到最后一位
booleanisEnd=false;
//如果当前读取的位数已经比设置的结束位置大的时候,将读取的内容截取到设置的结束位置
if(end>0&&nowCur>end){
//缓存长度-当前已经读取位数-最后位数
intl=newStrByte.length-(int)(nowCur-end);
newStrByte=substring(newStrByte,0,l);
isEnd=true;
}
intfromIndex=0;
intendIndex=0;
//每次读一行内容,以key(默认为\n)作为结束符
while((endIndex=indexOf(newStrByte,fromIndex))!=-1){
byte[]bLine=substring(newStrByte,fromIndex,endIndex);
line=newString(bLine,0,bLine.length,encode);
lineNum++;
//输出一行内容,处理方式由调用方提供
readerListener.outLine(line.trim(),lineNum,false);
fromIndex=endIndex+1;
}
//将未读取完成的内容放到缓存中
tempBs=substring(newStrByte,fromIndex,newStrByte.length);
if(isEnd){
break;
}
}
//将剩下的最后内容作为一行,输出,并指明这是最后一行
StringlineStr=newString(tempBs,0,tempBs.length,encode);
readerListener.outLine(lineStr.trim(),lineNum,true);
}catch(Exceptione){
e.printStackTrace();
}finally{
fcin.close();
}
}else{
thrownewFileNotFoundException("没有找到文件:"+fullPath);
}
//通知观察者,当前工作已经完成
setChanged();
notifyObservers(start+"-"+end);
}
/**
*查找一个byte[]从指定位置之后的一个换行符位置
*
*@paramsrc
*@paramfromIndex
*@return
*@throwsException
*/
privateintindexOf(byte[]src,intfromIndex)throwsException{
for(inti=fromIndex;i
读文件线程
/**
*CreatedwithIntelliJIDEA.
*User:okey
*Date:14-4-2
*Time:下午4:50
*TochangethistemplateuseFile|Settings|FileTemplates.
*/
publicclassReadFileThreadextendsThread{
privateReaderFileListenerprocessPoiDataListeners;
privateStringfilePath;
privatelongstart;
privatelongend;
publicReadFileThread(ReaderFileListenerprocessPoiDataListeners,longstart,longend,Stringfile){
this.setName(this.getName()+"-ReadFileThread");
this.start=start;
this.end=end;
this.filePath=file;
this.processPoiDataListeners=processPoiDataListeners;
}
@Override
publicvoidrun(){
ReadFilereadFile=newReadFile();
readFile.setReaderListener(processPoiDataListeners);
readFile.setEncode(processPoiDataListeners.getEncode());
//readFile.addObserver();
try{
readFile.readFileByLine(filePath,start,end+1);
}catch(Exceptione){
e.printStackTrace();
}
}
}
具体业务逻辑监听
/**
*CreatedwithOkey
*User:Okey
*Date:13-3-14
*Time:下午3:19
*NIO逐行读数据回调方法
*/
publicabstractclassReaderFileListener{
//一次读取行数,默认为500
privateintreadColNum=500;
privateStringencode;
privateListlist=newArrayList();
/**
*设置一次读取行数
*@paramreadColNum
*/
protectedvoidsetReadColNum(intreadColNum){
this.readColNum=readColNum;
}
publicStringgetEncode(){
returnencode;
}
publicvoidsetEncode(Stringencode){
this.encode=encode;
}
/**
*每读取到一行数据,添加到缓存中
*@paramlineStr读取到的数据
*@paramlineNum行号
*@paramover是否读取完成
*@throwsException
*/
publicvoidoutLine(StringlineStr,longlineNum,booleanover)throwsException{
if(null!=lineStr)
list.add(lineStr);
if(!over&&(lineNum%readColNum==0)){
output(list);
list.clear();
}elseif(over){
output(list);
list.clear();
}
}
/**
*批量输出
*
*@paramstringList
*@throwsException
*/
publicabstractvoidoutput(ListstringList)throwsException;
}
线程调度
importjava.io.File;
importjava.io.FileInputStream;
importjava.io.IOException;
/**
*CreatedwithIntelliJIDEA.
*User:okey
*Date:14-4-1
*Time:下午6:03
*TochangethistemplateuseFile|Settings|FileTemplates.
*/
publicclassBuildData{
publicstaticvoidmain(String[]args)throwsException{
Filefile=newFile("E:\\1396341974289.csv");
FileInputStreamfis=null;
try{
ReadFilereadFile=newReadFile();
fis=newFileInputStream(file);
intavailable=fis.available();
intmaxThreadNum=50;
//线程粗略开始位置
inti=available/maxThreadNum;
for(intj=0;j
现在就可以尽情的调整 maxThreadNum来享受风一般的速度吧!
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。