java结合WebSphere MQ实现接收队列文件功能
首先我们先来简单介绍下webspheremq以及安装使用简介
webspheremq :用于传输信息具有跨平台的功能。
1安装webspheremq并启动
2webspheremq建立queueManager(如:MQSI_SAMPLE_QM)
3建立queue类型选择Local类型的(如lq )
4 建立channels类型选择ServerConnection(如BridgeChannel)
接下来,我们来看实例代码:
MQFileReceiver.java packagecom.mq.dpca.file; importjava.io.File; importjava.io.FileOutputStream; importcom.ibm.mq.MQEnvironment; importcom.ibm.mq.MQException; importcom.ibm.mq.MQGetMessageOptions; importcom.ibm.mq.MQMessage; importcom.ibm.mq.MQQueue; importcom.ibm.mq.MQQueueManager; importcom.ibm.mq.constants.MQConstants; importcom.mq.dpca.msg.MQConfig; importcom.mq.dpca.util.ReadCmdLine; importcom.mq.dpca.util.RenameUtil; /** * *MQ分组接收文件功能 *主动轮询 */ publicclassMQFileReceiver{ privateMQQueueManagerqmgr;//连接到队列管理器 privateMQQueueinQueue;//传输队列 privateStringqueueName="";//队列名称 privateStringhost="";// privateintport=1414;//侦听器的端口号 privateStringchannel="";//通道名称 privateStringqmgrName="";//队列管理器 privateMQMessageinMsg;//创建消息缓冲 privateMQGetMessageOptionsgmo;//设置获取消息选项 privatestaticStringfileName=null;//接收队列上的消息并存入文件 privateintccsid=0; privatestaticStringfile_dir=null; /** *程序的入口 * *@paramargs */ publicstaticvoidmain(Stringargs[]){ MQFileReceivermfs=newMQFileReceiver(); //初始化连接 mfs.initproperty(); //接收文件 mfs.runGoupReceiver(); //获取shell脚本名 //Stringshellname=MQConfig.getValueByKey(fileName); //if(shellname!=null&&!"".equals(shellname)){ ////调用shell //ReadCmdLine.callShell(shellname); //}else{ //System.out.println("havenoshellname,Onlyreceivefiles."); //} } publicvoidrunGoupReceiver(){ try{ init(); getGroupMessages(); qmgr.commit(); System.out.println("\nMessagessuccessfullyReceive"); }catch(MQExceptionmqe){ mqe.printStackTrace(); try{ System.out.println("\nBackingoutTransaction"); qmgr.backout(); System.exit(2); }catch(Exceptione){ e.printStackTrace(); System.exit(2); } }catch(Exceptione){ e.printStackTrace(); System.exit(2); } } /** *初始化服务器连接信息 * *@throwsException */ privatevoidinit()throwsException{ /*为客户机连接设置MQEnvironment属性*/ MQEnvironment.hostname=host; MQEnvironment.channel=channel; MQEnvironment.port=port; /*连接到队列管理器*/ qmgr=newMQQueueManager(qmgrName); /*设置队列打开选项以输*/ intopnOptn=MQConstants.MQOO_INPUT_AS_Q_DEF |MQConstants.MQOO_FAIL_IF_QUIESCING; /*打开队列以输*/ inQueue=qmgr.accessQueue(queueName,opnOptn,null,null,null); } /** *接受文件的主函数 * *@throwsException */ publicvoidgetGroupMessages(){ /*设置获取消息选项*/ gmo=newMQGetMessageOptions(); gmo.options=MQConstants.MQGMO_FAIL_IF_QUIESCING; gmo.options=gmo.options+MQConstants.MQGMO_SYNCPOINT; /*等待消息*/ gmo.options=gmo.options+MQConstants.MQGMO_WAIT; /*设置等待时间限制*/ gmo.waitInterval=5000; /*只获取消息*/ gmo.options=gmo.options+MQConstants.MQGMO_ALL_MSGS_AVAILABLE; /*以辑顺序获取消息*/ gmo.options=gmo.options+MQConstants.MQGMO_LOGICAL_ORDER; gmo.matchOptions=MQConstants.MQMO_MATCH_GROUP_ID; /*创建消息缓冲*/ inMsg=newMQMessage(); try{ FileOutputStreamfos=null; /*处理组消息*/ while(true){ try{ inQueue.get(inMsg,gmo); if(fos==null){ try{ fileName=inMsg.getStringProperty("fileName"); StringfileName_full=null; fileName_full=file_dir+RenameUtil.rename(fileName); fos=newFileOutputStream(newFile(fileName_full)); intmsgLength=inMsg.getMessageLength(); byte[]buffer=newbyte[msgLength]; inMsg.readFully(buffer); fos.write(buffer,0,msgLength); /*查看是否是最后消息标识*/ charx=gmo.groupStatus; if(x==MQConstants.MQGS_LAST_MSG_IN_GROUP){ System.out.println("LastMsginGroup"); break; } inMsg.clearMessage(); }catch(Exceptione){ System.out .println("Receiverthemessagewithoutproperty,donothing!"); inMsg.clearMessage(); } }else{ intmsgLength=inMsg.getMessageLength(); byte[]buffer=newbyte[msgLength]; inMsg.readFully(buffer); fos.write(buffer,0,msgLength); /*查看是否是最后消息标识*/ charx=gmo.groupStatus; if(x==MQConstants.MQGS_LAST_MSG_IN_GROUP){ System.out.println("LastMsginGroup"); break; } inMsg.clearMessage(); } }catch(Exceptione){ charx=gmo.groupStatus; if(x==MQConstants.MQGS_LAST_MSG_IN_GROUP){ System.out.println("LastMsginGroup"); } break; } } if(fos!=null) fos.close(); }catch(Exceptione){ System.out.println(e.getMessage()); } } publicvoidinitproperty(){ MQConfigconfig=newMQConfig().getInstance(); if(config.getMQ_MANAGER()!=null){ qmgrName=config.getMQ_MANAGER(); queueName=config.getMQ_QUEUE_NAME(); channel=config.getMQ_CHANNEL(); host=config.getMQ_HOST_NAME(); port=Integer.valueOf(config.getMQ_PROT()); ccsid=Integer.valueOf(config.getMQ_CCSID()); file_dir=config.getFILE_DIR(); } } }