所有文章

Java - NIO框架

什么是 NIO

为了弥补 Java IO 的不足,Java 从 1.4 版本开始引入了 NIO 框架,也就是说 NIO 是一套新的 IO 框架,一般解释为 Non-blocking IO,有时也解释为 New IO。它可以在实现高并发服务器的同时占用很少的资源。

传统 IO 框架

对于传统的 IO 框架,NIO 的优势主要体现在对请求的处理上,下面我们先来看看用传统 IO 怎样建立一个 TCP 连接。 首先创建服务端,并等待新的连接,下面是伪代码:

ServerSocket ser = new ServerSocket(8081);
 
while (true) {
     
    Socket sk = ser.accept();
 
    startNewThread(sk);
 
}
  1. 第一行是创建一个服务端对象,并准备监听本机的 8081 端口。
  2. Socket sk = ser.accept(); 表示开始监听新的连接请求,这行代码是阻塞式的,直到有新的客户端连接过来,会返回一个新 Socket 连接对象。
  3. startNewThread() 方法是处理 Socket 连接的主要逻辑,它会创建一个新的线程并立即返回,该线程的工作是一直监听客户端发来的数据,然后作出回应。
  4. 这时又会回到循环的项部,继续等待新的连接,这就是一个服务端了。

缺点:乍一看没什么问题,一切都很美好不是吗,但是当连接的请求多了呢,比如几万或者几百万?这样的话就会有几百万个线程同时运行,因为每一个Socket都需要一个线程来监听数据,即使大部分 Socket 都是空闲状态,只有一小部分正在繁忙的交换数据,这一百万个线程如果每个只占用 1M 的内存,那也需要 1T 的内存!这显然不太明智。

NIO 框架

在上例中共有两种监听事件,一种是主线程不断地监听新的连接,另一种是其它子线程监听它所负责的Socket上的数据,那能不能把这些需要监听对象放在一起呢?然后用一个线程或少量几个线程专门负责监听所有对象上的请求??这样就能节省大量资源啊!实际上这也是完全合理的,并且在新的 IO 框架中就是这么做的!那么我们来看看NIO的实现方式吧。

创建 Selector 对象

根据以上思路,我们首先需要一个池用来存放这些需要监的对象,它就是 NIO 框架中的 Selector 类,那怎么创建呢?如下:

Selector sel = Selector.open();

创建 ServerSocketChannel 对象

然后我们需要一个类似旧 IO 框架中的 ServerSocket,用来监听某端口上的连接请求,在 NIO 中它叫做 ServerSocketChannel

ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8081));
ssc.configureBlocking(false);

创建好之后需要设置一些相应的参数,无非就是 IP 端口之类,最后一行是将它设置为非阻塞,还记得传统 IO 的实现方式吗?它有一步是监听新的连接,也就是 ser.accept() 这一句,它是阻塞式的,当设置了 ssc.configureBlocking(false) 之后,accept() 方法就不再阻塞,而是立即返回,这样主线程就可以腾出时间做更多的事情,而不是一味地干等着。

注册新连接事件

然后把这个 ServerSocketChannel 对象放入 Selector 中,在 NIO 中,这个动作被称为注册,像下面这样:

ssc.register(sel, SelectionKey.OP_ACCEPT);

这样 ssc 就被当作事件注册到 Selector 中,register() 的第二个参数是为了标记这个事件的类型,也就是说 Selector 中可以多种类型的事件。

处理事件

现在,事件池中已经有一个事件了,这时我们需要写一个循环,不断地从池中取出已发生的事件并处理掉,取出来的事件是一个 Set 集合,然后遍历里面的事件:

