Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 257|回复: 0

[Java线程学习]Java5线程并发库实践

[复制链接]
  • TA的每日心情
    开心
    2021-3-12 23:18
  • 签到天数: 2 天

    [LV.1]初来乍到

    发表于 2014-11-3 23:59:17 | 显示全部楼层 |阅读模式
        java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API多线程编程在Java 5中更加容易,灵活。本文通过一个网络服务器模型,来实践Java5的多线程编程,该模型中使用了Java5中的线程池,阻塞队列,可重入锁等,还实践了Callable, Future等接口,并使用了Java 5的另外一个新特性泛型。
            本文将实现一个网络服务器模型,一旦有客户端连接到该服务器,则启动一个新线程为该连接服务,服务内容为往客户端输送一些字符信息。一个典型的网络服务器模型如下: 1. 建立监听端口。 2. 发现有新连接,接受连接,启动线程,执行服务线程。 3. 服务完毕,关闭线程。        这个模型在大部分情况下运行良好,但是需要频繁的处理用户请求而每次请求需要的服务又是简短的时候,系统会将大量的时间花费在线程的创建销毁。Java 5的线程池克服了这些缺点。通过对重用线程来执行多个任务,避免了频繁线程的创建与销毁开销,使得服务器的性能方面得到很大提高。因此,本文的网络服务器模型将如下: 1. 建立监听端口,创建线程池。 2. 发现有新连接,使用线程池来执行服务任务。 3. 服务完毕,释放线程到线程池。 下面详细介绍如何使用Java 5的concurrent包提供的API来实现该服务器。  初始化       初始化包括创建线程池以及初始化监听端口。创建线程池可以通过调用java.util.concurrent.Executors类里的静态方法newChahedThreadPool或是newFixedThreadPool来创建,也可以通过新建一个java.util.concurrent.ThreadPoolExecutor实例来执行任务。这里我们采用newFixedThreadPool方法来建立线程池。 ExecutorService pool = Executors.newFixedThreadPool(10);
    表示新建了一个线程池,线程池里面有10个线程为任务队列服务。 使用ServerSocket对象来初始化监听端口。 private static final int PORT = 19527;
    serverListenSocket = new ServerSocket(PORT);
    serverListenSocket.setReuseAddress(true);
    serverListenSocket.setReuseAddress(true);  服务新连接 当有新连接建立时,accept返回时,将服务任务提交给线程池执行。 while(true){
       Socket socket = serverListenSocket.accept();
       pool.execute(new ServiceThread(socket));
    } 这里使用线程池对象来执行线程,减少了每次线程创建和销毁的开销。任务执行完毕,线程释放到线程池。  服务任务       服务线程ServiceThread维护一个count来记录服务线程被调用的次数。每当服务任务被调用一次时,count的值自增1,因此ServiceThread提供一个increaseCount和getCount的方法,分别将count值自增1和取得该count值。由于可能多个线程存在竞争,同时访问count,因此需要加锁机制,在Java 5之前,我们只能使用synchronized来锁定。Java 5中引入了性能更加粒度更细的重入锁ReentrantLock。我们使用ReentrantLock保证代码线程安全。下面是具体代码: private static ReentrantLock lock = new ReentrantLock ();
    private static int count = 0;
    private int getCount(){
    int ret = 0;
    try{
       lock.lock();
       ret = count;
    }finally{
       lock.unlock();
    }
       return ret;
    }
    private void increaseCount(){
    try{
       lock.lock();
       ++count;
    }finally{
       lock.unlock();
      }
    } 服务线程在开始给客户端打印一个欢迎信息, increaseCount();
    int curCount = getCount();
    helloString = "hello, id = " + curCount+"
    ";
    dos = new DataOutputStream(connectedSocket.getOutputStream());
    dos.write(helloString.getBytes());
    然后使用ExecutorService的submit方法提交一个Callable的任务,返回一个Future接口的引用。这种做法对费时的任务非常有效,submit任务之后可以继续执行下面的代码,然后在适当的位置可以使用Future的get方法来获取结果,如果这时候该方法已经执行完毕,则无需等待即可获得结果,如果还在执行,则等待到运行完毕。
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Future <String> future = executor.submit(new TimeConsumingTask());
    dos.write("let"s do soemthing other".getBytes());
    String result = future.get();
    dos.write(result.getBytes());

    其中TimeConsumingTask实现了Callable接口
    class TimeConsumingTask implements Callable <String>{
    public String call() throws Exception {
       System.out.println ("It"s a time-consuming task, you"d better retrieve your result in the furture");
       return "ok, here"s the result: It takes me lots of time to produce this result";
    }
    }   这里使用了Java 5的另外一个新特性泛型,声明TimeConsumingTask的时候使用了String做为类型参数。必须实现Callable接口的call函数,其作用类似与Runnable中的run函数,在call函数里写入要执行的代码,其返回值类型等同于在类声明中传入的类型值。在这段程序中,我们提交了一个Callable的任务,然后程序不会堵塞,而是继续执行dos.write("let"s do soemthing other".getBytes());当程序执行到String result = future.get()时如果call函数已经执行完毕,则取得返回值,如果还在执行,则等待其执行完毕。   服务器端的完整实现 服务器端的完整实现代码如下: package com.andrew;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.io.Serializable;
    import java.net.ServerSocket;
    import java.net.Socket;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionHandler;
    import java.util.concurrent.ThreadPoolExecutor;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.ReentrantLock;
    public class Server {
    private static int produceTaskSleepTime = 100;
    private static int consumeTaskSleepTime = 1200;
    private static int produceTaskMaxNumber = 100;
    private static final int CORE_POOL_SIZE = 2;
    private static final int MAX_POOL_SIZE = 100;
    private static final int KEEPALIVE_TIME = 3;
    private static final int QUEUE_CAPACITY = (CORE_POOL_SIZE + MAX_POOL_SIZE) / 2;
    private static final TimeUnit TIME_UNIT = TimeUnit.SECONDS;
    private static final String HOST = "127.0.0.1";
    private static final int PORT = 19527;
    private BlockingQueue<Runnable> workQueue = new ArrayBlockingQueue<Runnable>(
    QUEUE_CAPACITY);
    //private ThreadPoolExecutor serverThreadPool = null;

    private ExecutorService pool = null;
    private RejectedExecutionHandler rejectedExecutionHandler = new
    ThreadPoolExecutor.DiscardOldestPolicy();
    private ServerSocket serverListenSocket = null;
    private int times = 5;
    public void start() {
       // You can also init thread pool in this way.
       /*serverThreadPool = new ThreadPoolExecutor(CORE_POOL_SIZE,
       MAX_POOL_SIZE, KEEPALIVE_TIME, TIME_UNIT, workQueue,
       rejectedExecutionHandler);*/
    pool = Executors.newFixedThreadPool(10);
    try {
       serverListenSocket = new ServerSocket(PORT);
       serverListenSocket.setReuseAddress(true);
       System.out.println("I"m listening");
       while (times-- > 0) {
        Socket socket = serverListenSocket.accept();
       String welcomeString = "hello";
        //serverThreadPool.execute(new ServiceThread(socket, welcomeString));
       pool.execute(new ServiceThread(socket));
    }
    } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
    }
       cleanup();
    }

    public void cleanup() {
       if (null != serverListenSocket) {
       try {
        serverListenSocket.close();
       } catch (IOException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    }
    //serverThreadPool.shutdown();
       pool.shutdown();
    }

    public static void main(String args[]) {
         Server server = new Server();
         server.start();
       }
    }

    class ServiceThread implements Runnable, Serializable {
    private static final long serialVersionUID = 0;
    private Socket connectedSocket = null;
    private String helloString = null;
    private static int count = 0;
    private static ReentrantLock lock = new ReentrantLock();
    ServiceThread(Socket socket) {
    connectedSocket = socket;
    }

    public void run() {
       increaseCount();
       int curCount = getCount();
       helloString = "hello, id = " + curCount + "
    ";
       ExecutorService executor = Executors.newSingleThreadExecutor();
       Future<String> future = executor.submit(new TimeConsumingTask());
       DataOutputStream dos = null;
    try {
       dos = new DataOutputStream(connectedSocket.getOutputStream());
       dos.write(helloString.getBytes());
       try {
         dos.write("let"s do soemthing other.
    ".getBytes());
         String result = future.get();
         dos.write(result.getBytes());
       } catch (InterruptedException e) {
       e.printStackTrace();
      } catch (ExecutionException e) {
         e.printStackTrace();
    }
    } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
    } finally {
         if (null != connectedSocket) {
       try {
         connectedSocket.close();
    } catch (IOException e) {
       // TODO Auto-generated catch block
       e.printStackTrace();
    }
    }
    if (null != dos) {
    try {
    dos.close();
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }
    }
    executor.shutdown();
    }
    }

    private int getCount() {
       int ret = 0;
       try {
        lock.lock();
        ret = count;
    } finally {
       lock.unlock();
    }
       return ret;
    }

    private void increaseCount() {
       try {
        lock.lock();
        ++count;
       } finally {
           lock.unlock();
        }
      }
    }

    class TimeConsumingTask implements Callable<String> {
       public String call() throws Exception {
       System.out.println("It"s a time-consuming task, you"d better retrieve your result in the furture");
       return "ok, here"s the result: It takes me lots of time to produce this result";
      }
    }  运行程序 运行服务端,客户端只需使用telnet 127.0.0.1 19527 即可看到信息  

       
         
         
          
          

            
          

            
          
         
       

      


    源码下载:http://file.javaxxz.com/2014/11/3/235916640.zip
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-2-25 13:49 , Processed in 0.363765 second(s), 50 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表