Java并发相关

Table of Contents

并发编程三个重要特性

  • 原子性
    • 一次操作或者多次操作,要么所有的操作全部都得到执行并且不会受到任何因素的干扰而中断,要么都不执行。
    • 在 Java 中,可以借助synchronized、各种 Lock 以及各种原子类实现原子性。
    • synchronized 和各种 Lock 可以保证任一时刻只有一个线程访问该代码块,因此可以保障原子性。
    • 各种原子类是利用 CAS (compare and swap) 操作(可能也会用到 volatile或者final关键字)来保证原子操作。
  • 可见性
    • 当一个线程对共享变量进行了修改,那么另外的线程都是立即可以看到修改后的最新值。
    • 在 Java 中,可以借助synchronized、volatile 以及各种 Lock 实现可见性。
    • 如果我们将变量声明为 volatile ,这就指示 JVM,这个变量是共享且不稳定的,每次使用它都到主存中进行读取。
  • 有序性
    • 由于指令重排序问题,代码的执行顺序未必就是编写代码时候的顺序。
    • 指令重排序可以保证串行语义一致,但是没有义务保证多线程间的语义也一致 ,所以在多线程下,指令重排序可能会导致一些问题。
    • 在 Java 中,volatile 关键字可以禁止指令进行重排序优化。

Java内存模型(Java Memory Model)

  • Java 内存模型(JMM) 抽象了线程和主内存之间的关系,就比如说线程之间的共享变量必须存储在主内存中。
  • Java 是最早尝试提供内存模型的语言,其主要目的是为了简化多线程编程,增强程序可移植性的。

什么是主内存?什么是本地内存?

  • 主内存
    • 所有线程创建的实例对象都存放在主内存中,不管该实例对象是成员变量,还是局部变量,类信息、常量、静态变量都是放在主内存中。
    • 为了获取更好的运行速度,虚拟机及硬件系统可能会让工作内存优先存储于寄存器和高速缓存中。
  • 本地内存
    • 每个线程都有一个私有的本地内存,本地内存存储了该线程以读 / 写共享变量的副本。
    • 每个线程只能操作自己本地内存中的变量,无法直接访问其他线程的本地内存。
    • 如果线程间需要通信,必须通过主内存来进行。
    • 本地内存是 JMM 抽象出来的一个概念,并不真实存在,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。

Java线程池

目的 :池化技术的思想主要是为了减少每次获取资源的消耗,提高对资源的利用率。

好处:

  1. 降低资源消耗 。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度 。当任务到达时,任务可以不需要等到线程创建就能立即执行。
  3. 提高线程的可管理性 。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。

使用场景:

  • 线程池一般用于执行 多个不相关联 的耗时任务,没有多线程的情况下,任务顺序执行
  • 使用了线程池的话可让多个不相关联的任务同时执行。

Executor框架

  • Executor 框架是 Java5 之后引进的,在 Java 5 之后,通过 Executor 来启动线程比使用 Thread 的 start 方法更好;
  • 更易管理,效率更好。因为内部使用线程池实现,节约开销;
  • 有助于避免 this 逃逸问题。
    • this 逃逸 是指 在构造函数返回之前其他线程就持有该对象的引用,调用尚未构造完全的对象的方法可能引发令人疑惑的错误。
  • Executor 框架不仅包括了线程池的管理,还提供了线程工厂、队列以及拒绝策略等。

三大组成部分

  1. 任务(Runnable /Callable)
    • 执行任务需要实现的 Runnable 接口 或 Callable接口。
  2. 任务的执行(Executor)
    • Runnable 接口或 Callable 接口 实现类都可以被 ThreadPoolExecutor 或 ScheduledThreadPoolExecutor 执行。
  3. 异步计算的结果(Future)
    • Future 接口以及 Future 接口的实现类 FutureTask 类都可以代表异步计算的结果。
    • 调用 submit() 方法时会返回一个 FutureTask 对象;

ThreadPoolExecutor 类介绍

  • 线程池实现类 ThreadPoolExecutor 是 Executor 框架最核心的类。

构造方法

/**
 * 用给定的初始参数创建一个新的ThreadPoolExecutor。
 */
