Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 300|回复: 0

[JavaIO学习]基于NIO的入门Echo程序。

[复制链接]
  • TA的每日心情
    开心
    2021-3-12 23:18
  • 签到天数: 2 天

    [LV.1]初来乍到

    发表于 2014-11-2 23:58:21 | 显示全部楼层 |阅读模式
    为什么需要NIO
        使用java编写过Socket程序的同学一定都知道Socket和SocketServer。当调用某个调用的时候,调用的地方就会阻塞,等待响应。这种方式对于小规模的程序非常方便,但是对于大型的程序就有点力不从心了,当有大量的连接的时候,我们可以为每一个连接建立一个线程来操作。但是这种做法带来的缺陷也是显而易见的:
    1.硬件能够支持大量的并发。
    2.并发的数量始终有一个上限。
    3. 各个线程之间的优先级不好控制。
    4. 各个Client之间的交互与同步困难。     我们也可以使用一个线程来处理所有的请求,使用不阻塞的IO,轮询查询所有的Client。这种做法同样也有缺陷:无法迅速响应Client端,同时会消耗大量轮询查询的时间。
      
       
       

         
       

         
       
      

         所以,我们需要一种poll的模式来处理这种情况,从大量的网络连接中找出来真正需要服务的Client。这正是NIO诞生的原因:提供一种Poll的模式,在所有的Client中找到需要服务的Client。在JDK中,有一个非常有意思的库:NIO(New I/O),有三个最最重要的Class:java.nio.channels中Selector和Channel,以及java.nio中的Buffer。

    套接字通道 1. 阻塞式套接字通道      与Socket和ServerSocket对应,NIO提供了SocketChannel和ServerSocketChannel对应,这两种通道同时支持一般的阻塞模式和更高效的非阻塞模式。     客户端通过SocketChannel.open()方法打开一个Socket通道,如果此时提供了SocketAddress参数,则会自动开始连接,否则需要主动调用connect()方法连接,创建连接后,可以像一般的Channel一样的用Buffer进行读写,这都是阻塞模式的。    服务器端通过ServerSocketChannel.open()创建,并使用bind()方法绑定到一个监听地址上,最后调用accept()方法阻塞等待客户端连接。当客户端连接后会返回一个SocketChannel以实现与客户端的读写交互。     总的来说,阻塞模式即是net包I/O的翻版,只是采用Channel和Buffer实现而已。 2.多路复用套接字通道(Selector实现的非阻塞式IO)    套接字通道多路复用的思想是创建一个Selector,将多个通道对它进行注册,当套接字有关注的事件发生时,可以选出这个通道进行操作。       Channel代表一个可以被用于Poll操作的对象(可以是文件流也可以使网络流),Channel能够被注册到一个Selector中。通过调用Selector的select方法可以从所有的Channel中找到需要服务的实例(Accept,read ..)。Buffer对象提供读写数据的缓存。相对于我们熟悉的Stream对象,Buffer提供更好的性能以及更好的编程透明性(人为控制缓存的大小以及具体的操作)。 (3)配合Buffer使用Channel,实现一个简单的非阻塞Echo Client:
        与传统模式的编程不用,Channel不使用Stream,而是Buffer。

    1. import java.net.InetSocketAddress;
    2. import java.net.SocketException;
    3. import java.nio.ByteBuffer;
    4. import java.nio.channels.SocketChannel;
    5. public class TCPEchoClientNonblocking {
    6.   public static void main(String args[]) throws Exception {
    7.     if ((args.length < 2) || (args.length > 3))// Testforcorrect#ofargs
    8.         throw new IllegalArgumentException("Parameter(s): < Server> < Word> [< Port>]");
    9.     String server = args[0];// 服务器IP地址或名字
    10.    byte[] argument = args[1].getBytes();//向服务器发送的字符串
    11.    int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7777;
    12.    SocketChannel clntChan = SocketChannel.open();// 创建一个套接字通道,注意这里必须使用无参形式

    13.    // 设置为非阻塞模式,
    14.    //这个方法必须在实际连接之前调用(所以open的时候不能提供服务器地址,否则会自动连接)
    15.    clntChan.configureBlocking(false);
    16.   // 连接服务器,由于是非阻塞模式,这个方法会发起连接请求,
    17.   //并直接返回false(阻塞模式是一直等到链接成功并返回是否成功)
    18.   if (!clntChan.connect(new InetSocketAddress(server, servPort))) {
    19.     while (!clntChan.finishConnect()) {
    20.         System.out.print(".");
    21.     }
    22.   }
    23.    ByteBuffer writeBuf = ByteBuffer.wrap(argument);
    24.    ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
    25.    int totalBytesRcvd = 0;
    26.    int bytesRcvd;
    27.    while (totalBytesRcvd < argument.length) {
    28.         if (writeBuf.hasRemaining()) {
    29.                 clntChan.write(writeBuf);
    30.         }
    31.         if ((bytesRcvd = clntChan.read(readBuf)) == -1) {
    32.                 throw new SocketException("Connection closed prematurely");
    33.         }
    34.         totalBytesRcvd += bytesRcvd;
    35.         System.out.print(".");
    36.     }

    37.    System.out.println("Received:" + new String(readBuf.array(), 0, totalBytesRcvd));
    38.         clntChan.close();
    39.   }
    40. }
    复制代码
    这段代码使用ByteBuffer来保存读写的数据。通过clntChan.configureBlocking(false); 设置后,其中的connect,read,write操作都不回阻塞,而是立刻放回结果。
    (4)使用Selector,实现Echo Server。
       Selector的可以从所有的被注册到自己Channel中找到需要服务的实例。 首先,定义一个接口:
    1. import java.nio.channels.SelectionKey;
    2. import java.io.IOException;
    3. public interface TCPProtocol {
    4.         void handleAccept(SelectionKey key) throws IOException;
    5.         void handleRead(SelectionKey key) throws IOException;
    6.         void handleWrite(SelectionKey key) throws IOException;
    7. }
    复制代码
    1. 我们的Echo Server将使用这个接口。然后我们实现Echo Server:
    2. import java.io.IOException;
    3. import java.net.InetSocketAddress;
    4. import java.nio.channels.SelectionKey;
    5. import java.nio.channels.Selector;
    6. import java.nio.channels.ServerSocketChannel;
    7. import java.util.Iterator;
    8. public class TCPServerSelector {
    9.         private static final int BUFSIZE = 256;
    10.         private static final int TIMEOUT = 3000;
    11.         public static void main(String[] args) throws IOException {
    12.                 if (args.length < 1) {
    13.                         throw new IllegalArgumentException("Parameter(s):< Port>...");
    14.                 }
    15.                
    16.        // 创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程
    17.                 Selector selector = Selector.open();
    18.                
    19.                 for (String arg : args) {
    20.                   
    21.             // 创建ServerSocketChannel,并把它绑定到指定端口上
    22.                         ServerSocketChannel listnChannel = ServerSocketChannel.open();
    23.                         listnChannel.socket().bind(
    24.                                         new InetSocketAddress(Integer.parseInt(arg)));
    25.                         listnChannel.configureBlocking(false);//设置为非阻塞模式, 这个非常重要
    26.                         // 在选择器里面注册关注这个服务器套接字通道的accept事件
    27.             // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel
    28.                         listnChannel.register(selector, SelectionKey.OP_ACCEPT);
    29.                 }
    30.                
    31.                 TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);
    32.                 while (true) {
    33.                
    34.     /*测试等待事件发生,分为直接返回的selectNow()和阻塞等待的select(),
    35.         另外也可加一个参数表示阻塞超时,停止阻塞的方法有两种: 中断线程和selector.wakeup(),
    36.         有事件发生时,会自动的wakeup(),方法返回为select出的事件数,另外务必注意一个问题是,
    37.         当selector被select()阻塞时,其他的线程调用同一个selector的register也会被阻塞到select返回为止
    38.     select操作会把发生关注事件的Key加入到selectionKeys中(只管加不管减)*/
    39.                         if (selector.select(TIMEOUT) == 0) {
    40.                                 System.out.print(".");
    41.                                 continue;
    42.                         }
    43.                        
    44.                          // 获取发生了关注时间的Key集合,每个SelectionKey对应了注册的一个通道
    45.                          // 多说一句selector.keys()返回所有的SelectionKey(包括没有发生事件的)
    46.                         Iterator< SelectionKey> keyIter = selector.selectedKeys().iterator();
    47.                         while (keyIter.hasNext()) {
    48.                                 SelectionKey key = keyIter.next();
    49.                            // OP_ACCEPT 这个只有ServerSocketChannel才有可能触发
    50.                                 if (key.isAcceptable()) {
    51.                                         protocol.handleAccept(key);
    52.                                 }
    53.                        
    54.                                 if (key.isReadable()) { // OP_READ 有数据可读
    55.                                         protocol.handleRead(key);
    56.                                 }
    57.                             // OP_WRITE 可写状态 这个状态通常总是触发的,所以只在需要写操作时才进行关注
    58.                                 if (key.isValid() && key.isWritable()) {
    59.                                         protocol.handleWrite(key);
    60.                                 }
    61.                                 keyIter.remove();
    62.                         }
    63.                 }
    64.         }
    65. }
    复制代码
       我们通过listnChannel.register(selector, SelectionKey.OP_ACCEPT); 注册了一个我们感兴趣的事件,然后调用selector.select(TIMEOUT)等待订阅的时间发生,然后再采取相应的处理措施。 最后我们实现EchoSelectorProtocol
    1. import java.nio.channels.SelectionKey;
    2. import java.nio.channels.SocketChannel;
    3. import java.nio.channels.ServerSocketChannel;
    4. import java.nio.ByteBuffer;
    5. import java.io.IOException;
    6. public class EchoSelectorProtocol implements TCPProtocol {
    7.         private int bufSize;
    8.         public EchoSelectorProtocol(int bufSize) {
    9.                 this.bufSize = bufSize;
    10.         }
    11.         public void handleAccept(SelectionKey key) throws IOException {
    12.             // 得到与客户端的套接字通道
    13.                 SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
    14.                 clntChan.configureBlocking(false);  // 同样设置为非阻塞模式
    15.                
    16.                 // 同样将客户端的通道在selector上注册,OP_READ对应可读事件(对方有写入数据),
    17.                 //可以通过key获取关联的选择器
    18.                 clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer
    19.                                 .allocate(bufSize));
    20.         }
    21.         public void handleRead(SelectionKey key) throws IOException {
    22.                
    23.                 SocketChannel clntChan = (SocketChannel) key.channel();
    24.                 ByteBuffer buf = (ByteBuffer) key.attachment();
    25.                 long bytesRead = clntChan.read(buf);
    26.                 if (bytesRead == -1) {
    27.                         clntChan.close();
    28.                 } else if (bytesRead > 0) {
    29.                        
    30.                         key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
    31.                 }
    32.         }
    33.         public void handleWrite(SelectionKey key) throws IOException {
    34.                
    35.                 ByteBuffer buf = (ByteBuffer) key.attachment();
    36.                 buf.flip();
    37.                 SocketChannel clntChan = (SocketChannel) key.channel();
    38.                 clntChan.write(buf);
    39.                 if (!buf.hasRemaining()) {
    40.                
    41.                         key.interestOps(SelectionKey.OP_READ);
    42.                 }
    43.                 buf.compact();// Makeroomformoredatatobereadin
    44.         }
    45. }
    复制代码
    在这里,我们又进一步对Selector注册了相关的事件:key.interestOps(SelectionKey.OP_READ); 这样,我们就实现了基于NIO的Echo 系统。
      运行服务器:
    C:java>java TCPServerSelector 9999
    .........

    运行客户端:
    C:java>java TCPEchoClientNonblocking 127.0.0.1 hello 9999
    .Received:hello C:java>java TCPEchoClientNonblocking 127.0.0.1 ok 9999
    .Received:ok

      
      
       
       

         
       

         
       
      
    复制代码

    源码下载:http://file.javaxxz.com/2014/11/2/235820937.zip
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2025-2-25 16:33 , Processed in 0.353226 second(s), 46 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表