Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 308|回复: 0

[Java线程学习]Java 线程池的原理与实现

[复制链接]
  • TA的每日心情
    开心
    2021-3-12 23:18
  • 签到天数: 2 天

    [LV.1]初来乍到

    发表于 2014-11-3 23:59:18 | 显示全部楼层 |阅读模式
    线程池:
         多线程技术主要解决处理器单元内多个线程执行的问题,它可以显著减少处理器单元的闲置时间,增加处理器单元的吞吐能力。
         
         假设一个服务器完成一项任务所需时间为:T1 创建线程时间,T2 在线程中执行任务的时间,T3 销毁线程时间。
         
         如果:T1 + T3 远大于 T2,则可以采用线程池,以提高服务器性能。
                     一个线程池包括以下四个基本组成部分:
                     1、线程池管理器(ThreadPool):用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
                     2、工作线程(PoolWorker):线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
                     3、任务接口(Task):每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
                     4、任务队列(taskQueue):用于存放没有处理的任务。提供一种缓冲机制。
                     
         线程池技术正是关注如何缩短或调整T1,T3时间的技术,从而提高服务器程序性能的。它把T1,T3分别安排在服务器程序的启动和结束的时间段或者一些空闲的时间段,这样在服务器程序处理客户请求时,不会有T1,T3的开销了。

         线程池不仅调整T1,T3产生的时间段,而且它还显著减少了创建线程的数目,看一个例子:

         假设一个服务器一天要处理50000个请求,并且每个请求需要一个单独的线程完成。在线程池中,线程数一般是固定的,所以产生线程总数不会超过线程池中线程的数目,而如果服务器不利用线程池来处理这些请求则线程总数为50000。一般线程池大小是远小于50000。所以利用线程池的服务器程序不会为了创建50000而在处理请求时浪费时间,从而提高效率。
    ------------------------------------------------------------------------------
    好了,废话就到这里了,下面就是程序了,我也不讲解了,注释已经很清晰了:

    /** 线程池类,工作线程作为其内部类 **/

    package org.ymcn.util;

    import java.util.Collections;
    import java.util.Date;
    import java.util.LinkedList;
    import java.util.List;

    import org.apache.log4j.Logger;

    /**
    * 线程池
    * 创建线程池,销毁线程池,添加新任务
    *
    * @author obullxl
    */
    public final class ThreadPool {
         private static Logger logger = Logger.getLogger(ThreadPool.class);
         private static Logger taskLogger = Logger.getLogger("TaskLogger");

         private static boolean debug = taskLogger.isDebugEnabled();
         // private static boolean debug = taskLogger.isInfoEnabled();
         /* 单例 */
         private static ThreadPool instance = ThreadPool.getInstance();

         public static final int SYSTEM_BUSY_TASK_COUNT = 150;
         /* 默认池中线程数 */
         public static int worker_num = 5;
         /* 已经处理的任务数 */
         private static int taskCounter = 0;

         public static boolean systemIsBusy = false;

         private static List<Task> taskQueue = Collections
                 .synchronizedList(new LinkedList<Task>());
         /* 池中的所有线程 */
         public PoolWorker[] workers;

         private ThreadPool() {
             workers = new PoolWorker[5];
             for (int i = 0; i < workers.length; i++) {
                 workers = new PoolWorker(i);
             }
         }

         private ThreadPool(int pool_worker_num) {
             worker_num = pool_worker_num;
             workers = new PoolWorker[worker_num];
             for (int i = 0; i < workers.length; i++) {
                 workers = new PoolWorker(i);
             }
         }

         public static synchronized ThreadPool getInstance() {
             if (instance == null)
                 return new ThreadPool();
             return instance;
         }
         /**
         * 增加新的任务
         * 每增加一个新任务,都要唤醒任务队列
         * @param newTask
         */
         public void addTask(Task newTask) {
             synchronized (taskQueue) {
                 newTask.setTaskId(++taskCounter);
                 newTask.setSubmitTime(new Date());
                 taskQueue.add(newTask);
                 /* 唤醒队列, 开始执行 */
                 taskQueue.notifyAll();
             }
             logger.info("Submit Task<" + newTask.getTaskId() + ">: "
                     + newTask.info());
         }
         /**
         * 批量增加新任务
         * @param taskes
         */
         public void batchAddTask(Task[] taskes) {
             if (taskes == null || taskes.length == 0) {
                 return;
             }
             synchronized (taskQueue) {
                 for (int i = 0; i < taskes.length; i++) {
                     if (taskes == null) {
                         continue;
                     }
                     taskes.setTaskId(++taskCounter);
                     taskes.setSubmitTime(new Date());
                     taskQueue.add(taskes);
                 }
                 /* 唤醒队列, 开始执行 */
                 taskQueue.notifyAll();
             }
             for (int i = 0; i < taskes.length; i++) {
                 if (taskes == null) {
                     continue;
                 }
                 logger.info("Submit Task<" + taskes.getTaskId() + ">: "
                         + taskes.info());
             }
         }
         /**
         * 线程池信息
         * @return
         */
         public String getInfo() {
             StringBuffer sb = new StringBuffer();
             sb.append("
    Task Queue Size:" + taskQueue.size());
             for (int i = 0; i < workers.length; i++) {
                 sb.append("
    Worker " + i + " is "
                         + ((workers.isWaiting()) ? "Waiting." : "Running."));
             }
             return sb.toString();
         }
         /**
         * 销毁线程池
         */
         public synchronized void destroy() {
             for (int i = 0; i < worker_num; i++) {
                 workers.stopWorker();
                 workers = null;
             }
             taskQueue.clear();
         }

         /**
         * 池中工作线程
         *
         * @author obullxl
         */
         private class PoolWorker extends Thread {
             private int index = -1;
             /* 该工作线程是否有效 */
             private boolean isRunning = true;
             /* 该工作线程是否可以执行新任务 */
             private boolean isWaiting = true;

             public PoolWorker(int index) {
                 this.index = index;
                 start();
             }

             public void stopWorker() {
                 this.isRunning = false;
             }

             public boolean isWaiting() {
                 return this.isWaiting;
             }
             /**
             * 循环执行任务
             * 这也许是线程池的关键所在
             */
             public void run() {
                 while (isRunning) {
                     Task r = null;
                     synchronized (taskQueue) {
                         while (taskQueue.isEmpty()) {
                             try {
                                 /* 任务队列为空,则等待有新任务加入从而被唤醒 */
                                 taskQueue.wait(20);
                             } catch (InterruptedException ie) {
                                 logger.error(ie);
                             }
                         }
                         /* 取出任务执行 */
                         r = (Task) taskQueue.remove(0);
                     }
                     if (r != null) {
                         isWaiting = false;
                         try {
                             if (debug) {
                                 r.setBeginExceuteTime(new Date());
                                 taskLogger.debug("Worker<" + index
                                         + "> start execute Task<" + r.getTaskId() + ">");
                                 if (r.getBeginExceuteTime().getTime()
                                         - r.getSubmitTime().getTime() > 1000)
                                     taskLogger.debug("longer waiting time. "
                                             + r.info() + ",<" + index + ">,time:"
                                             + (r.getFinishTime().getTime() - r
                                                     .getBeginExceuteTime().getTime()));
                             }
                             /* 该任务是否需要立即执行 */
                             if (r.needExecuteImmediate()) {
                                 new Thread(r).start();
                             } else {
                                 r.run();
                             }
                             if (debug) {
                                 r.setFinishTime(new Date());
                                 taskLogger.debug("Worker<" + index
                                         + "> finish task<" + r.getTaskId() + ">");
                                 if (r.getFinishTime().getTime()
                                         - r.getBeginExceuteTime().getTime() > 1000)
                                     taskLogger.debug("longer execution time. "
                                             + r.info() + ",<" + index + ">,time:"
                                             + (r.getFinishTime().getTime() - r
                                                     .getBeginExceuteTime().getTime()));
                             }
                         } catch (Exception e) {
                             e.printStackTrace();
                             logger.error(e);
                         }
                         isWaiting = true;
                         r = null;
                     }
                 }
             }
         }
    }

    /** 任务接口类 **/

    package org.ymcn.util;

    import java.util.Date;

    /**
    * 所有任务接口
    * 其他任务必须继承访类
    *
    * @author obullxl
    */
    public abstract class Task implements Runnable {
         // private static Logger logger = Logger.getLogger(Task.class);
         /* 产生时间 */
         private Date generateTime = null;
         /* 提交执行时间 */
         private Date submitTime = null;
         /* 开始执行时间 */
         private Date beginExceuteTime = null;
         /* 执行完成时间 */
         private Date finishTime = null;

         private long taskId;

         public Task() {
             this.generateTime = new Date();
         }

         /**
         * 任务执行入口
         */
         public void run() {
             /**
             * 相关执行代码
             *
             * beginTransaction();
             *
             * 执行过程中可能产生新的任务 subtask = taskCore();
             *
             * commitTransaction();
             *
             * 增加新产生的任务 ThreadPool.getInstance().batchAddTask(taskCore());
             */
         }

         /**
         * 所有任务的核心 所以特别的业务逻辑执行之处
         *
         * @throws Exception
         */
         public abstract Task[] taskCore() throws Exception;

         /**
         * 是否用到数据库
         *
         * @return
         */
         protected abstract boolean useDb();

         /**
         * 是否需要立即执行
         *
         * @return
         */
         protected abstract boolean needExecuteImmediate();

         /**
         * 任务信息
         *
         * @return String
         */
         public abstract String info();

         public Date getGenerateTime() {
             return generateTime;
         }

         public Date getBeginExceuteTime() {
             return beginExceuteTime;
         }

         public void setBeginExceuteTime(Date beginExceuteTime) {
             this.beginExceuteTime = beginExceuteTime;
         }

         public Date getFinishTime() {
             return finishTime;
         }

         public void setFinishTime(Date finishTime) {
             this.finishTime = finishTime;
         }

         public Date getSubmitTime() {
             return submitTime;
         }

         public void setSubmitTime(Date submitTime) {
             this.submitTime = submitTime;
         }

         public long getTaskId() {
             return taskId;
         }

         public void setTaskId(long taskId) {
             this.taskId = taskId;
         }

    }

       
         
         
          
          

            
          

            
          
         
       

      


    源码下载:http://file.javaxxz.com/2014/11/3/235918000.zip
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-2-25 13:56 , Processed in 0.364217 second(s), 48 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表