Java FTPClient连接池的实现
最近在写一个FTP上传工具,用到了Apache的FTPClient,为了提高上传效率,我采用了多线程的方式,但是每个线程频繁的创建和销毁FTPClient对象势必会造成不必要的开销,因此,此处最好使用一个FTPClient连接池。仔细翻了一下Apache的api,发现它并没有一个FTPClientPool的实现,所以,不得不自己写一个FTPClientPool。下面就大体介绍一下开发连接池的整个过程,供大家参考。
关于对象池
有些对象的创建开销是比较大的,比如数据库连接等。为了减少频繁创建、销毁对象带来的性能消耗,我们可以利用对象池的技术来实现对象的复用。对象池提供了一种机制,它可以管理对象池中对象的生命周期,提供了获取和释放对象的方法,可以让客户端很方便的使用对象池中的对象。
如果我们要自己实现一个对象池,一般需要完成如下功能:
1.如果池中有可用的对象,对象池应当能返回给客户端
2.客户端把对象放回池里后,可以对这些对象进行重用
3.对象池能够创建新的对象来满足客户端不断增长的需求
4.需要有一个正确关闭池的机制来结束对象的生命周期
Apache的对象池工具包
为了方便我们开发自己的对象池,Apache提供的common-pool工具包,里面包含了开发通用对象池的一些接口和实现类。其中最基本的两个接口是ObjectPool和PoolableObjectFactory。
ObjectPool接口中有几个最基本的方法:
1.addObject():添加对象到池
2.borrowObject():客户端从池中借出一个对象
3.returnObject():客户端归还一个对象到池中
4.close():关闭对象池,清理内存释放资源等
5.setFactory(ObjectFactoryfactory):需要一个工厂来制造池中的对象
PoolableObjectFactory接口中几个最基本的方法:
1.makeObject():制造一个对象
2.destoryObject():销毁一个对象
3.validateObject():验证一个对象是否还可用
通过以上两个接口我们就可以自己实现一个对象池了。
实例:开发一个FTPClient对象池
最近在开发一个项目,需要把hdfs中的文件上传到一组ftp服务器,为了提高上传效率,自然考虑到使用多线程的方式进行上传。我上传ftp用的工具是Apachecommon-net包中的FTPClient,但Apache并没有提供FTPClientPool,于是为了减少FTPClient的创建销毁次数,我们就自己开发一个FTPClientPool来复用FTPClient连接。
通过上面的介绍,我们可以利用Apache提供的common-pool包来协助我们开发连接池。而开发一个简单的对象池,仅需要实现common-pool包中的ObjectPool和PoolableObjectFactory两个接口即可。下面就看一下我写的实现:
写一个ObjectPool接口的实现FTPClientPool
importjava.io.IOException; importjava.util.NoSuchElementException; importjava.util.concurrent.ArrayBlockingQueue; importjava.util.concurrent.BlockingQueue; importjava.util.concurrent.TimeUnit; importorg.apache.commons.net.ftp.FTPClient; importorg.apache.commons.pool.ObjectPool; importorg.apache.commons.pool.PoolableObjectFactory; /** *实现了一个FTPClient连接池 *@authorheaven */ publicclassFTPClientPoolimplementsObjectPool{ privatestaticfinalintDEFAULT_POOL_SIZE=10; privatefinalBlockingQueue pool; privatefinalFtpClientFactoryfactory; /** *初始化连接池,需要注入一个工厂来提供FTPClient实例 *@paramfactory *@throwsException */ publicFTPClientPool(FtpClientFactoryfactory)throwsException{ this(DEFAULT_POOL_SIZE,factory); } /** * *@parammaxPoolSize *@paramfactory *@throwsException */ publicFTPClientPool(intpoolSize,FtpClientFactoryfactory)throwsException{ this.factory=factory; pool=newArrayBlockingQueue (poolSize*2); initPool(poolSize); } /** *初始化连接池,需要注入一个工厂来提供FTPClient实例 *@parammaxPoolSize *@throwsException */ privatevoidinitPool(intmaxPoolSize)throwsException{ for(inti=0;i factory)throwsIllegalStateException,UnsupportedOperationException{ } }
再写一个PoolableObjectFactory接口的实现FTPClientFactory
importjava.io.IOException; importorg.apache.commons.net.ftp.FTPClient; importorg.apache.commons.net.ftp.FTPReply; importorg.apache.commons.pool.PoolableObjectFactory; importorg.slf4j.Logger; importorg.slf4j.LoggerFactory; importcom.hdfstoftp.util.FTPClientException; /** *FTPClient工厂类,通过FTPClient工厂提供FTPClient实例的创建和销毁 *@authorheaven */ publicclassFtpClientFactoryimplementsPoolableObjectFactory{ privatestaticLoggerlogger=LoggerFactory.getLogger("file"); privateFTPClientConfigureconfig; //给工厂传入一个参数对象,方便配置FTPClient的相关参数 publicFtpClientFactory(FTPClientConfigureconfig){ this.config=config; } /*(non-Javadoc) *@seeorg.apache.commons.pool.PoolableObjectFactory#makeObject() */ publicFTPClientmakeObject()throwsException{ FTPClientftpClient=newFTPClient(); ftpClient.setConnectTimeout(config.getClientTimeout()); try{ ftpClient.connect(config.getHost(),config.getPort()); intreply=ftpClient.getReplyCode(); if(!FTPReply.isPositiveCompletion(reply)){ ftpClient.disconnect(); logger.warn("FTPServerrefusedconnection"); returnnull; } booleanresult=ftpClient.login(config.getUsername(),config.getPassword()); if(!result){ thrownewFTPClientException("ftpClient登陆失败!userName:"+config.getUsername()+";password:"+config.getPassword()); } ftpClient.setFileType(config.getTransferFileType()); ftpClient.setBufferSize(1024); ftpClient.setControlEncoding(config.getEncoding()); if(config.getPassiveMode().equals("true")){ ftpClient.enterLocalPassiveMode(); } }catch(IOExceptione){ e.printStackTrace(); }catch(FTPClientExceptione){ e.printStackTrace(); } returnftpClient; } /*(non-Javadoc) *@seeorg.apache.commons.pool.PoolableObjectFactory#destroyObject(java.lang.Object) */ publicvoiddestroyObject(FTPClientftpClient)throwsException{ try{ if(ftpClient!=null&&ftpClient.isConnected()){ ftpClient.logout(); } }catch(IOExceptionio){ io.printStackTrace(); }finally{ //注意,一定要在finally代码中断开连接,否则会导致占用ftp连接情况 try{ ftpClient.disconnect(); }catch(IOExceptionio){ io.printStackTrace(); } } } /*(non-Javadoc) *@seeorg.apache.commons.pool.PoolableObjectFactory#validateObject(java.lang.Object) */ publicbooleanvalidateObject(FTPClientftpClient){ try{ returnftpClient.sendNoOp(); }catch(IOExceptione){ thrownewRuntimeException("Failedtovalidateclient:"+e,e); } } publicvoidactivateObject(FTPClientftpClient)throwsException{ } publicvoidpassivateObject(FTPClientftpClient)throwsException{ } }
最后,我们最好给工厂传递一个参数对象,方便我们设置FTPClient的一些参数
packageorg.apache.commons.pool.impl.contrib; /** *FTPClient配置类,封装了FTPClient的相关配置 * *@authorheaven */ publicclassFTPClientConfigure{ privateStringhost; privateintport; privateStringusername; privateStringpassword; privateStringpassiveMode; privateStringencoding; privateintclientTimeout; privateintthreadNum; privateinttransferFileType; privatebooleanrenameUploaded; privateintretryTimes; publicStringgetHost(){ returnhost; } publicvoidsetHost(Stringhost){ this.host=host; } publicintgetPort(){ returnport; } publicvoidsetPort(intport){ this.port=port; } publicStringgetUsername(){ returnusername; } publicvoidsetUsername(Stringusername){ this.username=username; } publicStringgetPassword(){ returnpassword; } publicvoidsetPassword(Stringpassword){ this.password=password; } publicStringgetPassiveMode(){ returnpassiveMode; } publicvoidsetPassiveMode(StringpassiveMode){ this.passiveMode=passiveMode; } publicStringgetEncoding(){ returnencoding; } publicvoidsetEncoding(Stringencoding){ this.encoding=encoding; } publicintgetClientTimeout(){ returnclientTimeout; } publicvoidsetClientTimeout(intclientTimeout){ this.clientTimeout=clientTimeout; } publicintgetThreadNum(){ returnthreadNum; } publicvoidsetThreadNum(intthreadNum){ this.threadNum=threadNum; } publicintgetTransferFileType(){ returntransferFileType; } publicvoidsetTransferFileType(inttransferFileType){ this.transferFileType=transferFileType; } publicbooleanisRenameUploaded(){ returnrenameUploaded; } publicvoidsetRenameUploaded(booleanrenameUploaded){ this.renameUploaded=renameUploaded; } publicintgetRetryTimes(){ returnretryTimes; } publicvoidsetRetryTimes(intretryTimes){ this.retryTimes=retryTimes; } @Override publicStringtoString(){ return"FTPClientConfig[host="+host+"\nport="+port+"\nusername="+username+"\npassword="+password+"\npassiveMode="+passiveMode +"\nencoding="+encoding+"\nclientTimeout="+clientTimeout+"\nthreadNum="+threadNum+"\ntransferFileType=" +transferFileType+"\nrenameUploaded="+renameUploaded+"\nretryTimes="+retryTimes+"]"; } }
FTPClientPool连接池类管理FTPClient对象的生命周期,负责对象的借出、规划、池的销毁等;FTPClientPool类依赖于FtpClientFactory类,由这个工程类来制造和销毁对象;FtpClientFactory又依赖FTPClientConfigure类,FTPClientConfigure负责封装FTPClient的配置参数。至此,我们的FTPClient连接池就开发完成了。
需要注意的是,FTPClientPool中用到了一个阻塞队列ArrayBlockingQueue来管理存放FTPClient对象,关于阻塞队列,请参考我的这篇文章:【Java并发之】BlockingQueue
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。