spring与disruptor集成的简单示例
disruptor不过多介绍了,描述下当前的业务场景,两个应用A,B,应用A向应用B传递数据.数据传送比较快,如果用http直接push数据然后入库,效率不高.有可能导致A应用比较大的压力.使用mq太重量级,所以选择了disruptor.也可以使用Reactor
BaseQueueHelper.java
/** *lmax.disruptor高效队列处理模板.支持初始队列,即在init()前进行发布。 * *调用init()时才真正启动线程开始处理系统退出自动清理资源. * *@authorxielongwang *@create2018-01-18下午3:49 *@emailxielong.wang@nvr-china.com *@description */ publicabstractclassBaseQueueHelper,HextendsWorkHandler >{ /** *记录所有的队列,系统退出时统一清理资源 */ privatestaticList queueHelperList=newArrayList (); /** *Disruptor对象 */ privateDisruptor disruptor; /** *RingBuffer */ privateRingBuffer ringBuffer; /** *initQueue */ privateList initQueue=newArrayList (); /** *队列大小 * *@return队列长度,必须是2的幂 */ protectedabstractintgetQueueSize(); /** *事件工厂 * *@returnEventFactory */ protectedabstractEventFactory eventFactory(); /** *事件消费者 * *@returnWorkHandler[] */ protectedabstractWorkHandler[]getHandler(); /** *初始化 */ publicvoidinit(){ ThreadFactorynamedThreadFactory=newThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build(); disruptor=newDisruptor (eventFactory(),getQueueSize(),namedThreadFactory,ProducerType.SINGLE,getStrategy()); disruptor.setDefaultExceptionHandler(newMyHandlerException()); disruptor.handleEventsWithWorkerPool(getHandler()); ringBuffer=disruptor.start(); //初始化数据发布 for(Ddata:initQueue){ ringBuffer.publishEvent(newEventTranslatorOneArg (){ @Override publicvoidtranslateTo(Eevent,longsequence,Ddata){ event.setValue(data); } },data); } //加入资源清理钩子 synchronized(queueHelperList){ if(queueHelperList.isEmpty()){ Runtime.getRuntime().addShutdownHook(newThread(){ @Override publicvoidrun(){ for(BaseQueueHelperbaseQueueHelper:queueHelperList){ baseQueueHelper.shutdown(); } } }); } queueHelperList.add(this); } } /** *如果要改变线程执行优先级,override此策略.YieldingWaitStrategy会提高响应并在闲时占用70%以上CPU, *慎用SleepingWaitStrategy会降低响应更减少CPU占用,用于日志等场景. * *@returnWaitStrategy */ protectedabstractWaitStrategygetStrategy(); /** *插入队列消息,支持在对象init前插入队列,则在队列建立时立即发布到队列处理. */ publicsynchronizedvoidpublishEvent(Ddata){ if(ringBuffer==null){ initQueue.add(data); return; } ringBuffer.publishEvent(newEventTranslatorOneArg (){ @Override publicvoidtranslateTo(Eevent,longsequence,Ddata){ event.setValue(data); } },data); } /** *关闭队列 */ publicvoidshutdown(){ disruptor.shutdown(); } }
EventFactory.java
/** *@authorxielongwang *@create2018-01-18下午6:24 *@emailxielong.wang@nvr-china.com *@description */ publicclassEventFactoryimplementscom.lmax.disruptor.EventFactory{ @Override publicSeriesDataEventnewInstance(){ returnnewSeriesDataEvent(); } }
MyHandlerException.java
publicclassMyHandlerExceptionimplementsExceptionHandler{ privateLoggerlogger=LoggerFactory.getLogger(MyHandlerException.class); /* *(non-Javadoc)运行过程中发生时的异常 * *@see *com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable *,long,java.lang.Object) */ @Override publicvoidhandleEventException(Throwableex,longsequence,Objectevent){ ex.printStackTrace(); logger.error("processdataerrorsequence==[{}]event==[{}],ex==[{}]",sequence,event.toString(),ex.getMessage()); } /* *(non-Javadoc)启动时的异常 * *@see *com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang. *Throwable) */ @Override publicvoidhandleOnStartException(Throwableex){ logger.error("startdisruptorerror==[{}]!",ex.getMessage()); } /* *(non-Javadoc)关闭时的异常 * *@see *com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang *.Throwable) */ @Override publicvoidhandleOnShutdownException(Throwableex){ logger.error("shutdowndisruptorerror==[{}]!",ex.getMessage()); } }
SeriesData.java(代表应用A发送给应用B的消息)
publicclassSeriesData{ privateStringdeviceInfoStr; publicSeriesData(){ } publicSeriesData(StringdeviceInfoStr){ this.deviceInfoStr=deviceInfoStr; } publicStringgetDeviceInfoStr(){ returndeviceInfoStr; } publicvoidsetDeviceInfoStr(StringdeviceInfoStr){ this.deviceInfoStr=deviceInfoStr; } @Override publicStringtoString(){ return"SeriesData{"+ "deviceInfoStr='"+deviceInfoStr+'\''+ '}'; } }
SeriesDataEvent.java
publicclassSeriesDataEventextendsValueWrapper{ }
SeriesDataEventHandler.java
publicclassSeriesDataEventHandlerimplementsWorkHandler{ privateLoggerlogger=LoggerFactory.getLogger(SeriesDataEventHandler.class); @Autowired privateDeviceInfoServicedeviceInfoService; @Override publicvoidonEvent(SeriesDataEventevent){ if(event.getValue()==null||StringUtils.isEmpty(event.getValue().getDeviceInfoStr())){ logger.warn("receiverseriesdataisempty!"); } //业务处理 deviceInfoService.processData(event.getValue().getDeviceInfoStr()); } }
SeriesDataEventQueueHelper.java
@Component publicclassSeriesDataEventQueueHelperextendsBaseQueueHelperimplementsInitializingBean{ privatestaticfinalintQUEUE_SIZE=1024; @Autowired privateList seriesDataEventHandler; @Override protectedintgetQueueSize(){ returnQUEUE_SIZE; } @Override protectedcom.lmax.disruptor.EventFactoryeventFactory(){ returnnewEventFactory(); } @Override protectedWorkHandler[]getHandler(){ intsize=seriesDataEventHandler.size(); SeriesDataEventHandler[]paramEventHandlers=(SeriesDataEventHandler[])seriesDataEventHandler.toArray(newSeriesDataEventHandler[size]); returnparamEventHandlers; } @Override protectedWaitStrategygetStrategy(){ returnnewBlockingWaitStrategy(); //returnnewYieldingWaitStrategy(); } @Override publicvoidafterPropertiesSet()throwsException{ this.init(); } }
ValueWrapper.java
publicabstractclassValueWrapper{ privateTvalue; publicValueWrapper(){} publicValueWrapper(Tvalue){ this.value=value; } publicTgetValue(){ returnvalue; } publicvoidsetValue(Tvalue){ this.value=value; } }
DisruptorConfig.java
@Configuration @ComponentScan(value={"com.portal.disruptor"}) //多实例几个消费者 publicclassDisruptorConfig{ /** *smsParamEventHandler1 * *@returnSeriesDataEventHandler */ @Bean publicSeriesDataEventHandlersmsParamEventHandler1(){ returnnewSeriesDataEventHandler(); } /** *smsParamEventHandler2 * *@returnSeriesDataEventHandler */ @Bean publicSeriesDataEventHandlersmsParamEventHandler2(){ returnnewSeriesDataEventHandler(); } /** *smsParamEventHandler3 * *@returnSeriesDataEventHandler */ @Bean publicSeriesDataEventHandlersmsParamEventHandler3(){ returnnewSeriesDataEventHandler(); } /** *smsParamEventHandler4 * *@returnSeriesDataEventHandler */ @Bean publicSeriesDataEventHandlersmsParamEventHandler4(){ returnnewSeriesDataEventHandler(); } /** *smsParamEventHandler5 * *@returnSeriesDataEventHandler */ @Bean publicSeriesDataEventHandlersmsParamEventHandler5(){ returnnewSeriesDataEventHandler(); } }
测试
//注入SeriesDataEventQueueHelper消息生产者 @Autowired privateSeriesDataEventQueueHelperseriesDataEventQueueHelper; @RequestMapping(value="/data",method=RequestMethod.POST,produces=MediaType.APPLICATION_JSON_VALUE) publicDataResponseVoreceiverDeviceData(@RequestBodyStringdeviceData){ longstartTime1=System.currentTimeMillis(); if(StringUtils.isEmpty(deviceData)){ logger.info("receiverdataisempty!"); returnnewDataResponseVo (400,"failed"); } seriesDataEventQueueHelper.publishEvent(newSeriesData(deviceData)); longstartTime2=System.currentTimeMillis(); logger.info("receiverdata==[{}]millisecond==[{}]",deviceData,startTime2-startTime1); returnnewDataResponseVo (200,"success"); }
应用A通过/data接口把数据发送到应用B,然后通过seriesDataEventQueueHelper把消息发给disruptor队列,消费者去消费,整个过程对不会堵塞应用A.可接受消息丢失,可以通过扩展SeriesDataEventQueueHelper来达到对disruptor队列的监控
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。