Java - NIO框架
什么是 NIO
为了弥补 Java IO 的不足,Java 从 1.4 版本开始引入了 NIO 框架,也就是说 NIO 是一套新的 IO 框架,一般解释为 Non-blocking IO,有时也解释为 New IO。它可以在实现高并发服务器的同时占用很少的资源。
传统 IO 框架
对于传统的 IO 框架,NIO 的优势主要体现在对请求的处理上,下面我们先来看看用传统 IO 怎样建立一个 TCP 连接。 首先创建服务端,并等待新的连接,下面是伪代码:
ServerSocket ser = new ServerSocket(8081);
while (true) {
Socket sk = ser.accept();
startNewThread(sk);
}
- 第一行是创建一个服务端对象,并准备监听本机的 8081 端口。
Socket sk = ser.accept();
表示开始监听新的连接请求,这行代码是阻塞式的,直到有新的客户端连接过来,会返回一个新 Socket 连接对象。startNewThread()
方法是处理 Socket 连接的主要逻辑,它会创建一个新的线程并立即返回,该线程的工作是一直监听客户端发来的数据,然后作出回应。- 这时又会回到循环的项部,继续等待新的连接,这就是一个服务端了。
缺点:乍一看没什么问题,一切都很美好不是吗,但是当连接的请求多了呢,比如几万或者几百万?这样的话就会有几百万个线程同时运行,因为每一个Socket都需要一个线程来监听数据,即使大部分 Socket 都是空闲状态,只有一小部分正在繁忙的交换数据,这一百万个线程如果每个只占用 1M 的内存,那也需要 1T 的内存!这显然不太明智。
NIO 框架
在上例中共有两种监听事件,一种是主线程不断地监听新的连接,另一种是其它子线程监听它所负责的Socket上的数据,那能不能把这些需要监听对象放在一起呢?然后用一个线程或少量几个线程专门负责监听所有对象上的请求??这样就能节省大量资源啊!实际上这也是完全合理的,并且在新的 IO 框架中就是这么做的!那么我们来看看NIO的实现方式吧。
创建 Selector 对象
根据以上思路,我们首先需要一个池用来存放这些需要监的对象,它就是 NIO 框架中的 Selector
类,那怎么创建呢?如下:
Selector sel = Selector.open();
创建 ServerSocketChannel 对象
然后我们需要一个类似旧 IO 框架中的 ServerSocket
,用来监听某端口上的连接请求,在 NIO 中它叫做 ServerSocketChannel
:
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8081));
ssc.configureBlocking(false);
创建好之后需要设置一些相应的参数,无非就是 IP 端口之类,最后一行是将它设置为非阻塞,还记得传统 IO 的实现方式吗?它有一步是监听新的连接,也就是 ser.accept()
这一句,它是阻塞式的,当设置了 ssc.configureBlocking(false)
之后,accept()
方法就不再阻塞,而是立即返回,这样主线程就可以腾出时间做更多的事情,而不是一味地干等着。
注册新连接事件
然后把这个 ServerSocketChannel
对象放入 Selector
中,在 NIO 中,这个动作被称为注册,像下面这样:
ssc.register(sel, SelectionKey.OP_ACCEPT);
这样 ssc
就被当作事件注册到 Selector
中,register()
的第二个参数是为了标记这个事件的类型,也就是说 Selector
中可以多种类型的事件。
处理事件
现在,事件池中已经有一个事件了,这时我们需要写一个循环,不断地从池中取出已发生的事件并处理掉,取出来的事件是一个 Set
集合,然后遍历里面的事件:
sel.select();
Set<SelectionKey> selectionKeys = sel.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
while(it.hasNext()){
SelectionKey key = it.next();
...
注册接收数据事件
这个 key 要怎么处理呢?它不是就是我们放进去的 ServerSocketChannel
吗,我们调用它的 accept()
方法就可以了,它返回一个 Socket
对象,然后将这个新的对象也放入池中,让 Selector
统一管理与监听,放进去之前也要将它设置为非阻塞:
ServerSocketChannel c = (ServerSocketChannel)key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
//将新的连接加入监听器中
sc.register(sel, SelectionKey.OP_READ);
注意这时 register()
方法的第二个参数不一样了,它被标记为 OP_READ
类型的事件。
完整示例
这时,池中已经有两种事件了,所以我们在取出事件后,要对其作区分,不同事件做不同的处理,下面是服务端的完整逻辑:
public static void main(String[] args) throws Exception {
//创建一个事件池
Selector sel = Selector.open();
//创建一个服务端并加入事件池
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.socket().bind(new InetSocketAddress("localhost", 8888));
ssc.configureBlocking(false);
ssc.register(sel, SelectionKey.OP_ACCEPT);
//不断监听并处理池中的事件
while(true){
//不断监听并取出池中的事件
int num = sel.select();
System.out.printf("read [%s] event from selector!\n", num);
Set<SelectionKey> selectionKeys = sel.selectedKeys();
Iterator<SelectionKey> it = selectionKeys.iterator();
//处理取出的事件
while(it.hasNext()){
SelectionKey key = it.next();
if(key.readyOps() == SelectionKey.OP_ACCEPT){
System.out.println("检测到新的连接");
ServerSocketChannel c = (ServerSocketChannel)key.channel();
SocketChannel sc = c.accept();
sc.configureBlocking(false);
//将新的连接加入监听器中
sc.register(sel, SelectionKey.OP_READ);
} else if (key.readyOps() == SelectionKey.OP_READ){
System.out.println("检测到新的请求");
SocketChannel sc = (SocketChannel)key.channel();
try{
handelRequest(sc);
}catch (IOException e) {
System.out.println("有连接被断开");
sc.close();
key.cancel();
}
}
}
selectionKeys.clear();
Thread.sleep(1000);
}
数据处理
handelRequest()
方法是需要自己实现的,它主要是从 SocketChannel
中接收数据然后作出相应的动作,如果接收时抛出异常,我们就认为这个连接已经断开,并取消对其的监听,也就是 key.cancel();
,为了让程序跑的慢一些以便看清楚它的流程,所以加入了 Thread.sleep(1000);
。
handelRequest()
方法实现如下:
static ByteBuffer buff = ByteBuffer.allocate(128);
static void handelRequest(SocketChannel sc) throws IOException{
buff.clear();
//读取客户端发来的数据
if(sc.read(buff) < 1)
return;
//处理接收到的数据
buff.flip();
System.out.println(new String(buff.array()));
//向客户端作出回应
sc.write(ByteBuffer.wrap("I am Server!".getBytes()));
}
第一行创建是 NIO 框架专用的 ByteBuffer
。
if(sc.read(buff) < 1)
是将客户端发来的数据读到 ByteBuffer
中,如果客户端没有发数据过来,此方法会返回 -1。
NIO中各对象的关系
到这里整个流程差不多就结束了,但有些概念还是需要捋一捋,在 NIO 框架中,所有对象可以分为三种,Channel、ByteBuffer 和 Selector,其中 Channel 的概念上面没有提到,其实它就是一个抽象,因为我们要把所有需监听的对象放到 Selector 这个池中,所以当然需要将放进去的东西统一抽象出来,也就是说在 NIO 中,所有连接都被看作是 Channel,而从一个 Channel 中读出数据或者是写入数据都要先经过 ByteBuffer,ByteBuffer 起到缓冲的作用,在 NIO 中所有读写操都是非阻塞的,而如果客户端发一条很长的数据过来,由于网卡和操作系统的限制,必然后被切分成多条子消息发送到服务端,为了避免数据的不完整性,缓冲机制就成了必须品。
参考
- IBM有一篇NIO的文章,写的相当详细适合刚接触NIO的同学: NIO 入门
- 在实际项目中,一般不会自主实现一套产品级的高可用 NIO 框架,现在比较成熟的 NIO 开源框架是 Netty,在很多项目中被使用,我之前写过一篇关于 Netty 的文章: Netty 使用教程