java利用delayedQueue实现本地的延迟队列
一、了解DelayQueue
DelayQueue是什么?
DelayQueue是一个无界的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。
注意:不能将null元素放置到这种队列中。
DelayQueue能做什么?
在我们的业务中通常会有一些需求是这样的:
- 淘宝订单业务:下单之后如果三十分钟之内没有付款就自动取消订单。
- 饿了吗订餐通知:下单成功后60s之后给用户发送短信通知。
那么这类业务我们可以总结出一个特点:需要延迟工作。
由此的情况,就是我们的DelayQueue应用需求的产生。
二、怎么用DelayQueue来解决这类的问题
先声明一个Delayed的对象
importjava.util.concurrent.Delayed; importjava.util.concurrent.TimeUnit; importjava.util.concurrent.atomic.AtomicLong; /** **[任务调度系统] *
* *@authorwangguangdong *@version1.0 *@Date2015年11月22日19:46:39 */ publicclassTask
*[队列中要执行的任务] *implementsDelayed{ /** *到期时间 */ privatefinallongtime; /** *问题对象 */ privatefinalTtask; privatestaticfinalAtomicLongatomic=newAtomicLong(0); privatefinallongn; publicTask(longtimeout,Tt){ this.time=System.nanoTime()+timeout; this.task=t; this.n=atomic.getAndIncrement(); } /** *返回与此对象相关的剩余延迟时间,以给定的时间单位表示 */ @Override publiclonggetDelay(TimeUnitunit){ returnunit.convert(this.time-System.nanoTime(),TimeUnit.NANOSECONDS); } @Override publicintcompareTo(Delayedother){ //TODOAuto-generatedmethodstub if(other==this)//comparezeroONLYifsameobject return0; if(otherinstanceofTask){ Taskx=(Task)other; longdiff=time-x.time; if(diff<0) return-1; elseif(diff>0) return1; elseif(n 再实现一个管理延迟任务的类
importorg.apache.log4j.Logger; importjava.util.concurrent.DelayQueue; importjava.util.concurrent.Executor; importjava.util.concurrent.Executors; importjava.util.concurrent.TimeUnit; /** **[任务调度系统] *
* *@authorwangguangdong *@version1.0 *@Date2015年11月23日14:19:40 */ publicclassTaskQueueDaemonThread{ privatestaticfinalLoggerLOG=Logger.getLogger(TaskQueueDaemonThread.class); privateTaskQueueDaemonThread(){ } privatestaticclassLazyHolder{ privatestaticTaskQueueDaemonThreadtaskQueueDaemonThread=newTaskQueueDaemonThread(); } publicstaticTaskQueueDaemonThreadgetInstance(){ returnLazyHolder.taskQueueDaemonThread; } Executorexecutor=Executors.newFixedThreadPool(20); /** *守护线程 */ privateThreaddaemonThread; /** *初始化守护线程 */ publicvoidinit(){ daemonThread=newThread(()->execute()); daemonThread.setDaemon(true); daemonThread.setName("TaskQueueDaemonThread"); daemonThread.start(); } privatevoidexecute(){ System.out.println("start:"+System.currentTimeMillis()); while(true){ try{ //从延迟队列中取值,如果没有对象过期则队列一直等待, Taskt1=t.take(); if(t1!=null){ //修改问题的状态 Runnabletask=t1.getTask(); if(task==null){ continue; } executor.execute(task); LOG.info("[attask:"+task+"][Time:"+System.currentTimeMillis()+"]"); } }catch(Exceptione){ e.printStackTrace(); break; } } } /** *创建一个最初为空的新DelayQueue */ privateDelayQueue
*[后台守护线程不断的执行检测工作] *t=newDelayQueue<>(); /** *添加任务, *time延迟时间 *task任务 *用户为问题设置延迟时间 */ publicvoidput(longtime,Runnabletask){ //转换成ns longnanoTime=TimeUnit.NANOSECONDS.convert(time,TimeUnit.MILLISECONDS); //创建一个任务 Taskk=newTask(nanoTime,task); //将任务放在延迟的队列中 t.put(k); } /** *结束订单 *@paramtask */ publicbooleanendTask(Task task){ returnt.remove(task); } } 使用方法
- 在容器初始化的时候调用init方法.
- 实现一个runnable接口的类,调用TaskQueueDaemonThread的put方法传入进去.
- 如果需要实现动态的取消任务的话,需要task任务的类重新hashcode方法,最好用业务限制hashcode的冲突发生.
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作能带来一定的帮助,如果有疑问大家可以留言交流,谢谢大家对毛票票的支持。