所有文章

Java - Netty入门

什么是Netty

Netty 是一个高性能高可用的 NIO 框架,并且使用简单,用它可以轻松地开发诸如协议服务器和客户端之类的网络应用程序。它大大简化了网络编程,如 TCP 和 UDP 套接字服务器开发。已经被其它很多项目使用,比如 Spark。

为什么不用 Java 标准库中的 NIO 框架

它实际上是 Java 标准库中的 NIO 框架的一个实现,并将它做成了一个通用框架,最重要的是它经过了很多项目的验证与打磨,现在已经非常稳定,如果项目组要从头开始实现一个这样好用的 NIO 框架成本是很高的,而且还需要长时间的打磨。由于 Netty 是开源项目,社区又很活跃,因此很多项目从一开始就选择 Netty 作为自己的 NIO 框架,以降低开发成本。

开始动手

我们 Netty 框架来编写一个简易应用程序,并说明它的执行流程,这个程序分为服务端和客户端两个部分,每个部分有两个类。

编写服务端

先来写服务端的主程序 TestServer 类,其实不是一个类,因为它只包含一个 main 方法而已:

package com.kxdmmr.demo.netty;
 
import io.netty.bootstrap.ServerBootstrap;
...
 
public class TestServer {
 
    static final int PORT = Integer.parseInt(System.getProperty("port", "8081"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "128"));
     
    public static void main(String[] args) throws Exception {
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workerGroup = new NioEventLoopGroup();
        try{
            // 1.创建服务端
            ServerBootstrap b = new ServerBootstrap();
            // 2.配置服务端
            b.group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    // 注入自定义的处理程序
                    ch.pipeline().addLast(new TestServerHandler());
                }
            })
            .option(ChannelOption.SO_BACKLOG, SIZE)
            .childOption(ChannelOption.SO_KEEPALIVE, true);
             
            // 3.启动服务端
            ChannelFuture f = b.bind(PORT).sync();
             
            f.channel().closeFuture().sync();
        } finally {
            // 停止所有线程池中的线程
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
         
    }
}
package com.kxdmmr.demo.netty;
 
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
public class TestServerHandler extends ChannelInboundHandlerAdapter {
     
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
        ByteBuf in = (ByteBuf) msg;
        while (in.isReadable())
            System.out.print((char) in.readByte());
        System.out.println();
        System.out.flush();
         
        in.writeBytes("I am server!".getBytes());
         
        Thread.sleep(1000);
         
        ctx.write(msg);
    }
 
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

当一个己连接到服务器的 Client 向服务端发数消息时,上面 channelRead() 会被调起,它的作用是处理从 Client 接收到的数据,这个就需要自己根据业务来定义了,在本例中我只是将接收到的消息打印出来,然后调用 in.writeBytes() 向 Client 发送一条消息作为回应,为了看清楚它的运行过程,所以加了 Thread.sleep(1000) 这一句。

channelRead() 方法结束后意味着数据处理完毕,这时 channelReadComplete() 方法会被调起,用户可以根据需要在这里加一些其它逻辑。

如果服务器端运行出现异常,比如连接意外中断,那么 exceptionCaught() 会被调用。

编写客户端

客户端的创建步骤与服务端是一样的,不同的是这里用的是 Bootstrap 类,而且只需要一个线程池:

package com.kxdmmr.demo.netty;
 
import io.netty.bootstrap.Bootstrap;
...
 
public class TestClient {
 
    static final String HOST = System.getProperty("host", "127.0.0.1");
    static final int PORT = Integer.parseInt(System.getProperty("port", "8081"));
    static final int SIZE = Integer.parseInt(System.getProperty("size", "128"));
 
    public static void main(String[] args) throws Exception {
         
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建客户端
            Bootstrap b = new Bootstrap();
            // 配置客户端
            b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true)
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline p = ch.pipeline();
                    // p.addLast(new LoggingHandler(LogLevel.INFO));
                    p.addLast(new TestClientHandler());
                }
            });
 
            // 启动客户端
            ChannelFuture f = b.connect(HOST, PORT).sync();
             
            // 在连接关闭之前一直等待
            f.channel().closeFuture().sync();
        } finally {
            // 停掉所有线程
            group.shutdownGracefully();
        }
    }
}

客户端同样用到了一个自定义的类:TestClientHandler,用来处理服务端发来的消息,其定义如下:

package com.kxdmmr.demo.netty;
 
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
 
public class TestClientHandler extends ChannelInboundHandlerAdapter {
 
    private final ByteBuf firstMessage;
 
    /**
     * 创建一个客户端的处理器并初始化数据
     */
    public TestClientHandler() {
        firstMessage = Unpooled.copiedBuffer("I am client!".getBytes());
    }
 
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.writeAndFlush(firstMessage);
    }
 
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws InterruptedException {
        ByteBuf in = (ByteBuf) msg;
        while (in.isReadable()) 
            System.out.print((char) in.readByte());
        System.out.println();
        System.out.flush();
         
        in.writeBytes("I am client!".getBytes());
         
        Thread.sleep(1000);
         
        ctx.write(msg);
    }
 
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
       ctx.flush();
    }
 
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

它与服务端唯一不同的是,比服务端多了一个方法 channelActive(),其它三个方法与服务端作用是一样的,channelActive() 是用来在启动客户端时首次向服务端发送消息,也就是:”I am client!“。

-End-


编写日期:2017-07-02