5个并发处理技巧代码示例
【译者注】在本文中,作者总结出了5个关于处理并发性程序的技巧,并给出代码示例,让读者更好地理解和使用这5种方法。以下为译文:
1.捕获InterruptedException错误
请检查下面的代码片段:
publicclassTaskimplementsRunnable{ privatefinalBlockingQueuequeue=...; @Override publicvoidrun(){ while(!Thread.currentThread().isInterrupted()){ Stringresult=getOrDefault(()->queue.poll(1L,TimeUnit.MINUTES),"default"); //dosmthwiththeresult } } TgetOrDefault(Callablesupplier,TdefaultValue){ try{ returnsupplier.call(); } catch(Exceptione){ logger.error("Gotexceptionwhileretrievingvalue.",e); returndefaultValue; } } }
代码的问题是,在等待队列中的新元素时,是不可能终止线程的,因为中断的标志永远不会被恢复:
2.BlockingQueue#poll()方法抛出InterruptedException异常,并清除了中断的标志。
3.while中的循环条件(!Thread.currentThread().isInterrupted())的判断是true,因为标记已被清除。
为了防止这种行为,当一个方法被显式抛出(通过声明抛出InterruptedException)或隐式抛出(通过声明/抛出一个原始异常)时,总是捕获InterruptedException异常,并恢复中断的标志。
TgetOrDefault(Callablesupplier,TdefaultValue){ try{ returnsupplier.call(); } catch(InterruptedExceptione){ logger.error("Gotinterruptedwhileretrievingvalue.",e); Thread.currentThread().interrupt(); returndefaultValue; } catch(Exceptione){ logger.error("Gotexceptionwhileretrievingvalue.",e); returndefaultValue; } }
2.使用特定的执行程序来阻止操作
因为一个缓慢的操作而使整个服务器变得无响应,这通常不是开发人员想要的。不幸的是,对于RPC,响应时间通常是不可预测的。
假设服务器有100个工作线程,有一个端点,称为100RPS。在内部,它发出一个RPC调用,通常需要10毫秒。在某个时间点,此RPC的响应时间变为2秒,在峰值期间服务器能够做的惟一的一件事就是等待这些调用,而其他端点则无法访问。
@GET @Path("/genre/{name}") @Produces(MediaType.APPLICATION_JSON) publicResponsegetGenre(@PathParam("name")StringgenreName){ Genregenre=potentiallyVerySlowSynchronousCall(genreName); returnResponse.ok(genre).build(); }
解决这个问题最简单的方法是提交代码,它将阻塞调用变成一个线程池:
@GET @Path("/genre/{name}") @Produces(MediaType.APPLICATION_JSON) publicvoidgetGenre(@PathParam("name")StringgenreName,@SuspendedAsyncResponseresponse){ response.setTimeout(1L,TimeUnit.SECONDS); executorService.submit(()->{ Genregenre=potentiallyVerySlowSynchronousCall(genreName); returnresponse.resume(Response.ok(genre).build()); } ); }
3.传MDC的值
MDC(MappedDiagnosticContext)通常用于存储单个任务的特定值。例如,在web应用程序中,它可能为每个请求存储一个请求id和一个用户id,因此MDC查找与单个请求或整个用户活动相关的日志记录变得更加容易。
2017-08-2714:38:30,893INFO[server-thread-0][requestId=060d8c7f,userId=2928ea66]c.g.s.web.Controller-Message.
可是如果代码的某些部分是在专用线程池中执行的,则线程(提交任务的线程)中MDC就不会被继续传值。在下面的示例中,第7行的日志中包含“requestId”,而第9行的日志则没有:
@GET @Path("/genre/{name}") @Produces(MediaType.APPLICATION_JSON) publicvoidgetGenre(@PathParam("name")StringgenreName,@SuspendedAsyncResponseresponse){ try(MDC.MDCCloseableignored=MDC.putCloseable("requestId",UUID.randomUUID().toString())){ StringgenreId=getGenreIdbyName(genreName); //Synccall logger.trace("Submittingtasktofindgenrewithid'{}'.",genreId); //'requestId'islogged executorService.submit(()->{ logger.trace("Startingtasktofindgenrewithid'{}'.",genreId); //'requestId'isnotlogged Responseresult=getGenre(genreId)//Asynccall .map(artist->Response.ok(artist).build()) .orElseGet(()->Response.status(Response.Status.NOT_FOUND).build()); response.resume(result); } ); } }
这可以通过MDC#getCopyOfContextMap()方法来解决:
... publicvoidgetGenre(@PathParam("name")StringgenreName,@SuspendedAsyncResponseresponse){ try(MDC.MDCCloseableignored=MDC.putCloseable("requestId",UUID.randomUUID().toString())){ ... logger.trace("Submittingtasktofindgenrewithid'{}'.",genreId);//'requestId'islogged withCopyingMdc(executorService,()->{ logger.trace("Startingtasktofindgenrewithid'{}'.",genreId);//'requestId'islogged ... }); } } privatevoidwithCopyingMdc(ExecutorServiceexecutorService,Runnablefunction){ Map
4.更改线程名称
为了简化日志读取和线程转储,可以自定义线程的名称。这可以通过创建ExecutorService时用一个ThreadFactory来完成。在流行的实用程序库中有许多ThreadFactory接口的实现:
com.google.common.util.concurrent.ThreadFactoryBuilde+rinGuava. org.springframework.scheduling.concurrent.CustomizableThreadFactoryinSpring. org.apache.commons.lang3.concurrent.BasicThreadFactoryinApacheCommonsLang3.
ThreadFactorythreadFactory=newBasicThreadFactory.Builder() .namingPattern("computation-thread-%d") .build(); ExecutorServiceexecutorService=Executors.newFixedThreadPool(numberOfThreads,threadFactory);
尽管ForkJoinPool不使用ThreadFactory接口,但也支持对线程的重命名:
ForkJoinPool.ForkJoinWorkerThreadFactoryforkJoinThreadFactory=pool->{ ForkJoinWorkerThreadthread=ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool); thread.setName("computation-thread-"+thread.getPoolIndex()); returnthread; }; ForkJoinPoolforkJoinPool=newForkJoinPool(numberOfThreads,forkJoinThreadFactory,null,false);
将线程转储与默认命名进行比较:
"pool-1-thread-3"#14prio=5os_prio=31tid=0x00007fc06b19f000nid=0x5703runnable[0x0000700001ff9000] java.lang.Thread.State:RUNNABLE atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ... "pool-2-thread-3"#15prio=5os_prio=31tid=0x00007fc06aa10800nid=0x5903runnable[0x00007000020fc000] java.lang.Thread.State:RUNNABLE atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21) atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9) ... "pool-1-thread-2"#12prio=5os_prio=31tid=0x00007fc06aa10000nid=0x5303runnable[0x0000700001df3000] java.lang.Thread.State:RUNNABLE atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ...
与自定义命名进行比较:
"task-handler-thread-1"#14prio=5os_prio=31tid=0x00007fb49c9df000nid=0x5703runnable[0x000070000334a000] java.lang.Thread.State:RUNNABLE atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ... "authentication-service-ping-thread-0"#15prio=5os_prio=31tid=0x00007fb49c9de000nid=0x5903runnable[0x0000700003247000] java.lang.Thread.State:RUNNABLE atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthCheckCallback.recordFailure(HealthChecker.java:21) atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.HealthChecker.check(HealthChecker.java:9) ... "task-handler-thread-0"#12prio=5os_prio=31tid=0x00007fb49b9b5000nid=0x5303runnable[0x0000700003144000] java.lang.Thread.State:RUNNABLE atcom.github.sorokinigor.article.tipsaboutconcurrency.setthreadsname.TaskHandler.compute(TaskHandler.java:16) ...
想象一下,可能会不止3个线程。
5.使用LongAdder计数器
在高竞争的情况下,会采用java.util.concurrent.atomic.LongAdder进行计数,而不会采用AtomicLong/AtomicInteger。
LongAdder可以跨越多个单元间仍保持值不变,但是如果需要的话,也可以增加它们的值,但与父类AtomicXX比较,这会导致更高的吞吐量,也会增加内存消耗。
LongAddercounter=newLongAdder(); counter.increment(); ... longcurrentValue=counter.sum();
以上就是本文关于5个并发处理技巧代码示例的全部内容,希望对大家有所帮助。感兴趣的朋友可以继续参阅本站:Java并发编程Semaphore计数信号量详解、优化Tomcat配置(内存、并发、缓存等方面)方法详解等,有什么问题可以随时留言,小编会及时回复大家的。感谢朋友们对本站的支持!