彻底搞定线程池(1)-线程池模型的构建

概述

做后端的应该都知道线程池,即使你没亲自使用过,那也一定听过或者了解过。有时候也会去深入理解,结果往往是当时觉得自己理解了,过一段时间就忘了。因为在日常的开发中,我们都不需要用到线程池,很多都是使用的工具和框架写好,我们直接调接口就完事了。

很多东西没有亲自实践和深入的思考过的,单单看文章和书籍是不可能真正的理解的。以前我看了好多次线程池相关的文章,然后过个半年忘得差不多,又重新看,结果还是没有真正的理解。所以,我就打算动手实践一番,既然平时开发的时候用不到,那我就自己做一个项目来用上。

说做就做,我就选了一个Redis分布式锁作为练手项目,里面有一个定时续期的功能,就是使用线程池定时的运行提交的任务,将key续期,详细的不说了,想了解可以看这个 Redis分布式锁的实现-Redisson

在实现Redis续期功能的时候,一边看别人定时任务怎么实现的,一边看线程池的源码。这时候我仿佛打开了新世界的大门,彻底理解了线程池运行逻辑,也了解了一些线程池设计的艺术。

接下来我想以一个设计者的角度,带领大家从零去设计和实现一个线程池,一步一个脚印,彻底的理解线程池的实现以及一些设计的艺术。

线程池出现的目的和意义

我们要明白,任何技术都是为了解决问题而出现的。那么线程池的出现解决了什么问题呢,答案是:解决了线程资源复用的问题

如果没有线程池,我们处理一个任务就要新开一个线程去执行,当任务完成时,该线程就停止了。如果说这个任务是重复的,总不能来一个就新建一个线程吧,多浪费,而且线程是稀缺资源,重复的创建销毁很耗时间。

有了线程池,我们就可以建立几个固定线程。有任务来了唤醒闲置的线程去处理,任务处理完成后继续处理后续的任务,如果暂时没有任务,可以将线程休眠,有新任务时再唤醒。这样一来就可以更高效的利用线程资源,提高系统并发效率。

任务:抽象的工作单元

在程序中,都是围绕着任务执行来构造的,任务通常是一些抽象的工作单元。比如可以把一个 http请求 当做是一个任务,把一次与数据库的交互当做任务等等。在线程池中,我们把要处理的东西抽象成一个任务单元, 这样可以简化线程池的结构,以此更好的构建线程池的模型。

线程:抽象的工作者

在线程池中,我们可以把每一个线程当做是一个worker,即"工人"的意思。它会不断的尝试获得任务来执行,如果没有任务,则休眠或者做其他处理。

线程池的功能设计

那么,线程池通常要具备和提供什么功能呢,这里把核心的功能需求给罗列一下:

线程池的开启和关闭

线程池作为一个工具,需要有自己的生命周期,可以抽象成三个:

  • 开启状态
  • 运行状态
  • 结束状态

其中结束状态下线程池的处理和考虑的东西要多一些,执行完线程池的关闭接口后:

  • 正在运行的任务怎么处理?
  • 在任务队列的任务要怎么处理?
  • 此时线程池是否还能继续添加任务?

这些东西都是要考虑的并且去处理的。在Java的ExecutorService 提供了两个关闭接口

  • shutdown : 有序的关闭,已提交的任务会被逐一处理,但不会接受任何新任务
  • shutdownNow : 尝试停止所有正在执行的任务,放弃在队列中等待的任务,并返回正在等待执行的任务列表

线程的构建和管理

线程池里线程该怎么构建,构建完后怎么管理,是固定的几个还是动态的构建。这里给出几个模式:

  1. 固定的线程数量 :在线程池启动时就构建固定数量的线程池,且不会关闭任何线程
  2. 动态构建线程 :启动时不新建任何线程,当有任务来临时才会去创建线程。如果任务比较少,则不会继续新建线程,如果任务比较多,则继续构建线程数,直到数量达到最大值。
  3. 有闲置期限的线程 :线程在构建时会有一个闲置的期限,当闲置的时间超过期限时,该线程就会进行回收处理。这个在数据库连接池比较常用到
  4. 单个线程 :只有一个线程,任务按提交的时间顺序执行。

任务管理

在线程池中,会建立一个任务队列,当没有空闲线程时,新来的任务会放到队列中,等待线程执行。

线程池要提供任务执行的接口。

另外,很多任务都会将处理结果作为返回值的,这时任务要有一个完成后的处理机制,在任务完成时做某些操作。(这里就要涉及到FutureTask相关概念了)

任务相关的功能如下:

  • 任务的提交
  • 任务处理结果
  • 任务的取消和中断

