java 中ThreadPoolExecutor原理分析
java中ThreadPoolExecutor原理分析
线程池简介
Java线程池是开发中常用的工具,当我们有异步、并行的任务要处理时,经常会用到线程池,或者在实现一个服务器时,也需要使用线程池来接收连接处理请求。
线程池使用
JDK中提供的线程池实现位于java.util.concurrent.ThreadPoolExecutor。在使用时,通常使用ExecutorService接口,它提供了submit,invokeAll,shutdown等通用的方法。
在线程池配置方面,Executors类中提供了一些静态方法能够提供一些常用场景的线程池,如newFixedThreadPool,newCachedThreadPool,newSingleThreadExecutor等,这些方法最终都是调用到了ThreadPoolExecutor的构造函数。
ThreadPoolExecutor的包含所有参数的构造函数是
/** *@paramcorePoolSizethenumberofthreadstokeepinthepool,even *iftheyareidle,unless{@codeallowCoreThreadTimeOut}isset *@parammaximumPoolSizethemaximumnumberofthreadstoallowinthe *pool *@paramkeepAliveTimewhenthenumberofthreadsisgreaterthan *thecore,thisisthemaximumtimethatexcessidlethreads *willwaitfornewtasksbeforeterminating. *@paramunitthetimeunitforthe{@codekeepAliveTime}argument *@paramworkQueuethequeuetouseforholdingtasksbeforetheyare *executed.Thisqueuewillholdonlythe{@codeRunnable} *taskssubmittedbythe{@codeexecute}method. *@paramthreadFactorythefactorytousewhentheexecutor *createsanewthread *@paramhandlerthehandlertousewhenexecutionisblocked *becausethethreadboundsandqueuecapacitiesarereached publicThreadPoolExecutor(intcorePoolSize, intmaximumPoolSize, longkeepAliveTime, TimeUnitunit, BlockingQueueworkQueue, ThreadFactorythreadFactory, RejectedExecutionHandlerhandler){ if(corePoolSize<0|| maximumPoolSize<=0|| maximumPoolSize
- corePoolSize设置线程池的核心线程数,当添加新任务时,如果线程池中的线程数小于corePoolSize,则不管当前是否有线程闲置,都会创建一个新的线程来执行任务。
- maximunPoolSize是线程池中允许的最大的线程数
- workQueue用于存放排队的任务
- keepAliveTime是大于corePoolSize的线程闲置的超时时间
- handler用于在任务逸出、线程池关闭时的任务处理,线程池的线程增长策略为,当前线程数小于corePoolSize时,新增线程,当线程数=corePoolSize且corePoolSize时,只有在workQueue不能存放新的任务时创建新线程,超出的线程在闲置keepAliveTime后销毁。
实现(基于JDK1.8)
ThreadPoolExecutor中保存的状态有
当前线程池状态,包括RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED。
当前有效的运行线程的数量。
将这两个状态放到一个int变量中,前三位作为线程池状态,后29位作为线程数量。
例如0b11100000000000000000000000000001,表示RUNNING,一个线程。
通过HashSet来存储工作者集合,访问该HashSet前必须先获取保护状态的mainLock:ReentrantLock
submit、execute
execute的执行方式为,首先检查当前worker数量,如果小于corePoolSize,则尝试add一个coreWorker。线程池在维护线程数量以及状态检查上做了大量检测。
publicvoidexecute(Runnablecommand){ intc=ctl.get(); //如果当期数量小于corePoolSize if(workerCountOf(c)addWorker方法实现
privatebooleanaddWorker(RunnablefirstTask,booleancore){ retry: for(;;){ intc=ctl.get(); intrs=runStateOf(c); //Checkifqueueemptyonlyifnecessary. if(rs>=SHUTDOWN&& !(rs==SHUTDOWN&& firstTask==null&& !workQueue.isEmpty())) returnfalse; for(;;){ intwc=workerCountOf(c); if(wc>=CAPACITY|| wc>=(core?corePoolSize:maximumPoolSize)) returnfalse; if(compareAndIncrementWorkerCount(c)) breakretry; c=ctl.get();//Re-readctl if(runStateOf(c)!=rs) continueretry; //elseCASfailedduetoworkerCountchange;retryinnerloop } } booleanworkerStarted=false; booleanworkerAdded=false; Workerw=null; try{ w=newWorker(firstTask); finalThreadt=w.thread; if(t!=null){ finalReentrantLockmainLock=this.mainLock; mainLock.lock(); try{ //Recheckwhileholdinglock. //BackoutonThreadFactoryfailureorif //shutdownbeforelockacquired. intrs=runStateOf(ctl.get()); if(rslargestPoolSize) largestPoolSize=s; workerAdded=true; } }finally{ mainLock.unlock(); } if(workerAdded){ //如果添加成功,则启动该线程,执行Worker的run方法,Worker的run方法执行外部的runWorker(Worker) t.start(); workerStarted=true; } } }finally{ if(!workerStarted) addWorkerFailed(w); } returnworkerStarted; } Worker类继承了AbstractQueuedSynchronizer获得了同步等待这样的功能。
privatefinalclassWorker extendsAbstractQueuedSynchronizer implementsRunnable { /** *Thisclasswillneverbeserialized,butweprovidea *serialVersionUIDtosuppressajavacwarning. */ privatestaticfinallongserialVersionUID=6138294804551838833L; /**Threadthisworkerisrunningin.Nulliffactoryfails.*/ finalThreadthread; /**Initialtasktorun.Possiblynull.*/ RunnablefirstTask; /**Per-threadtaskcounter*/ volatilelongcompletedTasks; /** *CreateswithgivenfirsttaskandthreadfromThreadFactory. *@paramfirstTaskthefirsttask(nullifnone) */ Worker(RunnablefirstTask){ setState(-1);//inhibitinterruptsuntilrunWorker this.firstTask=firstTask; this.thread=getThreadFactory().newThread(this); } /**DelegatesmainrunlooptoouterrunWorker*/ publicvoidrun(){ runWorker(this); } //Lockmethods // //Thevalue0representstheunlockedstate. //Thevalue1representsthelockedstate. protectedbooleanisHeldExclusively(){ returngetState()!=0; } protectedbooleantryAcquire(intunused){ if(compareAndSetState(0,1)){ setExclusiveOwnerThread(Thread.currentThread()); returntrue; } returnfalse; } protectedbooleantryRelease(intunused){ setExclusiveOwnerThread(null); setState(0); returntrue; } publicvoidlock(){acquire(1);} publicbooleantryLock(){returntryAcquire(1);} publicvoidunlock(){release(1);} publicbooleanisLocked(){returnisHeldExclusively();} voidinterruptIfStarted(){ Threadt; if(getState()>=0&&(t=thread)!=null&&!t.isInterrupted()){ try{ t.interrupt(); }catch(SecurityExceptionignore){ } } }runWorker(Worker)是Worker的轮询执行逻辑,不断地从工作队列中获取任务并执行它们。Worker每次执行任务前需要进行lock,防止在执行任务时被interrupt。
finalvoidrunWorker(Workerw){ Threadwt=Thread.currentThread(); Runnabletask=w.firstTask; w.firstTask=null; w.unlock();//allowinterrupts booleancompletedAbruptly=true; try{ while(task!=null||(task=getTask())!=null){ w.lock(); //Ifpoolisstopping,ensurethreadisinterrupted; //ifnot,ensurethreadisnotinterrupted.This //requiresarecheckinsecondcasetodealwith //shutdownNowracewhileclearinginterrupt if((runStateAtLeast(ctl.get(),STOP)|| (Thread.interrupted()&& runStateAtLeast(ctl.get(),STOP)))&& !wt.isInterrupted()) wt.interrupt(); try{ beforeExecute(wt,task); Throwablethrown=null; try{ task.run(); }catch(RuntimeExceptionx){ thrown=x;throwx; }catch(Errorx){ thrown=x;throwx; }catch(Throwablex){ thrown=x;thrownewError(x); }finally{ afterExecute(task,thrown); } }finally{ task=null; w.completedTasks++; w.unlock(); } } completedAbruptly=false; }finally{ processWorkerExit(w,completedAbruptly); } }ThreadPoolExecutor的submit方法中将Callable包装成FutureTask后交给execute方法。
FutureTask
FutureTask继承于Runnable和Future,FutureTask定义的几个状态为
NEW,尚未执行
COMPLETING,正在执行
NORMAL,正常执行完成得到结果
EXCEPTIONAL,执行抛出异常
CANCELLED,执行被取消
INTERRUPTING,执行正在被中断
INTERRUPTED,已经中断。
其中关键的get方法
publicVget()throwsInterruptedException,ExecutionException{ ints=state; if(s<=COMPLETING) s=awaitDone(false,0L); returnreport(s); }先获取当前状态,如果还未执行完成并且正常,则进入等待结果流程。在awaitDone不断循环获取当前状态,如果没有结果,则将自己通过CAS的方式添加到等待链表的头部,如果设置了超时,则LockSupport.parkNanos到指定的时间。
staticfinalclassWaitNode{ volatileThreadthread; volatileWaitNodenext; WaitNode(){thread=Thread.currentThread();} } privateintawaitDone(booleantimed,longnanos) throwsInterruptedException{ finallongdeadline=timed?System.nanoTime()+nanos:0L; WaitNodeq=null; booleanqueued=false; for(;;){ if(Thread.interrupted()){ removeWaiter(q); thrownewInterruptedException(); } ints=state; if(s>COMPLETING){ if(q!=null) q.thread=null; returns; } elseif(s==COMPLETING)//cannottimeoutyet Thread.yield(); elseif(q==null) q=newWaitNode(); elseif(!queued) queued=UNSAFE.compareAndSwapObject(this,waitersOffset, q.next=waiters,q); elseif(timed){ nanos=deadline-System.nanoTime(); if(nanos<=0L){ removeWaiter(q); returnstate; } LockSupport.parkNanos(this,nanos); } else LockSupport.park(this); } }FutureTask的run方法是执行任务并设置结果的位置,首先判断当前状态是否为NEW并且将当前线程设置为执行线程,然后调用Callable的call获取结果后设置结果修改FutureTask状态。
publicvoidrun(){ if(state!=NEW|| !UNSAFE.compareAndSwapObject(this,runnerOffset, null,Thread.currentThread())) return; try{ Callablec=callable; if(c!=null&&state==NEW){ Vresult; booleanran; try{ result=c.call(); ran=true; }catch(Throwableex){ result=null; ran=false; setException(ex); } if(ran) set(result); } }finally{ //runnermustbenon-nulluntilstateissettledto //preventconcurrentcallstorun() runner=null; //statemustbere-readafternullingrunnertoprevent //leakedinterrupts ints=state; if(s>=INTERRUPTING) handlePossibleCancellationInterrupt(s); } } 感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!