Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 252|回复: 0

[Java线程学习]任务列表分派给多个线程的策略和方法

[复制链接]
  • TA的每日心情
    开心
    2021-3-12 23:18
  • 签到天数: 2 天

    [LV.1]初来乍到

    发表于 2014-11-3 23:59:45 | 显示全部楼层 |阅读模式
    多线程下载由来已久,如 FlashGet、NetAnts 等工具,它们都是依懒于 HTTP 协议的支持(Range 字段指定请求内容范围),首先能读取出请求内容 (即欲下载的文件) 的大小,划分出若干区块,把区块分段分发给每个线程去下载,线程从本段起始处下载数据及至段尾,多个线程下载的内容最终会写入到同一个文件中。
          只研究有用的,工作中的需求:要把多个任务分派给多个线程去执行,这其中就会有一个任务列表指派到线程的策略思考:已知:1. 一个待执行的任务列表,2. 指定要启动的线程数;问题是:每个线程实际要执行哪些任务。

          策略是:任务列表连续按线程数分段,先保证每线程平均能分配到的任务数,余下的任务从前至后依次附加到线程中--只是数量上,实际每个线程执行的任务都还是连续的。如果出现那种僧多(线程) 粥(任务) 少的情况,实际启动的线程数就等于任务数,一挑一。这里只实现了每个线程各扫自家门前雪,动作快的完成后眼见别的线程再累都是爱莫能助。  
      
       
       
         
       

         
       
      
        实现及演示代码如下:由三个类实现,写在了一个 java 文件中:TaskDistributor 为任务分发器,Task 为待执行的任务,WorkThread 为自定的工作线程。代码中运用了命令模式,如若能配以监听器,用上观察者模式来控制 UI 显示就更绝妙不过了,就能实现像下载中的区块着色跳跃的动感了,在此定义下一步的着眼点了。

    代码中有较为详细的注释,看这些注释和执行结果就很容易理解的。main() 是测试方法



    package com.unmi.common;
    import java.util.ArrayList;
    import java.util.List;
    /**
    * 指派任务列表给线程的分发器
    * @author Unmi
    * QQ: 1125535 Email: fantasia@sina.com
    * MSN: kypfos@msn.com 2008-03-25
    */
    public class TaskDistributor {
            /**
             * 测试方法
             * @param args
             */
            public static void main(String[] args) {
                    //初始化要执行的任务列表
                    List<Task> taskList = new ArrayList<Task>();
                    for (int i = 0; i < 108; i++) {
                            taskList.add(new Task(i));
                    }
                   
                    //设定要启动的工作线程数为 5 个
                    int threadCount = 5;
                    List<Task>[] taskListPerThread = distributeTasks(taskList, threadCount);
                    System.out.println("实际要启动的工作线程数:"+taskListPerThread.length);
                    for (int i = 0; i < taskListPerThread.length; i++) {
                            Thread workThread = new WorkThread(taskListPerThread,i);
                            workThread.start();
                    }
            }
            /**
             * 把 List 中的任务分配给每个线程,先平均分配,剩于的依次附加给前面的线程
             * 返回的数组有多少个元素 (List<Task>) 就表明将启动多少个工作线程
             * @param taskList 待分派的任务列表
             * @param threadCount   线程数
             * @return 列表的数组,每个元素中存有该线程要执行的任务列表
             */
            public static List<Task>[] distributeTasks(List<Task> taskList, int threadCount) {
                    // 每个线程至少要执行的任务数,假如不为零则表示每个线程都会分配到任务
                    int minTaskCount = taskList.size() / threadCount;
                    // 平均分配后还剩下的任务数,不为零则还有任务依个附加到前面的线程中
                    int remainTaskCount = taskList.size() % threadCount;
                    // 实际要启动的线程数,如果工作线程比任务还多
                    // 自然只需要启动与任务相同个数的工作线程,一对一的执行
                    // 毕竟不打算实现了线程池,所以用不着预先初始化好休眠的线程
                    int actualThreadCount = minTaskCount > 0 ? threadCount : remainTaskCount;
                    // 要启动的线程数组,以及每个线程要执行的任务列表
                    List<Task>[] taskListPerThread = new List[actualThreadCount];
                    int taskIndex = 0;
                   
                    //平均分配后多余任务,每附加给一个线程后的剩余数,重新声明与 remainTaskCount
                    //相同的变量,不然会在执行中改变 remainTaskCount 原有值,产生麻烦
                    int remainIndces = remainTaskCount;
                    for (int i = 0; i < taskListPerThread.length; i++) {
                            taskListPerThread = new ArrayList<Task>();
                            // 如果大于零,线程要分配到基本的任务
                            if (minTaskCount > 0) {
                                    for (int j = taskIndex; j < minTaskCount + taskIndex; j++) {
                                            taskListPerThread.add(taskList.get(j));
                                    }
                                    taskIndex += minTaskCount;
                            }
                            // 假如还有剩下的,则补一个到这个线程中
                            if (remainIndces > 0) {
                                    taskListPerThread.add(taskList.get(taskIndex++));
                                    remainIndces--;
                            }
                    }
                    // 打印任务的分配情况
                    for (int i = 0; i < taskListPerThread.length; i++) {
                            System.out.println("线程 " + i + " 的任务数:" + <br>
                    taskListPerThread.size() + "  区间["
                                            + taskListPerThread.get(0).getTaskId() + ","
                    + taskListPerThread.get(taskListPerThread.size() - 1).getTaskId() + "]");
                    }
                    return taskListPerThread;
            }
    }
    /**
    * 要执行的任务,可在执行时改变它的某个状态或调用它的某个操作
    * 例如任务有三个状态,就绪,运行,完成,默认为就绪态
    * 要进一步完善,可为 Task 加上状态变迁的监听器,因之决定UI的显示
    */
    class Task {
            public static final int READY = 0;
            public static final int RUNNING = 1;
            public static final int FINISHED = 2;
            private int status;
            //声明一个任务的自有业务含义的变量,用于标识任务
            private int taskId;
           
            //任务的初始化方法
            public Task(int taskId){
                    this.status = READY;
                    this.taskId = taskId;
            }
           
            /**
             * 执行任务
             */
            public void execute() {
                    // 设置状态为运行中
                    setStatus(Task.RUNNING);
                    System.out.println("当前线程 ID 是:" + Thread.currentThread().getName()
                                    +" | 任务 ID 是:"+this.taskId);
                    // 附加一个延时
                    try {
                            Thread.sleep(1000);
                    } catch (InterruptedException e) {
                            e.printStackTrace();
                    }
                    // 执行完成,改状态为完成
                    setStatus(FINISHED);
            }
            public void setStatus(int status) {
                    this.status = status;
            }
            public int getTaskId() {
                    return taskId;
            }
    }
    /**
    * 自定义的工作线程,持有分派给它执行的任务列表
    */
    class WorkThread extends Thread {
           
            //本线程待执行的任务列表,你也可以指为任务索引的起始值
            private List<Task> taskList = null;
            private int threadId;
           
            /**
             * 构造工作线程,为其指派任务列表,及命名线程 ID
             * @param taskList 欲执行的任务列表
             * @param threadId 线程 ID
             */
            public WorkThread(List<Task> taskList,int threadId) {
                    this.taskList = taskList;
                    this.threadId = threadId;
            }
            /**
             * 执行被指派的所有任务
             */
            public void run() {
                    for (Task task : taskList) {
                            task.execute();
                    }
            }
    }

    执行结果如下,注意观察每个线程分配到的任务数量及区间。直到所有的线程完成了所分配到的任务后程序结束:



    线程 0 的任务数:22  区间[0,21]

    线程 1 的任务数:22  区间[22,43]

    线程 2 的任务数:22  区间[44,65]

    线程 3 的任务数:21  区间[66,86]

    线程 4 的任务数:21  区间[87,107]

    实际要启动的工作线程数:5

    当前线程 ID 是:Thread-0 | 任务 ID 是:0

    当前线程 ID 是:Thread-1 | 任务 ID 是:22

    当前线程 ID 是:Thread-2 | 任务 ID 是:44

    当前线程 ID 是:Thread-3 | 任务 ID 是:66

    当前线程 ID 是:Thread-4 | 任务 ID 是:87

    当前线程 ID 是:Thread-0 | 任务 ID 是:1

    当前线程 ID 是:Thread-1 | 任务 ID 是:23

    当前线程 ID 是:Thread-2 | 任务 ID 是:45

    ...........................................................................



    上面坦白来只算是基本功夫,贴出来还真见笑了。还有更为复杂的功能.



    像多线程的下载工具的确更充分利用了网络资源,而且像 FlashGet、NetAnts 都实现了:假如某个线程下载完了欲先所分配段的内容之后,会帮其他线程下载未完成数据,直到任务完成;或某一下载线程的未完成段区间已经很小了,用不着别人来帮忙时,这就涉及到任务的进一步分配。再如,以上两个工具都能动态增加、减小或中止线程,越说越复杂了,它们原本比这复杂多了,这些实现可能定义各种队列来实现,如未完成任务队列、下载中任务队列和已完成队列。难以细究了。






      
       
       
         
         

          
         

          
         
       
      
    复制代码


    源码下载:http://file.javaxxz.com/2014/11/3/235944937.zip
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-2-25 13:54 , Processed in 0.365329 second(s), 46 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表