Java使用NIO包实现Socket通信的实例代码
前面几篇文章介绍了使用java.io和java.net类库实现的Socket通信,下面介绍一下使用java.nio类库实现的Socket。
java.nio包是Java从1.4版本开始引入的,用来提高I/O操作的效率。在nio包中主要包括以下几个类或接口:
- Buffer:缓冲区,用来临时存放输入或输出数据。
- Charset:用来把Unicode字符编码和其它字符编码互转。
- Channel:数据传输通道,用来把Buffer中的数据写入到数据源,或者把数据源中的数据读入到Buffer。
- Selector:多路复用选择器,可以在单个线程中同时监听多个Channel上的就绪事件,用来支持非阻塞I/O操作(有时也被不严格地称为异步I/O操作)。
nio包中主要通过下面两个方面来提高I/O操作效率:
- 通过Buffer和Channel来提高I/O操作的速度。
- 通过Selector来支持非阻塞I/O操作。
下面来看一下程序中是怎么通过这些类库实现Socket功能。
首先介绍一下几个辅助类。
辅助类SerializableUtil,这个类用来把java对象序列化成字节数组,或者把字节数组反序列化成java对象。
packagecom.googlecode.garbagecan.test.socket; importjava.io.ByteArrayInputStream; importjava.io.ByteArrayOutputStream; importjava.io.IOException; importjava.io.ObjectInputStream; importjava.io.ObjectOutputStream; publicclassSerializableUtil{ publicstaticbyte[]toBytes(Objectobject){ ByteArrayOutputStreambaos=newByteArrayOutputStream(); ObjectOutputStreamoos=null; try{ oos=newObjectOutputStream(baos); oos.writeObject(object); byte[]bytes=baos.toByteArray(); returnbytes; }catch(IOExceptionex){ thrownewRuntimeException(ex.getMessage(),ex); }finally{ try{ oos.close(); }catch(Exceptione){} } } publicstaticObjecttoObject(byte[]bytes){ ByteArrayInputStreambais=newByteArrayInputStream(bytes); ObjectInputStreamois=null; try{ ois=newObjectInputStream(bais); Objectobject=ois.readObject(); returnobject; }catch(IOExceptionex){ thrownewRuntimeException(ex.getMessage(),ex); }catch(ClassNotFoundExceptionex){ thrownewRuntimeException(ex.getMessage(),ex); }finally{ try{ ois.close(); }catch(Exceptione){} } } }
辅助类MyRequestObject和MyResponseObject,这两个类是普通的java对象,实现了Serializable接口。MyRequestObject类是Client发出的请求,MyResponseObject是Server端作出的响应。
// File: MyRequestObject.java
package com.googlecode.garbagecan.test.socket.nio;

import java.io.Serializable;

/**
 * Request sent from the client to the server. Besides the name/value pair it
 * carries a 1024-byte array allocated in the constructor.
 */
public class MyRequestObject implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;

    private String value;

    private byte[] bytes;

    /**
     * @param name  the request name
     * @param value the request value
     */
    public MyRequestObject(String name, String value) {
        this.name = name;
        this.value = value;
        this.bytes = new byte[1024];
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /** Returns the name, the value and the length of the byte array. */
    @Override
    public String toString() {
        return "Request[name:" + name + ",value:" + value + ",bytes:" + bytes.length + "]";
    }
}

// File: MyResponseObject.java
package com.googlecode.garbagecan.test.socket.nio;

import java.io.Serializable;

/**
 * Response sent from the server back to the client. Besides the name/value
 * pair it carries a 1024-byte array allocated in the constructor.
 */
public class MyResponseObject implements Serializable {

    private static final long serialVersionUID = 1L;

    private String name;

    private String value;

    private byte[] bytes;

