TA的每日心情 | 开心 2021-3-12 23:18 |
---|
签到天数: 2 天 [LV.1]初来乍到
|
为什么需要NIO
使用java编写过Socket程序的同学一定都知道Socket和SocketServer。当调用某个调用的时候,调用的地方就会阻塞,等待响应。这种方式对于小规模的程序非常方便,但是对于大型的程序就有点力不从心了,当有大量的连接的时候,我们可以为每一个连接建立一个线程来操作。但是这种做法带来的缺陷也是显而易见的:
1.硬件能够支持大量的并发。
2.并发的数量始终有一个上限。
3. 各个线程之间的优先级不好控制。
4. 各个Client之间的交互与同步困难。 我们也可以使用一个线程来处理所有的请求,使用不阻塞的IO,轮询查询所有的Client。这种做法同样也有缺陷:无法迅速响应Client端,同时会消耗大量轮询查询的时间。
所以,我们需要一种poll的模式来处理这种情况,从大量的网络连接中找出来真正需要服务的Client。这正是NIO诞生的原因:提供一种Poll的模式,在所有的Client中找到需要服务的Client。在JDK中,有一个非常有意思的库:NIO(New I/O),有三个最最重要的Class:java.nio.channels中Selector和Channel,以及java.nio中的Buffer。
套接字通道 1. 阻塞式套接字通道 与Socket和ServerSocket对应,NIO提供了SocketChannel和ServerSocketChannel对应,这两种通道同时支持一般的阻塞模式和更高效的非阻塞模式。 客户端通过SocketChannel.open()方法打开一个Socket通道,如果此时提供了SocketAddress参数,则会自动开始连接,否则需要主动调用connect()方法连接,创建连接后,可以像一般的Channel一样的用Buffer进行读写,这都是阻塞模式的。 服务器端通过ServerSocketChannel.open()创建,并使用bind()方法绑定到一个监听地址上,最后调用accept()方法阻塞等待客户端连接。当客户端连接后会返回一个SocketChannel以实现与客户端的读写交互。 总的来说,阻塞模式即是net包I/O的翻版,只是采用Channel和Buffer实现而已。 2.多路复用套接字通道(Selector实现的非阻塞式IO) 套接字通道多路复用的思想是创建一个Selector,将多个通道对它进行注册,当套接字有关注的事件发生时,可以选出这个通道进行操作。 Channel代表一个可以被用于Poll操作的对象(可以是文件流也可以使网络流),Channel能够被注册到一个Selector中。通过调用Selector的select方法可以从所有的Channel中找到需要服务的实例(Accept,read ..)。Buffer对象提供读写数据的缓存。相对于我们熟悉的Stream对象,Buffer提供更好的性能以及更好的编程透明性(人为控制缓存的大小以及具体的操作)。 (3)配合Buffer使用Channel,实现一个简单的非阻塞Echo Client:
与传统模式的编程不用,Channel不使用Stream,而是Buffer。
- import java.net.InetSocketAddress;
- import java.net.SocketException;
- import java.nio.ByteBuffer;
- import java.nio.channels.SocketChannel;
- public class TCPEchoClientNonblocking {
- public static void main(String args[]) throws Exception {
- if ((args.length < 2) || (args.length > 3))// Testforcorrect#ofargs
- throw new IllegalArgumentException("Parameter(s): < Server> < Word> [< Port>]");
- String server = args[0];// 服务器IP地址或名字
- byte[] argument = args[1].getBytes();//向服务器发送的字符串
- int servPort = (args.length == 3) ? Integer.parseInt(args[2]) : 7777;
- SocketChannel clntChan = SocketChannel.open();// 创建一个套接字通道,注意这里必须使用无参形式
- // 设置为非阻塞模式,
- //这个方法必须在实际连接之前调用(所以open的时候不能提供服务器地址,否则会自动连接)
- clntChan.configureBlocking(false);
- // 连接服务器,由于是非阻塞模式,这个方法会发起连接请求,
- //并直接返回false(阻塞模式是一直等到链接成功并返回是否成功)
- if (!clntChan.connect(new InetSocketAddress(server, servPort))) {
- while (!clntChan.finishConnect()) {
- System.out.print(".");
- }
- }
- ByteBuffer writeBuf = ByteBuffer.wrap(argument);
- ByteBuffer readBuf = ByteBuffer.allocate(argument.length);
- int totalBytesRcvd = 0;
- int bytesRcvd;
- while (totalBytesRcvd < argument.length) {
- if (writeBuf.hasRemaining()) {
- clntChan.write(writeBuf);
- }
- if ((bytesRcvd = clntChan.read(readBuf)) == -1) {
- throw new SocketException("Connection closed prematurely");
- }
- totalBytesRcvd += bytesRcvd;
- System.out.print(".");
- }
-
- System.out.println("Received:" + new String(readBuf.array(), 0, totalBytesRcvd));
- clntChan.close();
- }
- }
复制代码 这段代码使用ByteBuffer来保存读写的数据。通过clntChan.configureBlocking(false); 设置后,其中的connect,read,write操作都不回阻塞,而是立刻放回结果。
(4)使用Selector,实现Echo Server。
Selector的可以从所有的被注册到自己Channel中找到需要服务的实例。 首先,定义一个接口:- import java.nio.channels.SelectionKey;
- import java.io.IOException;
- public interface TCPProtocol {
- void handleAccept(SelectionKey key) throws IOException;
- void handleRead(SelectionKey key) throws IOException;
- void handleWrite(SelectionKey key) throws IOException;
- }
复制代码- 我们的Echo Server将使用这个接口。然后我们实现Echo Server:
- import java.io.IOException;
- import java.net.InetSocketAddress;
- import java.nio.channels.SelectionKey;
- import java.nio.channels.Selector;
- import java.nio.channels.ServerSocketChannel;
- import java.util.Iterator;
- public class TCPServerSelector {
- private static final int BUFSIZE = 256;
- private static final int TIMEOUT = 3000;
- public static void main(String[] args) throws IOException {
- if (args.length < 1) {
- throw new IllegalArgumentException("Parameter(s):< Port>...");
- }
-
- // 创建一个选择器,可用close()关闭,isOpen()表示是否处于打开状态,他不隶属于当前线程
- Selector selector = Selector.open();
-
- for (String arg : args) {
-
- // 创建ServerSocketChannel,并把它绑定到指定端口上
- ServerSocketChannel listnChannel = ServerSocketChannel.open();
- listnChannel.socket().bind(
- new InetSocketAddress(Integer.parseInt(arg)));
- listnChannel.configureBlocking(false);//设置为非阻塞模式, 这个非常重要
- // 在选择器里面注册关注这个服务器套接字通道的accept事件
- // ServerSocketChannel只有OP_ACCEPT可用,OP_CONNECT,OP_READ,OP_WRITE用于SocketChannel
- listnChannel.register(selector, SelectionKey.OP_ACCEPT);
- }
-
- TCPProtocol protocol = new EchoSelectorProtocol(BUFSIZE);
- while (true) {
-
- /*测试等待事件发生,分为直接返回的selectNow()和阻塞等待的select(),
- 另外也可加一个参数表示阻塞超时,停止阻塞的方法有两种: 中断线程和selector.wakeup(),
- 有事件发生时,会自动的wakeup(),方法返回为select出的事件数,另外务必注意一个问题是,
- 当selector被select()阻塞时,其他的线程调用同一个selector的register也会被阻塞到select返回为止
- select操作会把发生关注事件的Key加入到selectionKeys中(只管加不管减)*/
- if (selector.select(TIMEOUT) == 0) {
- System.out.print(".");
- continue;
- }
-
- // 获取发生了关注时间的Key集合,每个SelectionKey对应了注册的一个通道
- // 多说一句selector.keys()返回所有的SelectionKey(包括没有发生事件的)
- Iterator< SelectionKey> keyIter = selector.selectedKeys().iterator();
- while (keyIter.hasNext()) {
- SelectionKey key = keyIter.next();
- // OP_ACCEPT 这个只有ServerSocketChannel才有可能触发
- if (key.isAcceptable()) {
- protocol.handleAccept(key);
- }
-
- if (key.isReadable()) { // OP_READ 有数据可读
- protocol.handleRead(key);
- }
- // OP_WRITE 可写状态 这个状态通常总是触发的,所以只在需要写操作时才进行关注
- if (key.isValid() && key.isWritable()) {
- protocol.handleWrite(key);
- }
- keyIter.remove();
- }
- }
- }
- }
复制代码 我们通过listnChannel.register(selector, SelectionKey.OP_ACCEPT); 注册了一个我们感兴趣的事件,然后调用selector.select(TIMEOUT)等待订阅的时间发生,然后再采取相应的处理措施。 最后我们实现EchoSelectorProtocol- import java.nio.channels.SelectionKey;
- import java.nio.channels.SocketChannel;
- import java.nio.channels.ServerSocketChannel;
- import java.nio.ByteBuffer;
- import java.io.IOException;
- public class EchoSelectorProtocol implements TCPProtocol {
- private int bufSize;
- public EchoSelectorProtocol(int bufSize) {
- this.bufSize = bufSize;
- }
- public void handleAccept(SelectionKey key) throws IOException {
- // 得到与客户端的套接字通道
- SocketChannel clntChan = ((ServerSocketChannel) key.channel()).accept();
- clntChan.configureBlocking(false); // 同样设置为非阻塞模式
-
- // 同样将客户端的通道在selector上注册,OP_READ对应可读事件(对方有写入数据),
- //可以通过key获取关联的选择器
- clntChan.register(key.selector(), SelectionKey.OP_READ, ByteBuffer
- .allocate(bufSize));
- }
- public void handleRead(SelectionKey key) throws IOException {
-
- SocketChannel clntChan = (SocketChannel) key.channel();
- ByteBuffer buf = (ByteBuffer) key.attachment();
- long bytesRead = clntChan.read(buf);
- if (bytesRead == -1) {
- clntChan.close();
- } else if (bytesRead > 0) {
-
- key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
- }
- }
- public void handleWrite(SelectionKey key) throws IOException {
-
- ByteBuffer buf = (ByteBuffer) key.attachment();
- buf.flip();
- SocketChannel clntChan = (SocketChannel) key.channel();
- clntChan.write(buf);
- if (!buf.hasRemaining()) {
-
- key.interestOps(SelectionKey.OP_READ);
- }
- buf.compact();// Makeroomformoredatatobereadin
- }
- }
复制代码 在这里,我们又进一步对Selector注册了相关的事件:key.interestOps(SelectionKey.OP_READ); 这样,我们就实现了基于NIO的Echo 系统。
运行服务器:
C:java>java TCPServerSelector 9999
.........
运行客户端:
C:java>java TCPEchoClientNonblocking 127.0.0.1 hello 9999
.Received:hello C:java>java TCPEchoClientNonblocking 127.0.0.1 ok 9999
.Received:ok
源码下载:http://file.javaxxz.com/2014/11/2/235820937.zip |
|