java多线程消息队列的实现代码
本文介绍了java多线程消息队列的实现代码,分享给大家,希望对大家有帮助,顺便也自己留个笔记
1、定义一个队列缓存池:
//static修饰的成员变量和成员方法独立于该类的任何对象。也就是说,它不依赖类特定的实例,被类的所有实例共享。 privatestaticListqueueCache=newLinkedList ();
2、定义队列缓冲池最大消息数,如果达到该值,那么队列检入将等待检出低于该值时继续进行。
privateIntegerofferMaxQueue=2000;
3、定义检出线程,如果队列缓冲池没有消息,那么检出线程会线程等待中
newThread(){ publicvoidrun(){ while(true){ Stringip=null; try{ synchronized(queueCache){ Integersize=queueCache.size(); if(size==0){ //队列缓存池没有消息,等待。。。。queueCache.wait(); } Queuequeue=queueCache.remove(0); if(isIpLock(queueStr)){//假若这个是一个多应用的分布式系统,那么这个判断应该是分布式锁,这里说的锁不是线程停止,而是跳过该消息,滞后处理 queueCache.add(queue);该queue重新加入队列缓冲池,滞后处理, continue; }else{ ;//这里是处理该消息的操作。 } size=queueCache.size(); if(size=0){queueCache.notifyAll();//在队列缓存池不超过最大值的前提下,假若检入正在等待中,那么那么让他们排队检入。 } } }catch(Exceptione){ e.printStackTrace(); }finally{ try{//检出该消息队列的锁 unIpLock(queueStr); }catch(Execptione){//捕获异常,不能让线程挂掉 e.printStackTrace(); } } } }.start();
4、检入队列
synchronized(queueCache){ while(true){ Integersize=queueCache.size(); if(size>=offerMaxQueue){ try{ queueCache.wait(); continue;//继续执行等待中的检入任务。 }catch(InterruptedExceptione){ e.printStackTrace(); } }//IF if(size<=offerMaxQueue&&size>0){ queueCache.notifyAll(); } break;//检入完毕 }//while }
5、锁方法实现
/** *锁 *@paramip *@return *@throws */ publicBooleanisLock(StringqueueStr){ returnthis.redisManager.setnx(queueStr+"_lock","LOCK",10000)!=1; } //解锁 publicvoidunIpLock(StringqueueStr){ if(ip!=null){ this.redisManager.del(queueStr+"_lock"); //lock.unlock(); } }
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。