    /**
     * @param name  the response name
     * @param value the response value
     */
    public MyResponseObject(String name, String value) {
        this.name = name;
        this.value = value;
        this.bytes = new byte[1024];
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    /** Returns the name, the value and the length of the byte array. */
    @Override
    public String toString() {
        return "Response[name:" + name + ",value:" + value + ",bytes:" + bytes.length + "]";
    }
}
下面主要看一下Server端的代码,其中有一些英文注释对理解代码很有帮助,这些注释主要来源于JDK的文档和例子,这里就没有再翻译。
packagecom.googlecode.garbagecan.test.socket.nio; importjava.io.ByteArrayOutputStream; importjava.io.IOException; importjava.net.InetSocketAddress; importjava.nio.ByteBuffer; importjava.nio.channels.ClosedChannelException; importjava.nio.channels.SelectionKey; importjava.nio.channels.Selector; importjava.nio.channels.ServerSocketChannel; importjava.nio.channels.SocketChannel; importjava.util.Iterator; importjava.util.logging.Level; importjava.util.logging.Logger; importcom.googlecode.garbagecan.test.socket.SerializableUtil; publicclassMyServer3{ privatefinalstaticLoggerlogger=Logger.getLogger(MyServer3.class.getName()); publicstaticvoidmain(String[]args){ Selectorselector=null; ServerSocketChannelserverSocketChannel=null; try{ //Selectorforincomingtimerequests selector=Selector.open(); //Createanewserversocketandsettononblockingmode serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); //Bindtheserversockettothelocalhostandport serverSocketChannel.socket().setReuseAddress(true); serverSocketChannel.socket().bind(newInetSocketAddress(10000)); //Registeracceptsontheserversocketwiththeselector.This //steptellstheselectorthatthesocketwantstobeputonthe //readylistwhenacceptoperationsoccur,soallowingmultiplexed //non-blockingI/Ototakeplace. serverSocketChannel.register(selector,SelectionKey.OP_ACCEPT); //Here'swhereeverythinghappens.Theselectmethodwill //returnwhenanyoperationsregisteredabovehaveoccurred,the //threadhasbeeninterrupted,etc. while(selector.select()>0){ //SomeoneisreadyforI/O,getthereadykeys Iterator<SelectionKey>it=selector.selectedKeys().iterator(); //Walkthroughthereadykeyscollectionandprocessdaterequests. 
while(it.hasNext()){ SelectionKeyreadyKey=it.next(); it.remove(); //Thekeyindexesintotheselectorsoyou //canretrievethesocketthat'sreadyforI/O execute((ServerSocketChannel)readyKey.channel()); } } }catch(ClosedChannelExceptionex){ logger.log(Level.SEVERE,null,ex); }catch(IOExceptionex){ logger.log(Level.SEVERE,null,ex); }finally{ try{ selector.close(); }catch(Exceptionex){} try{ serverSocketChannel.close(); }catch(Exceptionex){} } } privatestaticvoidexecute(ServerSocketChannelserverSocketChannel)throwsIOException{ SocketChannelsocketChannel=null; try{ socketChannel=serverSocketChannel.accept(); MyRequestObjectmyRequestObject=receiveData(socketChannel); logger.log(Level.INFO,myRequestObject.toString()); MyResponseObjectmyResponseObject=newMyResponseObject( "responsefor"+myRequestObject.getName(), "responsefor"+myRequestObject.getValue()); sendData(socketChannel,myResponseObject); logger.log(Level.INFO,myResponseObject.toString()); }finally{ try{ socketChannel.close(); }catch(Exceptionex){} } } privatestaticMyRequestObjectreceiveData(SocketChannelsocketChannel)throwsIOException{ MyRequestObjectmyRequestObject=null; ByteArrayOutputStreambaos=newByteArrayOutputStream(); ByteBufferbuffer=ByteBuffer.allocate(1024); try{ byte[]bytes; intsize=0; while((size=socketChannel.read(buffer))>=0){ buffer.flip(); bytes=newbyte[size]; buffer.get(bytes); baos.write(bytes); buffer.clear(); } bytes=baos.toByteArray(); Objectobj=SerializableUtil.toObject(bytes); myRequestObject=(MyRequestObject)obj; }finally{ try{ baos.close(); }catch(Exceptionex){} } returnmyRequestObject; } privatestaticvoidsendData(SocketChannelsocketChannel,MyResponseObjectmyResponseObject)throwsIOException{ byte[]bytes=SerializableUtil.toBytes(myResponseObject); ByteBufferbuffer=ByteBuffer.wrap(bytes); socketChannel.write(buffer); } }
下面是Client的代码,代码比较简单,就是启动了100个线程来访问Server。
packagecom.googlecode.garbagecan.test.socket.nio; importjava.io.ByteArrayOutputStream; importjava.io.IOException; importjava.net.InetSocketAddress; importjava.net.SocketAddress; importjava.nio.ByteBuffer; importjava.nio.channels.SocketChannel; importjava.util.logging.Level; importjava.util.logging.Logger; importcom.googlecode.garbagecan.test.socket.SerializableUtil; publicclassMyClient3{ privatefinalstaticLoggerlogger=Logger.getLogger(MyClient3.class.getName()); publicstaticvoidmain(String[]args)throwsException{ for(inti=0;i<100;i++){ finalintidx=i; newThread(newMyRunnable(idx)).start(); } } privatestaticfinalclassMyRunnableimplementsRunnable{ privatefinalintidx; privateMyRunnable(intidx){ this.idx=idx; } publicvoidrun(){ SocketChannelsocketChannel=null; try{ socketChannel=SocketChannel.open(); SocketAddresssocketAddress=newInetSocketAddress("localhost",10000); socketChannel.connect(socketAddress); MyRequestObjectmyRequestObject=newMyRequestObject("request_"+idx,"request_"+idx); logger.log(Level.INFO,myRequestObject.toString()); sendData(socketChannel,myRequestObject); MyResponseObjectmyResponseObject=receiveData(socketChannel); logger.log(Level.INFO,myResponseObject.toString()); }catch(Exceptionex){ logger.log(Level.SEVERE,null,ex); }finally{ try{ socketChannel.close(); }catch(Exceptionex){} } } privatevoidsendData(SocketChannelsocketChannel,MyRequestObjectmyRequestObject)throwsIOException{ byte[]bytes=SerializableUtil.toBytes(myRequestObject); ByteBufferbuffer=ByteBuffer.wrap(bytes); socketChannel.write(buffer); socketChannel.socket().shutdownOutput(); } privateMyResponseObjectreceiveData(SocketChannelsocketChannel)throwsIOException{ MyResponseObjectmyResponseObject=null; ByteArrayOutputStreambaos=newByteArrayOutputStream(); try{ ByteBufferbuffer=ByteBuffer.allocateDirect(1024); byte[]bytes; intcount=0; while((count=socketChannel.read(buffer))>=0){ buffer.flip(); bytes=newbyte[count]; buffer.get(bytes); baos.write(bytes); buffer.clear(); } 
bytes=baos.toByteArray(); Objectobj=SerializableUtil.toObject(bytes); myResponseObject=(MyResponseObject)obj; socketChannel.socket().shutdownInput(); }finally{ try{ baos.close(); }catch(Exceptionex){} } returnmyResponseObject; } } }
最后测试上面的代码,首先运行Server类,然后运行Client类,就可以分别在Server端和Client端控制台看到发送或接收到的MyRequestObject或MyResponseObject对象了。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。