sel.select();
Set<SelectionKey> selectionKeys = sel.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while(it.hasNext()){
    SelectionKey key = it.next();
...

注册接收数据事件

这个 key 要怎么处理呢?它不是就是我们放进去的 ServerSocketChannel 吗,我们调用它的 accept() 方法就可以了,它返回一个 Socket 对象,然后将这个新的对象也放入池中,让 Selector 统一管理与监听,放进去之前也要将它设置为非阻塞:

ServerSocketChannel c = (ServerSocketChannel)key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
     
//将新的连接加入监听器中
sc.register(sel, SelectionKey.OP_READ);

注意这时 register() 方法的第二个参数不一样了,它被标记为 OP_READ 类型的事件。

完整示例

这时,池中已经有两种事件了,所以我们在取出事件后,要对其作区分,不同事件做不同的处理,下面是服务端的完整逻辑:

public static void main(String[] args) throws Exception {
    //创建一个事件池
    Selector sel = Selector.open();
     
    //创建一个服务端并加入事件池
    ServerSocketChannel ssc = ServerSocketChannel.open();
    ssc.socket().bind(new InetSocketAddress("localhost", 8888));
    ssc.configureBlocking(false);
    ssc.register(sel, SelectionKey.OP_ACCEPT);
    //不断监听并处理池中的事件
    while(true){
         //不断监听并取出池中的事件
        int num = sel.select();
        System.out.printf("read [%s] event from selector!\n", num);
        Set<SelectionKey> selectionKeys = sel.selectedKeys();
        Iterator<SelectionKey> it = selectionKeys.iterator();
        //处理取出的事件
        while(it.hasNext()){
            SelectionKey key = it.next();
             
            if(key.readyOps() == SelectionKey.OP_ACCEPT){
                System.out.println("检测到新的连接");
                ServerSocketChannel c = (ServerSocketChannel)key.channel();
                SocketChannel sc = c.accept();
                sc.configureBlocking(false);
                 
                //将新的连接加入监听器中
                sc.register(sel, SelectionKey.OP_READ);
                 
            } else if (key.readyOps() == SelectionKey.OP_READ){
                System.out.println("检测到新的请求");
                SocketChannel sc = (SocketChannel)key.channel();
                 
                try{
                    handelRequest(sc);
                }catch (IOException e) {
                    System.out.println("有连接被断开");
                    sc.close();
                    key.cancel();
                }
            }
        }
        selectionKeys.clear();
        Thread.sleep(1000);
    }

数据处理

handelRequest() 方法是需要自己实现的,它主要是从 SocketChannel 中接收数据然后作出相应的动作,如果接收时抛出异常,我们就认为这个连接已经断开,并取消对其的监听,也就是 key.cancel();,为了让程序跑的慢一些以便看清楚它的流程,所以加入了 Thread.sleep(1000);

handelRequest() 方法实现如下:

static ByteBuffer buff = ByteBuffer.allocate(128);
static void handelRequest(SocketChannel sc) throws IOException{
    buff.clear();
     
    //读取客户端发来的数据
    if(sc.read(buff) < 1)
        return;
     
    //处理接收到的数据
    buff.flip();
    System.out.println(new String(buff.array()));
 
    //向客户端作出回应
    sc.write(ByteBuffer.wrap("I am Server!".getBytes()));
}

第一行创建是 NIO 框架专用的 ByteBuffer

if(sc.read(buff) < 1) 是将客户端发来的数据读到 ByteBuffer 中,如果客户端没有发数据过来,此方法会返回 -1。

NIO中各对象的关系

到这里整个流程差不多就结束了,但有些概念还是需要捋一捋,在 NIO 框架中,所有对象可以分为三种,Channel、ByteBuffer 和 Selector,其中 Channel 的概念上面没有提到,其实它就是一个抽象,因为我们要把所有需监听的对象放到 Selector 这个池中,所以当然需要将放进去的东西统一抽象出来,也就是说在 NIO 中,所有连接都被看作是 Channel,而从一个 Channel 中读出数据或者是写入数据都要先经过 ByteBuffer,ByteBuffer 起到缓冲的作用,在 NIO 中所有读写操都是非阻塞的,而如果客户端发一条很长的数据过来,由于网卡和操作系统的限制,必然后被切分成多条子消息发送到服务端,为了避免数据的不完整性,缓冲机制就成了必须品。

参考

  1. IBM有一篇NIO的文章,写的相当详细适合刚接触NIO的同学: NIO 入门
  2. 在实际项目中,一般不会自主实现一套产品级的高可用 NIO 框架,现在比较成熟的 NIO 开源框架是 Netty,在很多项目中被使用,我之前写过一篇关于 Netty 的文章: Netty 使用教程

编写日期:2017-07-08