public ThreadPoolExecutor(int corePoolSize,//线程池的核心线程数量
                          int maximumPoolSize,//线程池的最大线程数
                          long keepAliveTime,//当线程数大于核心线程数时,多余的空闲线程存活的最长时间
                          TimeUnit unit,//时间单位
                          BlockingQueue<Runnable> workQueue,//任务队列,用来储存等待执行任务的队列
                          ThreadFactory threadFactory,//线程工厂,用来创建线程,一般默认即可
                          RejectedExecutionHandler handler//拒绝策略,当提交的任务过多而不能及时处理时,我们可以定制策略来处理任务
                          ) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

参数解释

ThreadPoolExecutor 3 个最重要的参数:

  • corePoolSize : 任务队列未达到队列容量时,最大可以同时运行的线程数量。
  • maximumPoolSize : 任务队列中存放的任务达到队列容量的时候,当前可以同时运行的线程数量变为最大线程数。
  • workQueue: 新任务来的时候会先判断当前运行的线程数量是否达到核心线程数,如果达到的话,新任务就会被存放在队列中。

ThreadPoolExecutor其他常见参数 :

  • keepAliveTime: 线程池中的线程数量大于 corePoolSize 的时候,如果这时没有新的任务提交,核心线程外的线程不会立即销毁,而是会等待,直到等待的时间超过了 keepAliveTime才会被回收销毁。
  • unit : keepAliveTime 参数的时间单位。
  • threadFactory : executor 创建新线程的时候会用到。
  • handler : ThreadPoolExecutor 拒绝策略。
    • 如果当前同时运行的线程数量达到最大线程数量并且队列也已经被放满了任务时,ThreadPoolExecutor 定义一些策略:
    • ThreadPoolExecutor.AbortPolicy:抛出 RejectedExecutionException来拒绝新任务的处理。
    • ThreadPoolExecutor.CallerRunsPolicy:调用执行自己的线程运行任务,也就是直接在调用execute方法的线程中运行(run)被拒绝的任务,如果执行程序已关闭,则会丢弃该任务。因此这种策略会降低对于新任务提交速度,影响程序的整体性能。如果您的应用程序可以承受此延迟并且你要求任何一个任务请求都要被执行的话,你可以选择这个策略。
    • ThreadPoolExecutor.DiscardPolicy:不处理新任务,直接丢弃掉。
    • ThreadPoolExecutor.DiscardOldestPolicy:此策略将丢弃最早的未处理的任务请求。

线程池创建的两种方式

  • 方式一:通过ThreadPoolExecutor构造函数来创建(推荐)。
  • 方式二:通过 Executor 框架的工具类 Executors 来创建(不推荐,因为会有资源耗尽的风险)。
    • FixedThreadPool 和 SingleThreadExecutor:使用的是无界的 LinkedBlockingQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM
    • CachedThreadPool:使用的是同步队列 SynchronousQueue, 允许创建的线程数量为 Integer.MAX_VALUE ,如果任务数量过多且执行速度较慢,可能会创建大量的线程,从而导致 OOM
    • ScheduledThreadPool 和 SingleThreadScheduledExecutor:使用的无界的延迟阻塞队列DelayedWorkQueue,任务队列最大长度为 Integer.MAX_VALUE,可能堆积大量的请求,从而导致 OOM

execute 源码分析

// 存放线程池的运行状态 (runState) 和线程池内有效线程的数量 (workerCount)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

private static int workerCountOf(int c) {
    return c & CAPACITY;
}
//任务队列
private final BlockingQueue<Runnable> workQueue;

public void execute(Runnable command) {
    // 如果任务为null,则抛出异常。
    if (command == null)
        throw new NullPointerException();
    // ctl 中保存的线程池当前的一些状态信息
    int c = ctl.get();

    //  下面会涉及到 3 步 操作
    // 1.首先判断当前线程池中执行的任务数量是否小于 corePoolSize
    // 如果小于的话,通过addWorker(command, true)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2.如果当前执行的任务数量大于等于 corePoolSize 的时候就会走到这里,表明创建新的线程失败。
    // 通过 isRunning 方法判断线程池状态,线程池处于 RUNNING 状态并且队列可以加入任务,该任务才会被加入进去
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次获取线程池状态,如果线程池状态不是 RUNNING 状态就需要从任务队列中移除任务,并尝试判断线程是否全部执行完毕。同时执行拒绝策略。
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 如果当前工作线程数量为0,新创建一个线程并执行。
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    //3. 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;然后,启动该线程从而执行任务。
    // 传入 false 代表增加线程时判断当前线程数是否少于 maxPoolSize
    //如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
    else if (!addWorker(command, false))
        reject(command);
}

