Java - Netty入门
什么是Netty
Netty 是一个高性能高可用的 NIO 框架,并且使用简单,用它可以轻松地开发诸如协议服务器和客户端之类的网络应用程序。它大大简化了网络编程,如 TCP 和 UDP 套接字服务器开发。已经被其它很多项目使用,比如 Spark。
为什么不用 Java 标准库中的 NIO 框架
它实际上是 Java 标准库中的 NIO 框架的一个实现,并将它做成了一个通用框架,最重要的是它经过了很多项目的验证与打磨,现在已经非常稳定,如果项目组要从头开始实现一个这样好用的 NIO 框架成本是很高的,而且还需要长时间的打磨。由于 Netty 是开源项目,社区又很活跃,因此很多项目从一开始就选择 Netty 作为自己的 NIO 框架,以降低开发成本。
开始动手
我们 Netty 框架来编写一个简易应用程序,并说明它的执行流程,这个程序分为服务端和客户端两个部分,每个部分有两个类。
编写服务端
先来写服务端的主程序 TestServer
类,其实不是一个类,因为它只包含一个 main
方法而已:
package com.kxdmmr.demo.netty;
import io.netty.bootstrap.ServerBootstrap;
...
public class TestServer {
static final int PORT = Integer.parseInt(System.getProperty("port", "8081"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "128"));
public static void main(String[] args) throws Exception {
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workerGroup = new NioEventLoopGroup();
try{
// 1.创建服务端
ServerBootstrap b = new ServerBootstrap();
// 2.配置服务端
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
// 注入自定义的处理程序
ch.pipeline().addLast(new TestServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, SIZE)
.childOption(ChannelOption.SO_KEEPALIVE, true);
// 3.启动服务端
ChannelFuture f = b.bind(PORT).sync();
f.channel().closeFuture().sync();
} finally {
// 停止所有线程池中的线程
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
创建服务端的整个过程可以分为三个步骤:1.创建服务端,2.配置服务端,3.启动服务端。
bossGroup
与workerGroup
是两个线程池,bossGroup
用来存放服务器端监听连接的事件,也就是监听着某一端口的线程,一旦有新的客户端连进来,它将被注册为等待接收数据的事件,并放入workerGroup
中。ServerBootstrap
类用来创建一个服务端程序,它简化了创建非阻塞式服务端程序的过程。NioServerSocketChannel
是 Netty 中定义的 Channel,底层封装了 Java 标准库中的ServerSocketChannel
。ChannelInitializer
类的initChannel
方法用来为程序配置 Channel,在这里可以将我们自定义的处理 Channel 的方法,也就是TestServerHandler
这个类,它是来用处理其它客户端发来的数据。option()
和childOption()
可以做一些其它配置,最后b.bind(PORT).sync()
就是启动服务端了。上面用到了
TestServerHandler
类,其定义如下:
package com.kxdmmr.demo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TestServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
ByteBuf in = (ByteBuf) msg;
while (in.isReadable())
System.out.print((char) in.readByte());
System.out.println();
System.out.flush();
in.writeBytes("I am server!".getBytes());
Thread.sleep(1000);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
当一个己连接到服务器的 Client 向服务端发数消息时,上面 channelRead()
会被调起,它的作用是处理从 Client 接收到的数据,这个就需要自己根据业务来定义了,在本例中我只是将接收到的消息打印出来,然后调用 in.writeBytes()
向 Client 发送一条消息作为回应,为了看清楚它的运行过程,所以加了 Thread.sleep(1000)
这一句。
当 channelRead()
方法结束后意味着数据处理完毕,这时 channelReadComplete()
方法会被调起,用户可以根据需要在这里加一些其它逻辑。
如果服务器端运行出现异常,比如连接意外中断,那么 exceptionCaught()
会被调用。
编写客户端
客户端的创建步骤与服务端是一样的,不同的是这里用的是 Bootstrap
类,而且只需要一个线程池:
package com.kxdmmr.demo.netty;
import io.netty.bootstrap.Bootstrap;
...
public class TestClient {
static final String HOST = System.getProperty("host", "127.0.0.1");
static final int PORT = Integer.parseInt(System.getProperty("port", "8081"));
static final int SIZE = Integer.parseInt(System.getProperty("size", "128"));
public static void main(String[] args) throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建客户端
Bootstrap b = new Bootstrap();
// 配置客户端
b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
// p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(new TestClientHandler());
}
});
// 启动客户端
ChannelFuture f = b.connect(HOST, PORT).sync();
// 在连接关闭之前一直等待
f.channel().closeFuture().sync();
} finally {
// 停掉所有线程
group.shutdownGracefully();
}
}
}
客户端同样用到了一个自定义的类:TestClientHandler
,用来处理服务端发来的消息,其定义如下:
package com.kxdmmr.demo.netty;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class TestClientHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf firstMessage;
/**
* 创建一个客户端的处理器并初始化数据
*/
public TestClientHandler() {
firstMessage = Unpooled.copiedBuffer("I am client!".getBytes());
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.writeAndFlush(firstMessage);
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
ByteBuf in = (ByteBuf) msg;
while (in.isReadable())
System.out.print((char) in.readByte());
System.out.println();
System.out.flush();
in.writeBytes("I am client!".getBytes());
Thread.sleep(1000);
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
它与服务端唯一不同的是,比服务端多了一个方法 channelActive()
,其它三个方法与服务端作用是一样的,channelActive()
是用来在启动客户端时首次向服务端发送消息,也就是:”I am client!“。
-End-