java并发问题概述
1什么是并发问题。
多个进程或线程同时(或着说在同一段时间内)访问同一资源会产生并发问题。
银行两操作员同时操作同一账户就是典型的例子。比如A、B操作员同时读取一余额为1000元的账户,A操作员为该账户增加100元,B操作员同时为该账户减去50元,A先提交,B后提交。最后实际账户余额为1000-50=950元,但本该为1000+100-50=1050。这就是典型的并发问题。如何解决?可以用锁。
2java中synchronized的用法
用法1
publicclassTest{ publicsynchronizedvoidprint(){ ….; } }
某线程执行print()方法,则该对象将加锁。其它线程将无法执行该对象的所有synchronized块。
用法2
publicclassTest{ publicvoidprint(){ synchronized(this){ //锁住本对象 …; } } }
同用法1,但更能体现synchronized用法的本质。
用法3
publicclassTest{ privateStringa=“test”; publicvoidprint(){ synchronized(a){ //锁住a对象 …; } } publicsynchronizedvoidt(){ …; //这个同步代码块不会因为print()而锁定. } }
执行print(),会给对象a加锁,注意不是给Test的对象加锁,也就是说Test对象的其它synchronized方法不会因为print()而被锁。同步代码块执行完,则释放对a的锁。
为了锁住一个对象的代码块而不影响该对象其它synchronized块的高性能写法:
publicclassTest{ privatebyte[]lock=newbyte[0]; publicvoidprint(){ synchronized(lock){ …; } } publicsynchronizedvoidt(){ …; } }
静态方法的锁
publicclassTest{ publicsynchronizedstaticvoidexecute(){ …; } }
效果同
publicclassTest{ publicstaticvoidexecute(){ synchronized(TestThread.class){ …; } } }
3Java中的锁与排队上厕所。
锁就是阻止其它进程或线程进行资源访问的一种方式,即锁住的资源不能被其它请求访问。在JAVA中,sychronized关键字用来对一个对象加锁。比如:
publicclassMyStack{ intidx=0; char[]data=newchar[6]; publicsynchronizedvoidpush(charc){ data[idx]=c; idx++; } publicsynchronizedcharpop(){ idx--; returndata[idx]; } publicstaticvoidmain(Stringargs[]){ MyStackm=newMyStack(); /** 下面对象m被加锁。严格的说是对象m的所有synchronized块被加锁。 如果存在另一个试图访问m的线程T,那么T无法执行m对象的push和 pop方法。 */ m.pop(); //对象m被加锁。 } }
Java的加锁解锁跟多个人排队等一个公共厕位完全一样。第一个人进去后顺手把门从里面锁住,其它人只好排队等。第一个人结束后出来时,门才会打开(解锁)。轮到第二个人进去,同样他又会把门从里面锁住,其它人继续排队等待。
用厕所理论可以很容易明白:一个人进了一个厕位,这个厕位就会锁住,但不会导致另一个厕位也被锁住,因为一个人不能同时蹲在两个厕位里。对于Java就是说:Java中的锁是针对同一个对象的,不是针对class的。看下例:
MyStatckm1=newMyStack(); MyStatckm2=newMystatck(); m1.pop(); m2.pop();
m1对象的锁是不会影响m2的锁的,因为它们不是同一个厕位。就是说,假设有3线程t1,t2,t3操作m1,那么这3个线程只可能在m1上排队等,假设另2个线程t8,t9在操作m2,那么t8,t9只会在m2上等待。而t2和t8则没有关系,即使m2上的锁释放了,t1,t2,t3可能仍要在m1上排队。原因无它,不是同一个厕位耳。
Java不能同时对一个代码块加两个锁,这和数据库锁机制不同,数据库可以对一条记录同时加好几种不同的锁。
4何时释放锁?
一般是执行完毕同步代码块(锁住的代码块)后就释放锁,也可以用wait()方式半路上释放锁。wait()方式就好比蹲厕所到一半,突然发现下水道堵住了,不得已必须出来站在一边,好让修下水道师傅(准备执行notify的一个线程)进去疏通马桶,疏通完毕,师傅大喊一声:“已经修好了”(notify),刚才出来的同志听到后就重新排队。注意啊,必须等师傅出来啊,师傅不出来,谁也进不去。也就是说notify后,不是其它线程马上可以进入封锁区域活动了,而是必须还要等notify代码所在的封锁区域执行完毕从而释放锁以后,其它线程才可进入。
这里是wait与notify代码示例:
publicsynchronizedcharpop(){ charc; while(buffer.size()==0){ try{ this.wait(); //从厕位里出来 } catch(InterruptedExceptione){ //ignoreit… } } c=((Character)buffer.remove(buffer.size()-1)). charValue(); returnc; } publicsynchronizedvoidpush(charc){ this.notify(); //通知那些wait()的线程重新排队。注意:仅仅是通知它们重新排队。 CharactercharObj=newCharacter(c); buffer.addElement(charObj); } //执行完毕,释放锁。那些排队的线程就可以进来了。
再深入一些。
由于wait()操作而半路出来的同志没收到notify信号前是不会再排队的,他会在旁边看着这些排队的人(其中修水管师傅也在其中)。注意,修水管的师傅不能插队,也得跟那些上厕所的人一样排队,不是说一个人蹲了一半出来后,修水管师傅就可以突然冒出来然后立刻进去抢修了,他要和原来排队的那帮人公平竞争,因为他也是个普通线程。如果修水管师傅排在后面,则前面的人进去后,发现堵了,就wait,然后出来站到一边,再进去一个,再wait,出来,站到一边,只到师傅进去执行notify.这样,一会儿功夫,排队的旁边就站了一堆人,等着notify.
终于,师傅进去,然后notify了,接下来呢?
1.有一个wait的人(线程)被通知到。
2.为什么被通知到的是他而不是另外一个wait的人?取决于JVM.我们无法预先
判断出哪一个会被通知到。也就是说,优先级高的不一定被优先唤醒,等待
时间长的也不一定被优先唤醒,一切不可预知!(当然,如果你了解该JVM的
实现,则可以预知)。
3.他(被通知到的线程)要重新排队。
4.他会排在队伍的第一个位置吗?回答是:不一定。他会排最后吗?也不一定。
但如果该线程优先级设的比较高,那么他排在前面的概率就比较大。
5.轮到他重新进入厕位时,他会从上次wait()的地方接着执行,不会重新执行。
恶心点说就是,他会接着拉巴巴,不会重新拉。
6.如果师傅notifyAll().则那一堆半途而废出来的人全部重新排队。顺序不可知。
JavaDOC上说,Theawakenedthreadswillnotbeabletoproceeduntilthecurrentthreadrelinquishesthelockonthisobject(当前线程释放锁前,唤醒的线程不能去执行)。
这用厕位理论解释就是显而易见的事。
5Lock的使用
用synchronized关键字可以对资源加锁。用Lock关键字也可以。它是JDK1.5中新增内容。用法如下:
classBoundedBuffer{ finalLocklock=newReentrantLock(); finalConditionnotFull=lock.newCondition(); finalConditionnotEmpty=lock.newCondition(); finalObject[]items=newObject[100]; intputptr,takeptr,count; publicvoidput(Objectx)throwsInterruptedException{ lock.lock(); try{ while(count==items.length) notFull.await(); items[putptr]=x; if(++putptr==items.length)putptr=0; ++count; notEmpty.signal(); } finally{ lock.unlock(); } } publicObjecttake()throwsInterruptedException{ lock.lock(); try{ while(count==0) notEmpty.await(); Objectx=items[takeptr]; if(++takeptr==items.length)takeptr=0; --count; notFull.signal(); returnx; } finally{ lock.unlock(); } } }
(注:这是JavaDoc里的例子,是一个阻塞队列的实现例子。所谓阻塞队列,就是一个队列如果满了或者空了,都会导致线程阻塞等待。Java里的ArrayBlockingQueue提供了现成的阻塞队列,不需要自己专门再写一个了。)
一个对象的lock.lock()和lock.unlock()之间的代码将会被锁住。这种方式比起synchronize好在什么地方?简而言之,就是对wait的线程进行了分类。用厕位理论来描述,则是那些蹲了一半而从厕位里出来等待的人原因可能不一样,有的是因为马桶堵了,有的是因为马桶没水了。通知(notify)的时候,就可以喊:因为马桶堵了而等待的过来重新排队(比如马桶堵塞问题被解决了),或者喊,因为马桶没水而等待的过来重新排队(比如马桶没水问题被解决了)。这样可以控制得更精细一些。不像synchronize里的wait和notify,不管是马桶堵塞还是马桶没水都只能喊:刚才等待的过来排队!假如排队的人进来一看,发现原来只是马桶堵塞问题解决了,而自己渴望解决的问题(马桶没水)还没解决,只好再回去等待(wait),白进来转一圈,浪费时间与资源。
Lock方式与synchronized对应关系:
LockawaitsignalsignalAll
synchronizedwaitnotifynotifyAll
注意:不要在Lock方式锁住的块里调用wait、notify、notifyAll
6利用管道进行线程间通信
原理简单。两个线程,一个操作PipedInputStream,一个操作PipedOutputStream。PipedOutputStream写入的数据先缓存在Buffer中,如果Buffer满,此线程wait。PipedInputStream读出Buffer中的数据,如果Buffer没数据,此线程wait。
jdk1.5中的阻塞队列可实现同样功能。
packageio; importjava.io.*; publicclassPipedStreamTest{ publicstaticvoidmain(String[]args){ PipedOutputStreamops=newPipedOutputStream(); PipedInputStreampis=newPipedInputStream(); try{ ops.connect(pis); //实现管道连接 newProducer(ops).run(); newConsumer(pis).run(); } catch(Exceptione){ e.printStackTrace(); } } } //生产者 classProducerimplementsRunnable{ privatePipedOutputStreamops; publicProducer(PipedOutputStreamops) { this.ops=ops; } publicvoidrun() { try{ ops.write("hell,spell".getBytes()); ops.close(); } catch(Exceptione) { e.printStackTrace(); } } } //消费者 classConsumerimplementsRunnable{ privatePipedInputStreampis; publicConsumer(PipedInputStreampis) { this.pis=pis; } publicvoidrun() { try{ byte[]bu=newbyte[100]; intlen=pis.read(bu); System.out.println(newString(bu,0,len)); pis.close(); } catch(Exceptione) { e.printStackTrace(); } } }
例2对上面的程序做少许改动就成了两个线程。
packageio; importjava.io.*; publicclassPipedStreamTest{ publicstaticvoidmain(String[]args){ PipedOutputStreamops=newPipedOutputStream(); PipedInputStreampis=newPipedInputStream(); try{ ops.connect(pis); //实现管道连接 Producerp=newProducer(ops); newThread(p).start(); Consumerc=newConsumer(pis); newThread(c).start(); } catch(Exceptione){ e.printStackTrace(); } } } //生产者 classProducerimplementsRunnable{ privatePipedOutputStreamops; publicProducer(PipedOutputStreamops) { this.ops=ops; } publicvoidrun() { try{ for(;;){ ops.write("hell,spell".getBytes()); ops.close(); } } catch(Exceptione) { e.printStackTrace(); } } } //消费者 classConsumerimplementsRunnable{ privatePipedInputStreampis; publicConsumer(PipedInputStreampis) { this.pis=pis; } publicvoidrun() { try{ for(;;){ byte[]bu=newbyte[100]; intlen=pis.read(bu); System.out.println(newString(bu,0,len)); } pis.close(); } catch(Exceptione) { e.printStackTrace(); } } }
例3.这个例子更加贴进应用
importjava.io.*; publicclassPipedIO{ //程序运行后将sendFile文件的内容拷贝到receiverFile文件中 publicstaticvoidmain(Stringargs[]){ try{ //构造读写的管道流对象 PipedInputStreampis=newPipedInputStream(); PipedOutputStreampos=newPipedOutputStream(); //实现关联 pos.connect(pis); //构造两个线程,并且启动。 newSender(pos,”c:\text2.txt”).start(); newReceiver(pis,”c:\text3.txt”).start(); } catch(IOExceptione){ System.out.println(“PipeError”+e); } } } //线程发送 classSenderextendsThread{ PipedOutputStreampos; Filefile; //构造方法 Sender(PipedOutputStreampos,StringfileName){ this.pos=pos; file=newFile(fileName); } //线程运行方法 publicvoidrun(){ try{ //读文件内容 FileInputStreamfs=newFileInputStream(file); intdata; while((data=fs.read())!=-1){ //写入管道始端 pos.write(data); } pos.close(); } catch(IOExceptione){ System.out.println(“SenderError”+e); } } } //线程读 classReceiverextendsThread{ PipedInputStreampis; Filefile; //构造方法 Receiver(PipedInputStreampis,StringfileName){ this.pis=pis; file=newFile(fileName); } //线程运行 publicvoidrun(){ try{ //写文件流对象 FileOutputStreamfs=newFileOutputStream(file); intdata; //从管道末端读 while((data=pis.read())!=-1){ //写入本地文件 fs.write(data); } pis.close(); } catch(IOExceptione){ System.out.println("ReceiverError"+e); } } }
7阻塞队列
阻塞队列可以代替管道流方式来实现进水管/排水管模式(生产者/消费者).JDK1.5提供了几个现成的阻塞队列.现在来看ArrayBlockingQueue的代码如下:
这里是一个阻塞队列
BlockingQueueblockingQ=newArrayBlockingQueue10;
一个线程从队列里取
for(;;){ Objecto=blockingQ.take();//队列为空,则等待(阻塞) }
另一个线程往队列存
for(;;){ blockingQ.put(newObject());//队列满,则等待(阻塞) }
可见,阻塞队列使用起来比管道简单。
8使用Executors、Executor、ExecutorService、ThreadPoolExecutor
可以使用线程管理任务。还可以使用jdk1.5提供的一组类来更方便的管理任务。从这些类里我们可以体会一种面向任务的思维方式。这些类是:
Executor接口。使用方法:
Executorexecutor=anExecutor;//生成一个Executor实例。 executor.execute(newRunnableTask1());
用意:使用者只关注任务执行,不用操心去关注任务的创建、以及执行细节等这些第三方实现者关心的问题。也就是说,把任务的调用执行和任务的实现解耦。
实际上,JDK1.5中已经有该接口出色的实现。够用了。
Executors是一个如同Collections一样的工厂类或工具类,用来产生各种不同接口的实例。
ExecutorService接口它继承自Executor.Executor只管把任务扔进executor()里去执行,剩余的事就不管了。而ExecutorService则不同,它会多做点控制工作。比如:
classNetworkService{ privatefinalServerSocketserverSocket; privatefinalExecutorServicepool; publicNetworkService(intport,intpoolSize)throwsIOException{ serverSocket=newServerSocket(port); pool=Executors.newFixedThreadPool(poolSize); } publicvoidserve(){ try{ for(;;){ pool.execute(newHandler(serverSocket.accept())); } } catch(IOExceptionex){ pool.shutdown(); //不再执行新任务 } } } classHandlerimplementsRunnable{ privatefinalSocketsocket; Handler(Socketsocket){ this.socket=socket; } publicvoidrun(){ //readandservicerequest } }
ExecutorService(也就是代码里的pool对象)执行shutdown后,它就不能再执行新任务了,但老任务会继续执行完毕,那些等待执行的任务也不再等待了。
任务提交者与执行者通讯
publicstaticvoidmain(Stringargs[])throwsException{ ExecutorServiceexecutor=Executors.newSingleThreadExecutor(); Callabletask=newCallable(){ publicStringcall()throwsException{ return“test”; } } ; Futuref=executor.submit(task); Stringresult=f.get(); //等待(阻塞)返回结果 System.out.println(result); executor.shutdown(); }
Executors.newSingleThreadExecutor()取得的Executor实例有以下特性:
任务顺序执行.比如:
executor.submit(task1); executor.submit(task2);
必须等task1执行完,task2才能执行。
task1和task2会被放入一个队列里,由一个工作线程来处理。即:一共有2个线程(主线程、处理任务的工作线程)。
其它的类请参考JavaDoc
9并发流程控制
本节例子来自温少的Java并发教程,可能会有改动。向温少致敬。
CountDownLatch门插销计数器
启动线程,然后等待线程结束。即常用的主线程等所有子线程结束后再执行的问题。
publicstaticvoidmain(String[]args)throwsException{ //TODOAuto-generatedmethodstub finalintcount=10; finalCountDownLatchcompleteLatch=newCountDownLatch(count); //定义了门插销的数目是10 for(inti=0;iJDK1.4时,常用办法是给子线程设置状态,主线程循环检测。易用性和效率都不好。
启动很多线程,等待通知才能开始
publicstaticvoidmain(String[]args)throwsException{ //TODOAuto-generatedmethodstub finalCountDownLatchstartLatch=newCountDownLatch(1); //定义了一根门插销 for(inti=0;i<10;i++){ Threadthread=newThread("workerthread"+i){ publicvoidrun(){ try{ startLatch.await(); //如果门插销还没减完则等待 } catch(InterruptedExceptione){ } //doxxxx } } ; thread.start(); } startLatch.countDown(); //减少一根门插销 }CycliBarrier.等所有线程都达到一个起跑线后才能开始继续运行。
publicclassCycliBarrierTestimplementsRunnable{ privateCyclicBarrierbarrier; publicCycliBarrierTest(CyclicBarrierbarrier){ this.barrier=barrier; } publicvoidrun(){ //doxxxx; try{ this.barrier.await(); //线程运行至此会检查是否其它线程都到齐了,没到齐就继续等待。到齐了就执行barrier的run函数体里的内容 } catch(Exceptione){ } } /** *@paramargs */ publicstaticvoidmain(String[]args){ //参数2代表两个线程都达到起跑线才开始一起继续往下执行 CyclicBarrierbarrier=newCyclicBarrier(2,newRunnable(){ publicvoidrun(){ //doxxxx; } } ); Threadt1=newThread(newCycliBarrierTest(barrier)); Threadt2=newThread(newCycliBarrierTest(barrier)); t1.start(); t2.start(); } }这简化了传统的用计数器+wait/notifyAll来实现该功能的方式。
10并发3定律
Amdahl定律.给定问题规模,可并行化部分占12%,那么即使把并行运用到极致,系统的性能最多也只能提高1/(1-0.12)=1.136倍。即:并行对提高系统性能有上限。
Gustafson定律.Gustafson定律说Amdahl定律没有考虑随着cpu的增多而有更多的计算能力可被使用。其本质在于更改问题规模从而可以把Amdahl定律中那剩下的88%的串行处理并行化,从而可以突破性能门槛。本质上是一种空间换时间。
Sun-Ni定律.是前两个定律的进一步推广。其主要思想是计算的速度受限于存储而不是CPU的速度.所以要充分利用存储空间等计算资源,尽量增大问题规模以产生更好/更精确的解.
11由并发到并行
计算机识别物体需要飞速的计算,以至于芯片发热发烫,而人在识别物体时却一目了然,却并不会导致某个脑细胞被烧热烧焦(夸张)而感到不适,是由于大脑是一个分布式并行运行系统,就像google用一些廉价的linux服务器可以进行庞大复杂的计算一样,大脑内部无数的神经元的独自计算,互相分享成果,从而瞬间完成需要单个cpu万亿次运算才能有的效果。试想,如果在并行处理领域有所创建,将对计算机的发展和未来产生不可估量的影响。当然,其中的挑战也可想而知:许多的问题是并不容易轻易就“分割”的了的。
总结
以上就是本文关于java并发问题概述的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站其他相关专题,如有不足之处,欢迎留言指出。感谢朋友们对本站的支持!