线程池模型的构建

梳理了线程池的一些基本功能和要考虑的点,那么线程池的执行过程是怎样,要怎么设计呢。废话不说,直接上图:
20191217-1

当有新任务时查看是否有空闲线程,如果有,直接处理,如果没有则放到任务队列中,等待线程处理。

其实梳理一下线程池,可以发现它的逻辑并不复杂,复杂的是各种情况的处理,比如线程怎么管理,任务取消怎么处理,线程中断如何处理等等,还有各种并发操作的处理。

使用代码实现简易的线程池

接下来实现一个固定数量的线程池,当有任务提交时

线程池要提供的接口

  • 任务的提交

线程池内部要实现的功能

  • 任务队列的实现
  • 线程管理

咱们暂时将线程池的核心功能简单的实现,了解线程池的执行逻辑,其他的之后慢慢添加。

创建任务单元

首先将任务单元给实现了,直接实现Runnable 接口即可。

当然,可以不实现 Runnable 接口,随便写一个类,给一个执行接口,但是呢这样线程池就不够通用了,还是直接实现Runnable接口,往后任意实现该接口的任务都可以交给线程池执行。

static class Task implements Runnable{
    
        private int tag;

        public Task(int tag){
            this.tag = tag;
        }

        @Override
        public void run() {
            System.out.printf("任务 %d 开始执行 \n",tag);
            System.out.printf("任务 %d 执行中 \n",tag);
            System.out.printf("任务 %d 执行结束\n",tag);
        }
}

线程池的实现

详细的说明在注释中,看注释就可以了

package steap1;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.locks.ReentrantLock;

public class ThreadPoolExecutor {

    //工作线程数组
    private Worker[] workers;

    //任务阻塞队列,是线程安全的,里面每个操作都会加锁处理
    private BlockingQueue<Task> queue;

    // 当前工作线程的数量
    private int workerSize = 0;

    //线程池最大的工作线程数量
    private int poolSize;

    public ThreadPoolExecutor(int poolSize, BlockingQueue<Task> queue) {
        this.poolSize = poolSize;
        this.workers = new Worker[poolSize];
        this.queue = queue;
    }

    public void execute(Task task) {
        //如果线程池的线程数量小于最大值,则添加线程
        //否则将任务放入队列中
        if (workerSize < poolSize) {
            addWorker(task);
        } else {
            this.queue.add(task);
        }
    }

    //添加worker工作线程,并立即执行
    private synchronized void addWorker(Task task) {
        //这里做个双重判定,判定线程数量是否小于最大值
        if (workerSize >= poolSize) {
            this.queue.add(task);
            return;
        }

        //构建worker,并启动线程
        workers[workerSize] = new Worker(task);
        workers[workerSize].t.start();

        workerSize++;
    }

    //实际运行的代码
    void runWorker(Worker worker){
        Task task =(Task) worker.task;
        try {
            while (true){
                //线程在这个循环中不断的获取任务来执行
                // queue.task() 方法是一个线程安全的阻塞方法
                //如果队列没有任务,那么所有工作线程都会在这里阻塞,等待获取可用的任务
                if(task == null){
                    task = this.queue.take();
                }
                task.run();
                task = null;
            }
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }
    
    //工作线程包装类
    private class Worker implements Runnable {
        private Runnable task;

        final Thread t;

        public Worker(Runnable task) {
            this.task = task;
            this.t = new Thread(this);
        }

        @Override
        public void run() {
            runWorker(this);
        }
    }

    //任务类
    static class Task implements Runnable {

        private int tag;

        public Task(int tag) {
            this.tag = tag;
        }

        @Override
        public void run() {
            System.out.printf("任务 %d 开始执行 \n", tag);
            System.out.printf("任务 %d 执行中 \n", tag);
            System.out.printf("任务 %d 执行结束\n", tag);
        }
    }
}

简单的使用

    public static void main(String[] args){
        ThreadPoolExecutor executor = new ThreadPoolExecutor(8,new LinkedBlockingQueue<>());
        for(int i=0;i<1000;i++){
            executor.execute(new ThreadPoolExecutor.Task(i));
        }
    }

执行结果

任务 923 开始执行 
任务 923 执行中 
任务 923 执行结束
任务 912 开始执行 
任务 912 执行中 
任务 912 执行结束

总结

至此,一个简单的线程池就编写完毕,线程池主要的功能都实现了,整个执行过程也进行了详细的描述。

其实这里还有很多东西没写上,线程的生命周期管理,任务的取消和线程的中断等等,这些东西在下一篇章完善吧。