这里简单分析一下整个流程(对整个逻辑进行了简化,方便理解):

  1. 如果当前运行的线程数小于核心线程数,那么就会新建一个线程来执行任务。
  2. 如果当前运行的线程数等于或大于核心线程数,但是小于最大线程数,那么就把该任务放入到任务队列里等待执行。
  3. 如果向任务队列投放任务失败(任务队列已经满了),但是当前运行的线程数是小于最大线程数的,就新建一个线程来执行任务。
  4. 如果当前运行的线程数已经等同于最大线程数了,新建线程将会使当前运行的线程超出最大线程数,那么当前任务会被拒绝,拒绝策略会调用RejectedExecutionHandler.rejectedExecution()方法。

Runnable vs Callable

  • Runnable自 Java 1.0 以来一直存在,但Callable仅在 Java 1.5 中引入,目的就是为了来处理Runnable不支持的用例。
  • Runnable 接口不会返回结果或抛出检查异常,但是 Callable 接口可以。
  • 所以,如果任务不需要返回结果或抛出异常推荐使用 Runnable 接口,这样代码看起来会更加简洁。
  • 工具类 Executors 可以实现将 Runnable 对象转换成 Callable 对象。(Executors.callable(Runnable task) 或 Executors.callable(Runnable task, Object result))。
@FunctionalInterface
public interface Runnable {
   /**
    * 被线程执行,没有返回值也无法抛出异常
    */
    public abstract void run();
}
@FunctionalInterface
public interface Callable<V> {
    /**
     * 计算结果,或在无法这样做时抛出异常。
     * @return 计算得出的结果
     * @throws 如果无法计算结果,则抛出异常
     */
    V call() throws Exception;
}

execute() vs submit()

  • execute() 和 submit()是两种提交任务到线程池的方法;
  • execute() 方法用于提交不需要返回值的任务。
    • 通常用于执行 Runnable 任务,无法判断任务是否被线程池成功执行。
    • 异常处理需要通过自定义的 ThreadFactory (在线程工厂创建线程的时候设置UncaughtExceptionHandler对象来 处理异常)或 ThreadPoolExecutor 的 afterExecute() 方法来处理
  • submit() 方法用于提交需要返回值的任务。
    • 可以提交 Runnable 或 Callable 任务。
    • submit() 方法返回一个 Future 对象,通过这个 Future 对象可以判断任务是否执行成功,并获取任务的返回值。
    • get() 方法会阻塞当前线程直到任务完成, get(long timeout,TimeUnit unit) 多了一个超时时间,如果在 timeout 时间内任务还没有执行完,就会抛出 java.util.concurrent.TimeoutException。
    • 可以通过 Future 对象处理任务执行过程中抛出的异常;

shutdown() VS shutdownNow()

  • shutdown() :
    • 关闭线程池,线程池的状态变为 SHUTDOWN。
    • 线程池不再接受新任务了,但是队列里的任务得执行完毕。
  • shutdownNow() :
    • 关闭线程池,线程池的状态变为 STOP。
    • 线程池会终止当前正在运行的任务,并停止处理排队的任务并返回正在等待执行的 List。

isTerminated() VS isShutdown()

  • isShutDown 当调用 shutdown() 方法后返回为 true。
  • isTerminated 当调用 shutdown() 方法后,并且所有提交的任务完成后返回为 true。

Java线程池最佳实践

1、正确声明线程池

  • 线程池必须手动通过 ThreadPoolExecutor 的构造函数来声明,避免使用Executors 类创建线程池,会有 OOM 风险。
  • 使用有界队列,控制线程创建数量。

2、监测线程池运行状态

  • SpringBoot 中的 Actuator 组件。
  • ThreadPoolExecutor 的相关 API 做一个简陋的监控。ThreadPoolExecutor提供了 获取线程池当前的线程数活跃线程数已经执行完成的任务数正在排队中的任务数 等等。

3、建议不同类别的业务用不同的线程池

