多线程使用&线程池

2022-04-14

一、简介

解决的问题

对于压缩多个文件或数据计算等耗时处理,可以采用多线程的方式来并行处理,减少系统响应时间

二、countDownLatch

CountDownLatch允许一个或者多个线程去等待其他线程完成操作。

2.1基础使用

  • 构造方法
    // num:线程数量
    CountDownLatch countDownLatch = new CountDownLatch(num);
    
  • 分支线程执行完后调用
     countDownLatch.countDown();
    
  • 主线程等待其它线程执行完成
    countDownLatch.await();
    countDownLatch.await(30L, TimeUnit.SECONDS); // 指定超时时间
    

2.2示例

  • 由于线程不同,对于线程局部变量需要重新赋值,否则获取不到,例如:当前登录用户信息
  • 调用主线程方法时注意线程安全问题
  • countDown需要在finally块执行
CountDownLatch countDownLatch = new CountDownLatch(6);
HttpService httpService = HttpServiceHolder.get();
StringBuilder errorMsg = new StringBuilder();

new Thread(() -> {
    try {
        // 如果使用了ThreadLocal记录当前用户信息,需要给新线程赋值,否则取不到数据
        HttpServiceHolder.set(httpService);

        // ... 业务处理,可以引用主线程变量与方法,但要注意【线程安全】问题

    } catch (Exception e) {
        errorMsg.append(e.getMessage());
        e.printStackTrace();
    } finally {
        // 线程执行完调用
        countDownLatch.countDown();
    }
}).start();

// .... 以此类推,可构造多个线程

try {
    // 主线程等待其它线程执行结束
    countDownLatch.await(30L, TimeUnit.SECONDS);
} catch (InterruptedException e) {
    e.printStackTrace();
}

三、线程池简介

3.1为什么使用线程池

  • 线程池的作用是提高系统的性能和线程的利用率,不再需要频繁的创建和销毁线程。如果使用最简单的方式创建线程,在用户量巨大的情况下,消耗的性能是非常恐怖的,所以才有了线程池。
  • 避免频繁的创建和销毁线程,减小开销,实现复用

3.2ThredadPoolExcutor

最常用的 ThredadPoolExcutor ,如下是 ThreadPoolExecutor 的构造函数

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
    //省略代码
 }

分析构造参数:

  • corePoolSize:线程池里的核心线程数量,当正在运行的线程数量小于核心线程数量,就创建一个核心线程。
  • maximumPoolSize:线程池最多能放多少个线程。
  • keepAliveTime:线程的闲置时间,当线程池里面的线程数量大于 corePoolSize 的时候,多出来的线程在等待的时间之后会被释放掉
  • unit:keepAliveTime 的单位
  • workQueue:一个阻塞队列
  • threadFactory:通过这个工厂模式创建线程。
  • handler:处理线程队列满了报错的。

image.png

3.3线程池执行流程

  • 首先判断核心线程 corePoolSize 是不是满了,如果没有满,就执行任务,否则就进入下一步。
  • 线程池判断任务队列 workQueue 是否了,如果没有满,则将新提交的任务放入在这个任务队列里。如果任务队列满了,则进入一步。
  • 判断线程池里的线程达到了最大线程数 maximumPoolSize,如果没有,则创建一个新的线程来执行任务。如果已经满了,则交给拒绝策略来处理这个任务(抛出**RejectedExecutionException **异常)。

image.png

四、基于 ExecutorService 自定义线程池(Java 5中引入的)

Java 5中引入的,其内部使用了线程池机制,它在java.util.cocurrent 包下。

4.1自定义线程池

private Logger logger = LoggerFactory.getLogger(InitBeanConfig.class);

@Bean
public ExecutorService callbackThreadPool() {
    ThreadFactory factory = new ThreadFactoryBuilder()
        .setUncaughtExceptionHandler((t, e) -> logger.error(t.getName() + " excute error:", e))
        .setNameFormat("callback-pool-%d").build();
    int corePoolSize = 3;
    int maxPoolSize = 4;
    long keepAliveTime = 5;
    ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maxPoolSize, keepAliveTime, TimeUnit.MINUTES,
                                                  new ArrayBlockingQueue<Runnable>(100000), factory, new ThreadPoolExecutor.CallerRunsPolicy());
    return pool;
}

