Spring Boot 异步框架的使用详解
1.前言
随着数据量和调用量的增长,用户对应用的性能要求越来越高。另外,在实际的服务中,还存在着这样的场景:系统在组装数据的时候,对于数据的各个部分的获取实际上是没有前后依赖关系的。这些问题都很容易让我们想到将这些同步调用全都改造为异步调用。不过自己实现起来比较麻烦,还容易出错。好在Spring已经提供了该问题的解决方案,而且使用起来十分方便。
2.Spring异步执行框架的使用方法
2.1maven依赖
Spring异步执行框架的相关bean包含在spring-context和spring-aop模块中,所以只要引入上述的模块即可。
2.2开启异步任务支持
Spring提供了@EnableAsync的注解来标注是否启用异步任务支持。使用方式如下:
@Configuration @EnableAsync publicclassAppConfig{ }
Note:@EnableAsync必须要配合@Configuration使用,否则会不生效
2.3方法标记为异步调用
将同步方法的调用改为异步调用也很简单。对于返回值为void的方法,直接加上@Async注解即可。对于有返回值的方法,除了加上上述的注解外,还需要将方法的返回值修改为Future类型和将返回值用AsyncResult包装起来。如下所示:
//无返回值的方法直接加上注解即可。 @Async publicvoidmethod1(){ ... } //有返回值的方法需要修改返回值。 @Async publicFuture
2.4方法调用
对于void的方法,和普通的调用没有任何区别。对于非void的方法,由于返回值是Future类型,所以需要用get()方法来获取返回值。如下所示:
publicstaticvoidmain(String[]args){ service.method1(); Future
3.原理简介
这块的源码的逻辑还是比较简单的,主要是Spring帮我们生成并管理了一个线程池,然后方法调用的时候使用动态代理将方法的执行包装为Callable类型并提交到线程池中执行。核心的实现逻辑在AsyncExecutionInterceptor类的invoke()方法中。如下所示:
@Override publicObjectinvoke(finalMethodInvocationinvocation)throwsThrowable{ Class>targetClass=(invocation.getThis()!=null?AopUtils.getTargetClass(invocation.getThis()):null); MethodspecificMethod=ClassUtils.getMostSpecificMethod(invocation.getMethod(),targetClass); finalMethoduserDeclaredMethod=BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutorexecutor=determineAsyncExecutor(userDeclaredMethod); if(executor==null){ thrownewIllegalStateException( "NoexecutorspecifiedandnodefaultexecutorsetonAsyncExecutionInterceptoreither"); } Callable
4.自定义taskExecutor及异常处理
4.1自定义taskExecutor
Spring查找TaskExecutor逻辑是:
1.如果Springcontext中存在唯一的TaskExecutorbean,那么就使用这个bean。
2.如果1中的bean不存在,那么就会查找是否存在一个beanName为taskExecutor且是java.util.concurrent.Executor实例的bean,有则使用这个bean。
3.如果1、2中的都不存在,那么Spring就会直接使用默认的Executor,即SimpleAsyncTaskExecutor。
在第2节的实例中,我们直接使用的是Spring默认的TaskExecutor。但是对于每一个新的任务,SimpleAysncTaskExecutor都是直接创建新的线程来执行,所以无法重用线程。具体的执行的代码如下:
@Override publicvoidexecute(Runnabletask,longstartTimeout){ Assert.notNull(task,"Runnablemustnotbenull"); RunnabletaskToUse=(this.taskDecorator!=null?this.taskDecorator.decorate(task):task); if(isThrottleActive()&&startTimeout>TIMEOUT_IMMEDIATE){ this.concurrencyThrottle.beforeAccess(); doExecute(newConcurrencyThrottlingRunnable(taskToUse)); } else{ doExecute(taskToUse); } } protectedvoiddoExecute(Runnabletask){ Threadthread=(this.threadFactory!=null?this.threadFactory.newThread(task):createThread(task)); thread.start(); }
所以我们在使用的时候,最好是使用自定义的TaskExecutor。结合上面描述的Spring查找TaskExecutor的逻辑,最简单的自定义的方法是使用@Bean注解。示例如下:
//ThreadPoolTaskExecutor的配置基本等同于线程池 @Bean("taskExecutor") publicExecutorgetAsyncExecutor(){ ThreadPoolTaskExecutortaskExecutor=newThreadPoolTaskExecutor(); taskExecutor.setMaxPoolSize(MAX_POOL_SIZE); taskExecutor.setCorePoolSize(CORE_POOL_SIZE); taskExecutor.setQueueCapacity(CORE_POOL_SIZE*10); taskExecutor.setThreadNamePrefix("wssys-async-task-thread-pool"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60*10); taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy()); returntaskExecutor; }
另外,Spring还提供了一个AsyncConfigurer接口,通过实现该接口,除了可以实现自定义Executor以外,还可以自定义异常的处理。代码如下:
@Configuration @Slf4j publicclassAsyncConfigimplementsAsyncConfigurer{ privatestaticfinalintMAX_POOL_SIZE=50; privatestaticfinalintCORE_POOL_SIZE=20; @Override @Bean("taskExecutor") publicExecutorgetAsyncExecutor(){ ThreadPoolTaskExecutortaskExecutor=newThreadPoolTaskExecutor(); taskExecutor.setMaxPoolSize(MAX_POOL_SIZE); taskExecutor.setCorePoolSize(CORE_POOL_SIZE); taskExecutor.setQueueCapacity(CORE_POOL_SIZE*10); taskExecutor.setThreadNamePrefix("async-task-thread-pool"); taskExecutor.setWaitForTasksToCompleteOnShutdown(true); taskExecutor.setAwaitTerminationSeconds(60*10); taskExecutor.setRejectedExecutionHandler(newThreadPoolExecutor.AbortPolicy()); returntaskExecutor; } @Override publicAsyncUncaughtExceptionHandlergetAsyncUncaughtExceptionHandler(){ return(ex,method,params)->log.error("invokeasyncmethodoccurserror.method:{},params:{}", method.getName(),JSON.toJSONString(params),ex); } }
Note:
Spring还提供了一个AsyncConfigurerSupport类,该类也实现了AsyncConfigurer接口,且方法的返回值都是null,旨在提供一个方便的实现。
当getAsyncExecutor()方法返回null的时候,Spring会使用默认的处理器(强烈不推荐)。
当getAsyncUncaughtExceptionHandler()返回null的时候,Spring会使用SimpleAsyncUncaughtExceptionHandler来处理异常,该类会打印出异常的信息。
所以对该类的使用,最佳的实践是继承该类,并且覆盖实现getAsyncExecutor()方法。
4.2异常处理
Spring异步框架对异常的处理如下所示:
//所在类:AsyncExecutionAspectSupport protectedvoidhandleError(Throwableex,Methodmethod,Object...params)throwsException{ if(Future.class.isAssignableFrom(method.getReturnType())){ ReflectionUtils.rethrowException(ex); } else{ //Couldnottransmittheexceptiontothecallerwithdefaultexecutor try{ this.exceptionHandler.handleUncaughtException(ex,method,params); } catch(Throwableex2){ logger.error("Exceptionhandlerforasyncmethod'"+method.toGenericString()+ "'threwunexpectedexceptionitself",ex2); } } }
从代码来看,如果返回值是Future类型,那么直接将异常抛出。如果返回值不是Future类型(基本上包含的是所有返回值void类型的方法,因为如果方法有返回值,必须要用Future包装起来),那么会调用handleUncaughtException方法来处理异常。
注意:在handleUncaughtException()方法中抛出的任何异常,都会被SpringCatch住,所以没有办法将void的方法再次抛出并传播到上层调用方的!!!
关于Spring这个设计的缘由我的理解是:既然方法的返回值是void,就说明调用方不关心方法执行是否成功,所以也就没有必要去处理方法抛出的异常。如果需要关心异步方法是否成功,那么返回值改为boolean就可以了。
4.4最佳实践的建议
- @Async可以指定方法执行的Executor,用法:@Async("MyTaskExecutor")。推荐指定Executor,这样可以避免因为Executor配置没有生效而Spring使用默认的Executor的问题。
- 实现接口AsyncConfigurer的时候,方法getAsyncExecutor()必须要使用@Bean,并指定Bean的name。如果不使用@Bean,那么该方法返回的Executor并不会被Spring管理。用javadocapi的原话是:isnotafullymanagedSpringbean.(具体含义没有太理解,不过亲测不加这个注解无法正常使用)
- 由于其本质上还是基于代理实现的,所以如果一个类中有A、B两个异步方法,而A中存在对B的调用,那么调用A方法的时候,B方法不会去异步执行的。
- 在异步方法上标注@Transactional是无效的。
- future.get()的时候,最好使用get(longtimeout,TimeUnitunit)方法,避免长时间阻塞。
- ListenableFuture和CompletableFuture也是推荐使用的,他们相比Future,提供了对异步调用的各个阶段或过程进行介入的能力。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。