之前的文章Netty基础篇:Netty是什么?介绍了传统IO编程存在的问题,及Java NIO编程在解决IO编程的问题中的局限性,由此引出IO编程问题的理想解决方案——Netty。在上篇文章中简单展示了Netty的基本使用,本篇文章通过一个Netty服务端的demo来了解一下Netty的基本组件。
1. Netty服务端
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public static void main(String[] args) throws InterruptedException {
new EchoServer(8888).start();
}
public void start() throws InterruptedException {
//创建EventLoopGroup,处理事件
EventLoopGroup boss = new NioEventLoopGroup();
EventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(boss, worker)
//指定所使用的NIO传输 Channel
.channel(NioServerSocketChannel.class)
//使用指定的端口设置套接字地址
.localAddress(new InetSocketAddress(port))
//设置ChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline channelPipeline = socketChannel.pipeline();
channelPipeline.addLast(new HttpServerCodec());
channelPipeline.addLast(new HttpObjectAggregator(10 * 1024 * 1024));
channelPipeline.addLast(new NettyServerHandler());
}
});
//异步的绑定服务器,调用sync()方法阻塞等待直到绑定完成
ChannelFuture future = b.bind().sync();
future.channel().closeFuture().sync();
} finally {
//关闭EventLoopGroup,释放所有的资源
boss.shutdownGracefully().sync();
worker.shutdownGracefully().sync();
}
}
}
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
/**
* 收到客户端请求,返回信息
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String result;
if (!(msg instanceof FullHttpRequest)) {
result = "未知请求!";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
FullHttpRequest httpRequest = (FullHttpRequest) msg;
try {
String path = httpRequest.uri(); //获取路径
String body = getBody(httpRequest); //获取参数
HttpMethod method = httpRequest.method();//获取请求方法
System.out.println("接收到:" + method + " 请求");
//如果是GET请求
if (HttpMethod.GET.equals(method)) {
System.out.println("body:" + body);
result = "GET请求";
send(ctx, result, HttpResponseStatus.OK);
return;
}
//如果是POST请求
if (HttpMethod.POST.equals(method)) {
System.out.println("body:" + body);
result = "POST请求";
send(ctx, result, HttpResponseStatus.OK);
return;
}
//如果是PUT请求
if (HttpMethod.PUT.equals(method)) {
System.out.println("body:" + body);
result = "PUT请求";
send(ctx, result, HttpResponseStatus.OK);
return;
}
//如果是DELETE请求
if (HttpMethod.DELETE.equals(method)) {
System.out.println("body:" + body);
result = "DELETE请求";
send(ctx, result, HttpResponseStatus.OK);
}
} catch (Exception e) {
System.out.println("处理请求失败!");
e.printStackTrace();
} finally {
//释放请求
httpRequest.release();
}
}
/**
* 获取body参数
*
* @param request
* @return
*/
private String getBody(FullHttpRequest request) {
ByteBuf buf = request.content();
return buf.toString(CharsetUtil.UTF_8);
}
/**
* 发送的返回值
*
* @param ctx 返回
* @param context 消息
* @param status 状态
*/
private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, status, Unpooled.copiedBuffer(context, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/**
* 建立连接时,返回消息
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! ");
super.channelActive(ctx);
}
}
当启动main方法,浏览器访问127.0.0.1:8888/hello,可以在浏览器获取响应结果“GET请求”,服务端可以看到请求日志:
接收到:GET请求
body:
说明Netty实现了服务端并正常运行,下面我们结合上面的demo来看一下Netty的基础组件。
2. Netty基础组件
根据demo,我们可以总结出Netty服务端启动流程:
- 创建 ServerBootStrap实例
- 设置EventLoopGroup线程池
- 通过ServerBootStrap的channel方法设置Channel类型,会在bind方法调用后根据该类型初始化Channel
- 绑定Socket访问地址
- 设置ChannelHandler(通过ChannelPipeline将ChannelHandler组织为一个逻辑链)
- 调用bind方法启动服务端
该流程主要涉及如下图所示的几个Netty组件:
2.1 BootStrap
BootStrap是Netty提供的启动辅助类,帮助Netty客户端或服务端的Netty初始化,客户端对应的是Bootstrap类,服务端对应的是 ServerBootStrap引导类。
2.2 Channel
Channel是Netty中的网络操作抽象类,对应JDK底层的Socket,它除了包含基本的I/O操作,如 bind()、connect()、read()、write()之外,还包括了Netty框架相关的一些功能,如获取 Channel的EventLoop。
2.3 EventLoop & EventLoopGroup
EventLoop定义了Netty的核心抽象,用于处理连接的生命周期中所发生的事件。EventLoop 为Channel处理I/O操作,下图是 Channel,EventLoop,Thread以及EventLoopGroup之间的关系(摘自《Netty In Action》):
它们之间的关系是:
- 一个EventLoopGroup 包含一个或者多个EventLoop
- 一个 EventLoop 在它的生命周期内只和一个Thread绑定
- 所有由 EventLoop处理的 I/O事件都将在它专有的Thread上被处理
- 一个 Channel 在它的生命周期内只注册一个EventLoop
- 一个 EventLoop 可能会被分配给一个或多个 Channel
EventLoopGroup实际上就是处理I/O操作的线程池,负责为每个新注册的Channel分配一个EventLoop,Channel在整个生命周期都有其绑定的 EventLoop来服务。
而上面服务端用的 NioEventLoop 就是 EventLoop的一个重要实现类,NioEventLoop 是Netty内部的I/O线程,而 NioEventLoopGroup是拥有 NioEventLoop的线程池,在Netty服务端中一般存在两个这样的NioEventLoopGroup线程池,一个 “Boss” 线程池,用于接收客户端连接,实际上该线程池中只有一个线程,一个 “Worker”线程池用于处理每个连接的读写。而Netty客户端只需一个线程池即可,主要用于处理连接中的读写操作。
2.4 ChannelHandler
ChannelHandler主要用于对出站和入站数据进行处理,它有两个重要的子接口:
- ChannelInboundHandler——处理入站数据
- ChannelOutboundHandler——处理出站数据
2.5 ChannelPipeline
ChannelPipeline是ChannelHandler的容器,通过ChannelPipeline可以将ChannelHandler组织成一个逻辑链,该逻辑链可以用来拦截流经Channel的入站和出站事件,当 Channel被创建时,它会被自动地分配到它的专属的 ChannelPipeline。
当一个消息或者任何其他的入站事件被读取时,那么它会从 ChannelPipeline的头部开始流动,并被传递给第一个 ChannelInboundHandler,第一个处理完成之后传递给下一个 ChannelInboundHandler,一直到ChannelPipeline的尾端,与之对应的是,当数据被写出时,数据从 ChannelOutboundHandler 链的尾端开始流动,直到它到达链的头部为止。
2.6 ChannelOption
ChannelOption用于对Channel设置TCP层面通用参数,比如TCP长连接设置,比如可以通过如下代码实现TCP层面的keepAlive机制:
//设置TCP的长连接,默认的keepAlive的心跳时间是两个小时
.option(ChannelOption.SO_KEEPALIVE, true)
2.7 ChannelFuture
ChannelFuture用于获取异步IO的处理结果,其 addListener()
方法注册了一个 ChannelFutureListener,以便在某个操作完成时(无论是否成功)得到通知。比如可以通过如下方式实现Netty客户端断线重连:
//客户端断线重连逻辑
ChannelFuture future = bootstrap.connect();
future.addListener((ChannelFutureListener) future1 -> {
if (future1.isSuccess()) {
log.info("连接Netty服务端成功");
} else {
log.info("连接失败,进行断线重连");
future1.channel().eventLoop().schedule(() -> start(), 20, TimeUnit.SECONDS);
}
});
socketChannel = (SocketChannel) future.channel();
2.8 ByteBuf
在之前的文章中提到,Java IO编程使用字节流来处理输入输出,效率较差,Java NIO中使用内存块为单位进行数据处理。而ByteBuf就是字节缓冲区,用于高效处理输入输出。
参考链接: