一、简介
解决的问题
对于压缩多个文件或数据计算等耗时处理,可以采用多线程的方式来并行处理,减少系统响应时间
二、countDownLatch
CountDownLatch
允许一个或者多个线程去等待其他线程完成操作。
2.1基础使用
- 构造方法
// num:线程数量 CountDownLatch countDownLatch = new CountDownLatch(num);
- 分支线程执行完后调用
countDownLatch.countDown();
- 主线程等待其它线程执行完成
countDownLatch.await(); countDownLatch.await(30L, TimeUnit.SECONDS); // 指定超时时间
2.2示例
- 由于线程不同,对于线程局部变量需要重新赋值,否则获取不到,例如:当前登录用户信息
- 调用主线程方法时注意线程安全问题
- countDown需要在finally块执行
CountDownLatch countDownLatch = new CountDownLatch(6);
HttpService httpService = HttpServiceHolder.get();
StringBuilder errorMsg = new StringBuilder();
new Thread(() -> {
try {
// 如果使用了ThreadLocal记录当前用户信息,需要给新线程赋值,否则取不到数据
HttpServiceHolder.set(httpService);
// ... 业务处理,可以引用主线程变量与方法,但要注意【线程安全】问题
} catch (Exception e) {
errorMsg.append(e.getMessage());
e.printStackTrace();
} finally {
// 线程执行完调用
countDownLatch.countDown();
}
}).start();
// .... 以此类推,可构造多个线程
try {
// 主线程等待其它线程执行结束
countDownLatch.await(30L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
}
三、线程池简介
3.1为什么使用线程池
- 线程池的作用是提高系统的性能和线程的利用率,不再需要频繁的创建和销毁线程。如果使用最简单的方式创建线程,在用户量巨大的情况下,消耗的性能是非常恐怖的,所以才有了线程池。
- 避免频繁的创建和销毁线程,减小开销,实现复用
3.2ThredadPoolExcutor
最常用的 ThredadPoolExcutor ,如下是 ThreadPoolExecutor 的构造函数
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
//省略代码
}
分析构造参数:
- corePoolSize:线程池里的核心线程数量,当正在运行的线程数量小于核心线程数量,就创建一个核心线程。
- maximumPoolSize:线程池最多能放多少个线程。
- keepAliveTime:线程的闲置时间,当线程池里面的线程数量大于 corePoolSize 的时候,多出来的线程在等待的时间之后会被释放掉
- unit:keepAliveTime 的单位
- workQueue:一个阻塞队列。
- threadFactory:通过这个工厂模式创建线程。
- handler:处理线程队列满了报错的。
3.3线程池执行流程
- 首先判断核心线程 corePoolSize 是不是满了,如果没有满,就执行任务,否则就进入下一步。
- 线程池判断任务队列 workQueue 是否了,如果没有满,则将新提交的任务放入在这个任务队列里。如果任务队列满了,则进入一步。
- 判断线程池里的线程达到了最大线程数 maximumPoolSize,如果没有,则创建一个新的线程来执行任务。如果已经满了,则交给拒绝策略来处理这个任务(抛出**RejectedExecutionException **异常)。
四、基于 ExecutorService 自定义线程池(Java 5中引入的)
Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下。
4.1自定义线程池
private Logger logger = LoggerFactory.getLogger(InitBeanConfig.class);
@Bean
public ExecutorService callbackThreadPool() {
ThreadFactory factory = new ThreadFactoryBuilder()
.setUncaughtExceptionHandler((t, e) -> logger.error(t.getName() + " excute error:", e))
.setNameFormat("callback-pool-%d").build();
int corePoolSize = 3;
int maxPoolSize = 4;
long keepAliveTime = 5;
ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MINUTES,
new ArrayBlockingQueue<Runnable>(100000), factory, new ThreadPoolExecutor.CallerRunsPolicy());
return pool;
}
4.2使用案例
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
@RestController
@RequestMapping("thread")
public class ThreadPoolController {
@Resource
private ExecutorService callbackThreadPool;
@GetMapping
public Object thread(){
callbackThreadPool.execute(() -> {
// 代码块
});
return 1;
}
}
五、基于 ThreadPoolTaskExecutor 线程池的使用 (Spring提供,以及监听线程池)
在springboot项目启动,如下查询加载的Bean
5.1使用案例
@RestController
public class PageController {
// applicationTaskExecutor 为spring注册时定义得 beanName
@Autowired
ThreadPoolTaskExecutor applicationTaskExecutor;
// 开辟两个线程,后等待两个线程 都执行完的案例
@GetMapping("/thread")
public Object thread() throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
for(int i = 0 ; i < 100000 ; i++){
System.out.println("a-"+i);
}
}, applicationTaskExecutor);
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
for(int i = 0 ; i < 100000 ; i++){
System.out.println("w-"+i);
}
}, applicationTaskExecutor);
// 等待这两个线程都执行完
CompletableFuture.allOf(completableFuture1, completableFuture2).get();
return "success";
}
}
六、自定义 ThreadPoolTaskExecutor 线程池
自定义设置线程的最大线程数等参数。
与使用spring的区别就是线程池配置的bean改成自定义的了
6.1自定义bean
@Configuration
public class ThreadPoolConfig {
@Bean
public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
//此方法返回可用处理器的虚拟机的最大数量; 不小于1,等于1时会变成串行
int core = Runtime.getRuntime().availableProcessors();
//设置核心线程数
executor.setCorePoolSize(core);
//设置最大线程数
executor.setMaxPoolSize(core * 2 + 1);
//除核心线程外的线程存活时间
executor.setKeepAliveSeconds(3);
//如果传入值大于0,底层队列使用的是LinkedBlockingQueue,否则默认使用SynchronousQueue
executor.setQueueCapacity(40);
//线程名称前缀
executor.setThreadNamePrefix("my-thread-execute");
//设置拒绝策略
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
return executor;
}
}
6.2使用案例
6.2.1无返回值的线程任务runAsync
@Autowired
ThreadPoolTaskExecutor myThreadPoolTaskExecutor; // myThreadPoolTaskExecutor 为beanName
@GetMapping("/thread")
public Object thread() throws ExecutionException, InterruptedException {
CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
for(int i = 0 ; i < 100000 ; i++){
System.out.println("a-"+i);
}
}, myThreadPoolTaskExecutor);
CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
for(int i = 0 ; i < 100000 ; i++){
System.out.println("w-"+i);
}
}, myThreadPoolTaskExecutor);
// 等待两个线程执行完
CompletableFuture.allOf(completableFuture1, completableFuture2)
.get(8L, TimeUnit.SECONDS);
return "success";
}
6.2.1有返回值的线程任务supplyAsync
CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
// do something
return "线程1执行结果";
}, myThreadPoolTaskExecutor);
CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
// do something
return "线程2执行结果";
}, myThreadPoolTaskExecutor);
CompletableFuture.allOf(sumCompletableFuture, dataCompletableFuture)
.get(8L, TimeUnit.SECONDS);
参考:
- 为什么使用线程池? - Java技术指北的回答 - 知乎 https://www.zhihu.com/question/41134816/answer/2053696306
- Springboot项目中如何使用线程池 https://blog.csdn.net/weixin_40516924/article/details/121098799