上面的代码可能会存在死锁的情况,为什么呢?画个图给大家捋一捋。 试想这样一种极端情况:

  • 假如我们线程池的
    • 核心线程数为 n,
    • 父任务(扣费任务)数量为 n,
    • 父任务下面有两个子任务(扣费任务下的子任务),其中一个已经执行完成,另外一个被放在了任务队列中。
  • 由于父任务把线程池核心线程资源用完,所以子任务因为无法获取到线程资源无法正常执行,一直被阻塞在队列中。
  • 父任务等待子任务执行完成,而子任务等待父任务释放线程池资源,这也就造成了 "死锁" 。
  • 解决方法也很简单,就是新增加一个用于执行子任务的线程池专门为其服务。

4、给线程池命名

  • 初始化线程池的时候需要显示命名(设置线程池名称前缀),有利于定位问题。
  • 默认情况下创建的线程名字类似 pool-1-thread-n 这样的,没有业务含义,不利于我们定位问题。
  • 给线程池里的线程命名通常有下面两种方式:
    • 1、利用 guava 的 ThreadFactoryBuilder
    • 2、自己实现 ThreadFactory。
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 线程工厂,它设置线程名称,有利于我们定位问题。
 */
public final class NamingThreadFactory implements ThreadFactory {

    private final AtomicInteger threadNum = new AtomicInteger();
    private final String name;

    /**
     * 创建一个带名字的线程池生产工厂
     */
    public NamingThreadFactory(String name) {
        this.name = name;
    }

    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName(name + " [#" + threadNum.incrementAndGet() + "]");
        return t;
    }
}

5、正确配置线程池参数

  • CPU 密集型任务: N+1
    • CPU 密集型简单理解就是利用 CPU 计算能力的任务比如你在内存中对大量数据进行排序。
  • I/O 密集型任务:2N
    • 但凡涉及到网络读取,文件读取这类都是 IO 密集型,这类任务的特点是 CPU 计算耗费时间相比于等待 IO 操作完成的时间来说很少,大部分时间都花在了等待 IO 操作完成上。

线程数更严谨的计算的方法应该是:

  • 最佳线程数 = N(CPU 核心数)∗(1+WT(线程等待时间)/ST(线程计算时间)),
  • 其中 WT(线程等待时间)= 线程运行总时间 - ST(线程计算时间)。
  • 线程等待时间所占比例越高,需要越多线程。线程计算时间所占比例越高,需要越少线程。

6、别忘记关闭线程池

  • 当线程池不再需要使用时,应该显式地关闭线程池,释放线程资源。
  • 线程池提供了两个关闭方法:shutdown()、shutdownNow()。

7、线程池尽量不要放耗时任务

  • 线程池本身的目的是为了提高任务执行效率,避免因频繁创建和销毁线程而带来的性能开销。
  • 如果将耗时任务提交到线程池中执行,可能会导致线程池中的线程被长时间占用,无法及时响应其他任务,甚至会导致线程池崩溃或者程序假死。
  • 因此,在使用线程池时,我们应该尽量避免将耗时任务提交到线程池中执行。
  • 对于一些比较耗时的操作,如网络请求、文件读写等,可以采用 CompletableFuture 等其他异步操作的方式来处理,以避免阻塞线程池中的线程。

demo

public void fixRefreshHis() {
    int pageSize = 200; // 每页查询的数量
    int pageNum = 0;

    int maxThreads = 3;
    // new
    ExecutorService executorService = Executors.newFixedThreadPool(maxThreads);

    boolean hasMoreData = true;
    while (hasMoreData) {
        Query query = MongoUtil.getQuery("userId").with(PageRequest.of(pageNum, pageSize));
        List<String> uids = mongoReadonlyTwoDAO.find(query, User.class).stream()
            .map(x -> x.getId())
            .collect(Collectors.toList());

        // 把具体执行任务的方法(在这里是this.refreshHisBy(uids))赋值给一个 runnable function
        Runnable task = () -> this.refreshHisBy(uids);
        // 无返回值execute,若需要返回值,则使用submit
        executorService.execute(task);

        if (uids.size() < pageSize) {
            hasMoreData = false;
        } else {
            pageNum += 1;
        }
    }

    // 关闭线程池(等待所有任务完成)
    executorService.shutdown();
    try {
        if (!executorService.awaitTermination(5, TimeUnit.HOURS)) { // 可以设置一个合理的超时时间
            System.err.println("Pool did not terminate");
        }
    } catch (InterruptedException ie) {
        System.err.println("Interrupted while waiting for the pool to terminate");
        Thread.currentThread().interrupt(); // 重新抛出中断信号
    }
}

Date: 2024-09-11 Wed 13:14