Java并发编程Semaphore计数信号量详解
Semaphore是一个计数信号量,它的本质是一个共享锁。信号量维护了一个信号量许可集。线程可以通过调用acquire()来获取信号量的许可;当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。线程可以通过release()来释放它所持有的信号量许可(用完信号量之后必须释放,不然其他线程可能会无法获取信号量)。
简单示例:
packageme.socketthread; importjava.util.concurrent.ExecutorService; importjava.util.concurrent.Executors; importjava.util.concurrent.Semaphore; publicclassSemaphoreLearn{ //信号量总数 privatestaticfinalintSEM_MAX=12; publicstaticvoidmain(String[]args){ Semaphoresem=newSemaphore(SEM_MAX); //创建线程池 ExecutorServicethreadPool=Executors.newFixedThreadPool(3); //在线程池中执行任务 threadPool.execute(newMyThread(sem,7)); threadPool.execute(newMyThread(sem,4)); threadPool.execute(newMyThread(sem,2)); //关闭池 threadPool.shutdown(); } } classMyThreadextendsThread{ privatevolatileSemaphoresem;//信号量 privateintcount;//申请信号量的大小 MyThread(Semaphoresem,intcount){ this.sem=sem; this.count=count; } publicvoidrun(){ try{ //从信号量中获取count个许可 sem.acquire(count); Thread.sleep(2000); System.out.println(Thread.currentThread().getName()+"acquirecount="+count); }catch(InterruptedExceptione){ e.printStackTrace(); }finally{ //释放给定数目的许可,将其返回到信号量。 sem.release(count); System.out.println(Thread.currentThread().getName()+"release"+count+""); } } }
执行结果:
pool-1-thread-2acquirecount=4 pool-1-thread-1acquirecount=7 pool-1-thread-1release7 pool-1-thread-2release4 pool-1-thread-3acquirecount=2 pool-1-thread-3release2
线程1和线程2会并发执行,因为两者的信号量和没有超过总信号量,当前两个线程释放掉信号量之后线程3才能继续执行。
源码分析:
1、构造函数
在构造函数中会初始化信号量值,这值最终是作为锁标志位state的值
Semaphoresem=newSemaphore(12);//简单来说就是给锁标识位state赋值为12
2、Semaphore.acquire(n);简单理解为获取锁资源,如果获取不到线程阻塞
Semaphore.acquire(n);//从锁标识位state中获取n个信号量,简单来说是state=state-n此时state大于0表示可以获取信号量,如果小于0则将线程阻塞
publicvoidacquire(intpermits)throwsInterruptedException{ if(permits<0)thrownewIllegalArgumentException(); //获取锁 sync.acquireSharedInterruptibly(permits); }
acquireSharedInterruptibly中的操作是获取锁资源,如果可以获取则将state=state-permits,否则将线程阻塞
publicfinalvoidacquireSharedInterruptibly(intarg) throwsInterruptedException{ if(Thread.interrupted()) thrownewInterruptedException(); if(tryAcquireShared(arg)<0)//tryAcquireShared中尝试获取锁资源 doAcquireSharedInterruptibly(arg);//将线程阻塞 }
tryAcquireShared中的操作是尝试获取信号量值,简单来说就是state=state-acquires,如果此时小于0则返回负值,否则返回大于新值,再判断是否将当线程线程阻塞
protectedinttryAcquireShared(intacquires){ for(;;){ if(hasQueuedPredecessors()) return-1; //获取state值 intavailable=getState(); //从state中获取信号量 intremaining=available-acquires; if(remaining<0|| compareAndSetState(available,remaining)) //如果信号量小于0则直接返回,表示无法获取信号量,否则将state值修改为新值 returnremaining; } }
doAcquireSharedInterruptibly中的操作简单来说是将当前线程添加到FIFO队列中并将当前线程阻塞。
/会将线程添加到FIFO队列中,并阻塞 privatevoiddoAcquireSharedInterruptibly(intarg) throwsInterruptedException{ //将线程添加到FIFO队列中 finalNodenode=addWaiter(Node.SHARED); booleanfailed=true; try{ for(;;){ finalNodep=node.predecessor(); if(p==head){ intr=tryAcquireShared(arg); if(r>=0){ setHeadAndPropagate(node,r); p.next=null;//helpGC failed=false; return; } } //parkAndCheckInterrupt完成线程的阻塞操作 if(shouldParkAfterFailedAcquire(p,node)&& parkAndCheckInterrupt()) thrownewInterruptedException(); } }finally{ if(failed) cancelAcquire(node); } }
3、Semaphore.release(intpermits),这个函数的实现操作是将state=state+permits并唤起处于FIFO队列中的阻塞线程。
publicvoidrelease(intpermits){ if(permits<0)thrownewIllegalArgumentException(); //state=state+permits,并将FIFO队列中的阻塞线程唤起 sync.releaseShared(permits); }
releaseShared中的操作是将state=state+permits,并将FIFO队列中的阻塞线程唤起。
publicfinalbooleanreleaseShared(intarg){ //tryReleaseShared将state设置为state=state+arg if(tryReleaseShared(arg)){ //唤起FIFO队列中的阻塞线程 doReleaseShared(); returntrue; } returnfalse; }
tryReleaseShared将state设置为state=state+arg
protectedfinalbooleantryReleaseShared(intreleases){ for(;;){ intcurrent=getState(); intnext=current+releases; if(nextdoReleaseShared()唤起FIFO队列中的阻塞线程
privatevoiddoReleaseShared(){ for(;;){ Nodeh=head; if(h!=null&&h!=tail){ intws=h.waitStatus; if(ws==Node.SIGNAL){ if(!compareAndSetWaitStatus(h,Node.SIGNAL,0)) continue;//looptorecheckcases //完成阻塞线程的唤起操作 unparkSuccessor(h); } elseif(ws==0&& !compareAndSetWaitStatus(h,0,Node.PROPAGATE)) continue;//looponfailedCAS } if(h==head)//loopifheadchanged break; } }总结:Semaphore简单来说设置了一个信号量池state,当线程执行时会从state中获取值,如果可以获取则线程执行,并且在执行后将获取的资源返回到信号量池中,并唤起其他阻塞线程;如果信号量池中的资源无法满足某个线程的需求则将此线程阻塞。
Semaphore源码:
publicclassSemaphoreimplementsjava.io.Serializable{ privatestaticfinallongserialVersionUID=-3222578661600680210L; privatefinalSyncsync; abstractstaticclassSyncextendsAbstractQueuedSynchronizer{ privatestaticfinallongserialVersionUID=1192457210091910933L; //设置锁标识位state的初始值 Sync(intpermits){ setState(permits); } //获取锁标识位state的值,如果state值大于其需要的值则表示锁可以获取 finalintgetPermits(){ returngetState(); } //获取state值减去acquires后的值,如果大于等于0则表示锁可以获取 finalintnonfairTryAcquireShared(intacquires){ for(;;){ intavailable=getState(); intremaining=available-acquires; if(remaining<0|| compareAndSetState(available,remaining)) returnremaining; } } //释放锁 protectedfinalbooleantryReleaseShared(intreleases){ for(;;){ intcurrent=getState(); //将state值加上release值 intnext=current+releases; if(nextcurrent)//underflow thrownewError("Permitcountunderflow"); if(compareAndSetState(current,next)) return; } } finalintdrainPermits(){ for(;;){ intcurrent=getState(); if(current==0||compareAndSetState(current,0)) returncurrent; } } } //非公平锁 staticfinalclassNonfairSyncextendsSync{ privatestaticfinallongserialVersionUID=-2694183684443567898L; NonfairSync(intpermits){ super(permits); } protectedinttryAcquireShared(intacquires){ returnnonfairTryAcquireShared(acquires); } } //公平锁 staticfinalclassFairSyncextendsSync{ privatestaticfinallongserialVersionUID=2014338818796000944L; FairSync(intpermits){ super(permits); } protectedinttryAcquireShared(intacquires){ for(;;){ if(hasQueuedPredecessors()) return-1; intavailable=getState(); intremaining=available-acquires; if(remaining<0|| compareAndSetState(available,remaining)) returnremaining; } } } //设置信号量 publicSemaphore(intpermits){ sync=newNonfairSync(permits); } publicSemaphore(intpermits,booleanfair){ sync=fair?newFairSync(permits):newNonfairSync(permits); } //获取锁 publicvoidacquire()throwsInterruptedException{ sync.acquireSharedInterruptibly(1); } publicvoidacquireUninterruptibly(){ sync.acquireShared(1); } publicbooleantryAcquire(){ returnsync.nonfairTryAcquireShared(1)>=0; } publicbooleantryAcquire(longtimeout,TimeUnitunit) throwsInterruptedException{ returnsync.tryAcquireSharedNanos(1,unit.toNanos(timeout)); } publicvoidrelease(){ sync.releaseShared(1); } //获取permits值锁 publicvoidacquire(intpermits)throwsInterruptedException{ if(permits<0)thrownewIllegalArgumentException(); sync.acquireSharedInterruptibly(permits); } publicvoidacquireUninterruptibly(intpermits){ if(permits<0)thrownewIllegalArgumentException(); sync.acquireShared(permits); } publicbooleantryAcquire(intpermits){ if(permits<0)thrownewIllegalArgumentException(); returnsync.nonfairTryAcquireShared(permits)>=0; } publicbooleantryAcquire(intpermits,longtimeout,TimeUnitunit) throwsInterruptedException{ if(permits<0)thrownewIllegalArgumentException(); returnsync.tryAcquireSharedNanos(permits,unit.toNanos(timeout)); } //释放 publicvoidrelease(intpermits){ if(permits<0)thrownewIllegalArgumentException(); sync.releaseShared(permits); } publicintavailablePermits(){ returnsync.getPermits(); } publicintdrainPermits(){ returnsync.drainPermits(); } protectedvoidreducePermits(intreduction){ if(reduction<0)thrownewIllegalArgumentException(); sync.reducePermits(reduction); } publicbooleanisFair(){ returnsyncinstanceofFairSync; } publicfinalbooleanhasQueuedThreads(){ returnsync.hasQueuedThreads(); } publicfinalintgetQueueLength(){ returnsync.getQueueLength(); } protectedCollection getQueuedThreads(){ returnsync.getQueuedThreads(); } publicStringtoString(){ returnsuper.toString()+"[Permits="+sync.getPermits()+"]"; } }
总结 以上就是本文关于Java并发编程Semaphore计数信号量详解的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Java并发编程之重入锁与读写锁、Java系统的高并发解决方法详解、java高并发锁的3种实现示例代码等,有什么问题,可以留言交流讨论。感谢朋友们对本站的支持!