Spring - @Async实现异步调用

wuchangjian2021-11-03 11:06:42编程学习
★.区分:
同步调用:顺序执行调用,当执行完毕,并且返回结果时,才继续下一个调用。
异步调用:发送调用指令,无需等待被调用的方法执行完毕,继续执行下面的流程。

一、@Aysnc

Spring中,基于@Async标注的方法or类,称为异步方法or类。这些方法在执行时,将会在独立的线程中被执行。

@Async默认异步配置使用SimpleAsyncTaskExecutor,该线程池默认给每一个新任务创建新的线程。如果系统中不断地创建线程,会导致系统占用内存过高,达不到线程复用的功能。

自定义线程池,可以控制线程数量、线程的存活时间,避免出现出现大量线程占用cpu的情况。

二、@Aysnc的使用

1.配置自定义线程池

方法一:

在启动类添加自定义配置,但在后续使用@Async注解时,需要在注解后指定线程池。
eg:@Async(“asyncTaskExecutor”)

@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }

    @EnableAsync
    @Configuration
    class TaskPoolConfig {

        @Bean("asyncTaskExecutor")
        public Executor taskExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            //核心线程数10:线程池创建时候初始化的线程数
            executor.setCorePoolSize(10);
            //最大线程数20:线程池最大的线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
            executor.setMaxPoolSize(20);
            // 缓冲队列200:用来缓冲执行任务的队列
            executor.setQueueCapacity(200);
            // 允许线程的空闲时间60秒:当超过了核心线程数之外的线程在空闲时间到达之后会被销毁
            executor.setKeepAliveSeconds(60);
            // 线程池名的前缀:设置好了之后可以方便我们定位处理任务所在的线程池
            executor.setThreadNamePrefix("Async-TaskExecutor-");
            // 线程池对拒绝任务的处理策略:这里采用了CallerRunsPolicy策略,当线程池没有处理能力的时候,该策略会直接在 execute 方法的调用线程中运行被拒绝的任务;如果执行程序已关闭,则会丢弃该任务
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            return executor;
        }
    }
}

为了在使用时不需要再注解后指定线程池,可做如下配置。

@Configuration
@EnableAsync
public class AsyncThreadConfig implements AsyncConfigurer {
 
	@Autowired
	private ThreadPoolTaskExecutor threadPoolTaskExecutor;
 
	@Autowired
	private ThreadExceptionHandler threadExceptionHandler;
 
	@Override
	public Executor getAsyncExecutor() {
		return threadPoolTaskExecutor;
	}
 
	@Override
	public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
		return threadExceptionHandler;
	}
}
方法二:

在启动类使用注解@EnableAsync

@EnableAsync 	// 启用@Async
@SpringBootApplication
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

在application.yml配置文件中,指定@Async注解的各类配置

async:
    corePoolSize: 10	//核心线程数:线程创建时初始化的线程数
    maxPoolSize: 20		// 线程池最大线程数,只有在缓冲队列满了之后才会申请超过核心线程数的线程
    queueCapacity: 200		// 缓冲队列,用来缓冲执行任务的队列
    keepAliveSeconds: 300		// 允许线程的空闲时间:超过了核心线程数之外的线程在空闲时间到达之后会被销毁
    threadNamePrefix: 'async-executor-'		// 线程池名前缀

获得异步线程配置 AsyncProperties

@Data
@ConfigurationProperties(PREFIX)
public class AsyncProperties {

    public static final String PREFIX = "async";

    private boolean enabled = true;
    /**
     * 异步核心线程数,默认:2
     */
    private int corePoolSize = 2;
    /**
     * 异步最大线程数,默认:50
     */
    private int maxPoolSize = 50;
    /**
     * 队列容量,默认:10000
     */
    private int queueCapacity = 10000;
    /**
     * 线程存活时间,默认:300
     */
    private int keepAliveSeconds = 300;
    /**
     * 线程名前缀
     */
    private String threadNamePrefix = "async-executor-";

异步任务配置 DefaultAsyncTaskConfig

@Data
@AllArgsConstructor
@Configuration
@EnableAsync(proxyTargetClass = true)
@EnableConfigurationProperties({AsyncProperties.class})
@ConditionalOnProperty(prefix = AsyncProperties.PREFIX, name = "enabled", havingValue = "true", matchIfMissing = true)
public class DefaultAsyncTaskConfig extends AsyncConfigurerSupport {
    
    private final AsyncProperties asyncProperties;

    @Override
    @Bean
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(asyncProperties.getCorePoolSize());
        executor.setMaxPoolSize(asyncProperties.getMaxPoolSize());
        executor.setQueueCapacity(asyncProperties.getQueueCapacity());
        executor.setKeepAliveSeconds(asyncProperties.getKeepAliveSeconds());
        executor.setThreadNamePrefix(asyncProperties.getThreadNamePrefix());
        /*
           rejection-policy:当pool已经达到max size的时候,如何处理新任务
           CALLER_RUNS:不在新线程中执行任务,而是有调用者所在的线程来执行
        */
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new SimpleAsyncUncaughtExceptionHandler();
    }
}

2.创建异步方法类

因为在同一个service方法中调用异步注解不会生效。

import java.util.List;
import java.util.concurrent.Future;

/**
 * 异步调用方法 服务层
 */
public interface AsyncService {

    // 存在返回值的方法
    Future<Integer> getQueryNum(QueryParam param);
    
    Future<Integer> getQueryNum2(QueryParam param);

	// 无返回值的方法
    void getQueryNum3(QueryParam param);

}

3.实现异步方法

import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Service;

import javax.annotation.Resource;
import java.util.List;
import java.util.concurrent.Future;

@Async
@Service
public class AsyncServiceImpl implements AsyncService {

    @Resource
    private CustomerMapper customerMapper;

    @Override
    public Future<Integer> getQueryNum(QueryParam param) {
        return new AsyncResult<>(customerMapper.getQueryNum(param));
    }

	@Override
    public Future<Integer> getQueryNum2(QueryParam param) {
        return new AsyncResult<>(customerMapper.getQueryNum(param));
    }

	@Override
    public void getQueryNum3(QueryParam param) {
    	customerMapper.getQueryNum(param);
        return null;
    }
}

4.调用异步方法

public CustomerVO getVariousStatistics(QueryParam param) throws ExecutionException, InterruptedException {

        // 封装结果对象
        CustomerVO customerVO = new CustomerVO ();

       // 异步查询方法
        Future<Integer> num = asyncService.getQueryNum(bparam);
        Future<Integer> num2 = asyncService.getQueryNum2(param);
       
		// 如果有多个异步调用,需要等待结果都返回了再组装结果
        // 等待异步查询全部返回
        while (true) {
            if (num .isDone() && num2 .isDone()) {
                break;
            }
        }

        // 采集人数
        customerVO .setNum(num .get());
        // 转运人数
        customerVO .setNum2(num2 .get());

		// 将结果返回
        return collectionVO;
}

参考文章

Spring中@Async用法总结
Spring使用@Async注解,多线程
使用@Async注解创建多线程,自定义线程池

发表评论    

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。