彻底搞定线程池(2)-基于模型实现线程池

概述

在上一篇文章 彻底搞定线程池(1)-线程池模型的构建 中,我把线程池的主要的功能和流程给梳理了一下,并且在最后使用代码实现了一个简单的线程池。不过距离一个完整的线程池还需要多做一些东西。

本篇的代码会在上一版代码的基础上进行修改。地址为 github地址,其中steap1为上一篇的代码,steap2为本篇代码

这里先把线程池的功能罗列出来,之后一点点的把功能实现。

线程池需要提供的接口

  1. 任务的提交
  2. 温和的关闭线程池:把已提交的任务完成,但拒绝新任务提交
  3. 强制关闭:不再执行任务,并且尝试停止正在运行的任务

线程池内部需要实现的功能

  1. 线程的创建和管理
  2. 任务队列的管理
  3. 处理任务(不断从队列中获取任务来处理)
  4. 线程的闲置时间限制(可以设定闲置一定时间后停止该线程)

线程池中特殊情况的处理

  1. 饱和策略:有界的任务队列中,如果任务放满的了该如何处理
  2. 线程中断重启:在执行任务时线程被中断了,需要及时的新开一个线程来继续运行

流程图

根据线程的功能需求,将任务提交的流程画出来:
20191218-1

在线程池中,线程的运行过程也整理一下:
20191218-2

代码实现

这里把一些关键的代码实现拿出来讲讲,反正都是按照整理好的流程来写的。详细的源码在这里,看懂我这个代码,说明对线程池有足够的了解,这时候去看Java的ThreadPoolExecutor源码或相关的设计,就不会有云里雾里的感觉。

线程池的状态管理

线程池的状态通过state 变量来控制,当结束时,改变state的值即可。其他线程在运行的时候时检测状态,做出处理。

    //线程池的状态
    private volatile int state = STATE_RUNNING;

    //线程池的3种状态
    private final static int STATE_RUNNING = -1;
    private final static int STATE_SHUTDOWN = 0;
    private final static int STATE_SHUTDOWN_NOW = 2;
    
    //工作线程集合
    private volatile Set<Worker> workers;

    /**
     * 停止线程池
     * 温和的停止,拒绝提交新任务,但是已提交的任务会全部完成
     */
    public void shutdown() {
        this.state = STATE_SHUTDOWN;
    }

    /**
     * 强制停止线程池
     * 拒绝提交任务,剩下的任务不再执行,并尝试中断线程池
     */
    public void shutdownNow() {
          this.state = STATE_SHUTDOWN_NOW;
          for(Worker w:workers){
              try {
                  w.t.interrupt();
              }catch (Exception e){
                  e.printStackTrace();
              }
          }
    }

任务的提交

    /**
     * 任务提交接口
     */
    public void execute(Task task) {

        if (state > STATE_RUNNING)
            throw new RejectedExecutionException("线程池已关闭,禁止提交任务");

        if (workers.size() < poolSize) {
            addWorker(task);
        } else {
            this.queue.add(task);
        }
    }
    
      /**
     * 添加worker工作线程,并立即执行
     * 这里做个双重判定,防止并发时多创建了线程
     */
    private void addWorker(Task task) {
        mainLock.lock();
        try {
            if (workers.size() >= poolSize) {
                this.queue.add(task);
                return;
            }
            Worker w = new Worker(task);
            workers.add(w);
            w.t.start();
        } finally {
            mainLock.unlock();
        }
    }

线程工作流程、线程异常中断处理、线程闲置处理

     /**
     * 工作线程实际运行任务的方法
     */
    void runWorker(Worker worker) {
        Task task = (Task) worker.task;
        boolean completedAbruptly = false;
        try {
            while (true) {
                //线程在这个循环中不断的获取任务来执行
                // getTask() 为从队列中获取任务,如果为null,表示该线程超过闲置时间限制,停止该线程
                if (task == null) {
                    task = getTask();
                }

                if (task == null) {
                    completedAbruptly = true;
                    break;
                }

                task.run();
                task = null;
            }
            completedAbruptly = true;
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            //线程中断,做一些处理
            processWorkerExit(worker, completedAbruptly);
        }
    }

    /**
     * 线程中断,重新开启线程
     */
    private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果是因为线程池关闭导致线程中断,不做任何处理
        if (completedAbruptly)
            return;

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }

        //判断线程池是否关闭状态
        if (state == STATE_SHUTDOWN)
            return;

        addWorker(null);
    }

    /**
     * 从队列中获取可用的线程
     */
    private Task getTask() throws InterruptedException {
        if (state == STATE_SHUTDOWN_NOW)
            return null;

        if (state == STATE_RUNNING && queue.size() <= 0) {
            return null;
        }

        //如果有闲置时间限制,使用poll方法
        //一定时间内还未获得可用任务,返回null
        if (allowThreadTimeOut) {
            return queue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
        }
        //如果队列中暂时没有任务,则线程进入阻塞状态,直到获取到任务
        return queue.take();
    }

总结

关键的代码已给出,建议看完整的源码,克隆下来,在自己电脑跑一跑。

至此一个线程池就完成了,虽然比较简陋,但重要的东西都有。代码写会简单一些,这是让大家能更好的理解线程池。完全理解这个代码时,就可以看java的线程池ThreadPoolExecutor的源码了。到时候看源码应该不会费劲了。