4.2使用案例

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
 
import javax.annotation.Resource;
import java.util.concurrent.ExecutorService;
 
@RestController
@RequestMapping("thread")
public class ThreadPoolController {
 
    @Resource
    private ExecutorService callbackThreadPool;
 
    @GetMapping
    public Object thread(){
        callbackThreadPool.execute(() -> {
            // 代码块
        });
        return 1;
    }
}

五、基于 ThreadPoolTaskExecutor 线程池的使用 (Spring提供,以及监听线程池)

在springboot项目启动,如下查询加载的Bean

image.png

5.1使用案例

@RestController
public class PageController {

    // applicationTaskExecutor 为spring注册时定义得 beanName
    @Autowired
    ThreadPoolTaskExecutor applicationTaskExecutor;  


    // 开辟两个线程,后等待两个线程 都执行完的案例
    @GetMapping("/thread")
    public Object thread() throws ExecutionException, InterruptedException {
        CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
            for(int i = 0 ; i < 100000 ; i++){
                System.out.println("a-"+i);
            }
        }, applicationTaskExecutor);

        CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
            for(int i = 0 ; i < 100000 ; i++){
                System.out.println("w-"+i);
            }
        }, applicationTaskExecutor);

        // 等待这两个线程都执行完
        CompletableFuture.allOf(completableFuture1, completableFuture2).get();
        return "success";
    }
}

六、自定义 ThreadPoolTaskExecutor 线程池

自定义设置线程的最大线程数等参数。

与使用spring的区别就是线程池配置的bean改成自定义的了

6.1自定义bean

@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolTaskExecutor myThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        //此方法返回可用处理器的虚拟机的最大数量; 不小于1,等于1时会变成串行
        int core = Runtime.getRuntime().availableProcessors();
        //设置核心线程数
        executor.setCorePoolSize(core);
        //设置最大线程数
        executor.setMaxPoolSize(core * 2 + 1);
        //除核心线程外的线程存活时间
        executor.setKeepAliveSeconds(3);
        //如果传入值大于0,底层队列使用的是LinkedBlockingQueue,否则默认使用SynchronousQueue
        executor.setQueueCapacity(40);
        //线程名称前缀
        executor.setThreadNamePrefix("my-thread-execute");
        //设置拒绝策略
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        return executor;
    }

}

6.2使用案例

6.2.1无返回值的线程任务runAsync

@Autowired
ThreadPoolTaskExecutor myThreadPoolTaskExecutor;  // myThreadPoolTaskExecutor 为beanName
 
@GetMapping("/thread")
public Object thread() throws ExecutionException, InterruptedException {
 
    CompletableFuture<Void> completableFuture1 = CompletableFuture.runAsync(() -> {
        for(int i = 0 ; i < 100000 ; i++){
            System.out.println("a-"+i);
        }
    }, myThreadPoolTaskExecutor);
 
    CompletableFuture<Void> completableFuture2 = CompletableFuture.runAsync(() -> {
        for(int i = 0 ; i < 100000 ; i++){
            System.out.println("w-"+i);
        }
    }, myThreadPoolTaskExecutor);
 
    // 等待两个线程执行完
    CompletableFuture.allOf(completableFuture1, completableFuture2)
        .get(8L, TimeUnit.SECONDS);
    return "success";
}

6.2.1有返回值的线程任务supplyAsync

CompletableFuture<String> completableFuture1 = CompletableFuture.supplyAsync(() -> {
    // do something
    return "线程1执行结果";
}, myThreadPoolTaskExecutor);


CompletableFuture<String> completableFuture2 = CompletableFuture.supplyAsync(() -> {
    // do something
    return "线程2执行结果";
}, myThreadPoolTaskExecutor);


CompletableFuture.allOf(sumCompletableFuture, dataCompletableFuture)
    .get(8L, TimeUnit.SECONDS);

参考: