Netty核心
一、概述
1、什么是Netty
Netty is an asynchronous event-driven network application framework for rapid development of maintainable high performance protocol servers & clients.
Netty是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端。
注意:
Netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO
2、Netty的优势
如果使用传统NIO,其工作量大,Bug 多
- 需要自己构建协议
- 解决
TCP传输问题,如粘包、半包 - 因为
bug的存在,epoll空轮询导致CPU100%
Netty 对 API 进行增强,使之更易用,如
FastThreadLocal=>ThreadLocalByteBuf=>ByteBuffer
相比于其他网络应用框架
Mina由Apache维护,将来3.x版本可能会有较大重构,破坏API向下兼容性,Netty的开发迭代更迅速,API更简洁Netty久经考验,经历多年,很多Bug问题已经被修复,Netty版本的迭代过程如下:2.x 20043.x 20084.x 2013(常用)5.x 已废弃(没有明显的性能提升,维护成本高)
3、Netty的地位
Netty 在 Java 网络应用框架中的地位就好比 Spring 框架在 JavaEE 开发中的地位。
总之一句话:只要有网络通信需求的框架都用到了Netty。
4、Netty的作者
他还是另一个著名网络应用框架
Mina的重要贡献者
二、入门案例
1、需求
开发一个简单的服务器端和客户端:
- 客户端向服务器端发送
hello world - 服务器仅接收输出到控制台,不返回
添加依赖:
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>
2、服务器端代码
public class HelloServer {
public static void main(String[] args) {
// 1、启动器,负责装配netty组件,启动服务器
new ServerBootstrap()
// 2、创建 NioEventLoopGroup,可以简单理解为 线程池 + Selector
.group(new NioEventLoopGroup())
// 3、选择服务器的 ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
// 4、child 负责处理读写,该方法决定了 child 执行哪些操作
// ChannelInitializer 处理器(仅执行一次)
// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 5、SocketChannel的处理器,使用StringDecoder解码,ByteBuf=>String
nioSocketChannel.pipeline().addLast(new StringDecoder());
// 6、SocketChannel的业务处理,使用上一个处理器的处理结果
nioSocketChannel.pipeline().addLast(new SimpleChannelInboundHandler<String>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String s) throws Exception {
System.out.println(s);
}
});
}
// 7、ServerSocketChannel绑定8080端口
}).bind(8080);
}
}
- 第二步说明:
事件循环组中有多个事件循环对象,专门用于处理accept、read等事件,一个事件循环对象可以认为就是一个线程配合一个selector工作,可以管理多个channel。
public NioEventLoopGroup() {
//默认传入线程数为0
this(0);
}
public NioEventLoopGroup(int nThreads) {
//传入指定的线程数
this(nThreads, (Executor) null);
}
可以传入一个参数,代表线程数,如果不传入,也至少会保证一个线程。
private static final int DEFAULT_EVENT_LOOP_THREADS;
static {
//至少保证一个线程
DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
//核心数的2倍作为线程数
"io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
}
}
/**
* @see MultithreadEventExecutorGroup#MultithreadEventExecutorGroup(int, Executor, Object...)
*/
//三元运算,如果是0,则使用默认的线程数(可以至少保证一个线程),否则使用用户指定的线程数
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
- 第三步说明:
选择服务器的ServerSocketChannel实现,上述选中了基于NIO的服务器实现,还有其他的实现,如下图
- 第四步说明:
方法名叫做chindHandler是因为接下来添加的处理器都是给 SocketChannel 用的,而不是给 ServerSocketChannel。
IDEA中重写方法快捷键:Control+O
3、客户端代码
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
new Bootstrap()
.group(new NioEventLoopGroup())
// 选择客户 Socket 实现类,NioSocketChannel 表示基于 NIO 的客户端实现
.channel(NioSocketChannel.class)
// ChannelInitializer 处理器(仅执行一次)
// 它的作用是待客户端SocketChannel建立连接后,执行initChannel以便添加更多的处理器
.handler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel channel) throws Exception {
// 消息会经过通道 handler 处理,这里是将 String => ByteBuf 编码发出
channel.pipeline().addLast(new StringEncoder());
}
})
// 指定要连接的服务器和端口
.connect(new InetSocketAddress("localhost", 8080))
// Netty 中很多方法都是异步的,如 connect
// 这时需要使用 sync 方法等待 connect 建立连接完毕 【sync:同步】【async:异步】
.sync()
// 获取 channel 对象,它即为通道抽象,可以进行数据读写操作
.channel()
// 写入消息并清空缓冲区
.writeAndFlush("hello world");
}
}
- 第二步:
选择客户端的SocketChannel实现,上述选中了基于NIO的客户端实现,还有其他的实现,如下图:
IDEA中多开客户端:
4、运行流程
左:客户端 右:服务器端
- 服务器端启动之后,会绑定
8080端口并进行监听,同时启动事件循环组监听channel上的事件。 - 客户端启动之后,连接服务器端的
8080端口,服务器端的事件循环组监听到ServerSocketChannel上的accept事件并进行处理。 - 客户端会阻塞,直到连接建立之后,服务器端和客户端分别初始化
SocketChannel。 - 客户端获得
SocketChannel并且写入数据,客户端的处理器会将字符串转为ByteBuf进行传输,在SocketChannel中传输的都是ByteBuf数据。 - 服务器端的事件循环组监听到某个
SocketChannel的read事件,由某个EventLoop处理read事件,接收到了ByteBuf数据。 - 服务器端的处理器依次对接受到的数据进行处理。
5、组件解释
channel可以理解为数据的通道msg理解为流动的数据,最开始输入是ByteBuf,但经过pipeline中的各个handler加工,会变成其它类型对象,最后输出又变成ByteBufhandler可以理解为数据的处理工序- 工序有多道,合在一起就是
pipeline(流水线),pipeline负责发布事件(读、读取完成…)传播给每个handler,handler对自己感兴趣的事件进行处理(重写了相应事件处理方法)pipeline中有多个handler,处理时会依次调用其中的handler
handler分Inbound和Outbound两类Inbound入站Outbound出站
- 工序有多道,合在一起就是
eventLoop可以理解为处理数据的工人eventLoop可以管理多个channel的io操作,并且一旦eventLoop负责了某个channel,就会将其与channel进行绑定,以后该channel中的io操作都由该eventLoop负责eventLoop既可以执行io操作,也可以进行任务处理,每个eventLoop有自己的任务队列,队列里可以堆放多个channel的待处理任务,任务分为普通任务、定时任务eventLoop按照pipeline顺序,依次按照handler的规划(代码)处理数据,可以为每个handler指定不同的eventLoop
三、组件
1、EventLoop
事件循环对象
EventLoop
EventLoop 本质是一个单线程执行器(同时维护了一个 Selector),里面有 run 方法处理一个或多个 Channel 上源源不断的 io 事件
它的继承关系如下:
- 继承自
j.u.c.ScheduledExecutorService因此包含了线程池中所有的方法 - 继承自
Netty自己的OrderedEventExecutor- 提供了
boolean inEventLoop(Thread thread)方法判断一个线程是否属于此EventLoop - 提供了
EventLoopGroup parent()方法来看看自己属于哪个EventLoopGroup
- 提供了
事件循环组
EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 io 事件都由此 EventLoop 来处理(保证了 io 事件处理时的线程安全)
- 继承自
Netty自己的EventExecutorGroup- 实现了
Iterable接口提供遍历EventLoop的能力 - 另有
next方法获取集合中下一个EventLoop
- 实现了
EventLoopGroup有多个类型,都继承自MultithreadEventLoopGroup(多线程事件循环组)抽象类。
一般使用NioEventLoopGroup(处理io事件、普通任务、定时任务),而DefaultEventLoopGroup只能处理普通任务和定时任务。
EventLoopGroup继承关系
1、处理普通任务和定时任务
public class TestEventLoop {
public static void main(String[] args) throws InterruptedException {
// 创建拥有两个EventLoop的NioEventLoopGroup,对应两个线程
EventLoopGroup group = new NioEventLoopGroup(2);
// 通过next方法可以获得下一个 EventLoop
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
System.out.println(group.next());
// 通过EventLoop执行普通任务,可以用来执行耗时较长的任务
group.next().execute(() -> {
System.out.println(Thread.currentThread().getName() + "->执行普通任务");
});
// 通过EventLoop执行定时任务
group.next().scheduleAtFixedRate(() -> {
System.out.println(Thread.currentThread().getName() + "->执行定时任务");
}, 0, 1, TimeUnit.SECONDS);
//当前线程睡眠4秒钟,让EventLoop中的线程执行定时任务
Thread.sleep(4000);
// 优雅地关闭
group.shutdownGracefully();
}
}
执行结果:
io.netty.channel.nio.NioEventLoop@53e25b76
io.netty.channel.nio.NioEventLoop@73a8dfcc
io.netty.channel.nio.NioEventLoop@53e25b76
io.netty.channel.nio.NioEventLoop@73a8dfcc
nioEventLoopGroup-2-1->执行普通任务
nioEventLoopGroup-2-2->执行定时任务
nioEventLoopGroup-2-2->执行定时任务
nioEventLoopGroup-2-2->执行定时任务
nioEventLoopGroup-2-2->执行定时任务
nioEventLoopGroup-2-2->执行定时任务
通过上述结果可以看出NioEventLoopGroup中的线程数为2,所以有两个EventLoop,每一个EventLoop都是一个线程配合一个selector进行工作,并且通过next方法实现了简单的轮询。
关闭 EventLoopGroup
优雅关闭
shutdownGracefully方法,该方法会首先切换EventLoopGroup到关闭状态从而拒绝新的任务的加入,然后在任务队列的任务都处理完成后,停止线程的运行,从而确保整体应用是在正常有序的状态下退出的。
2、处理IO任务
服务器代码
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加一个处理器
socketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//channel中接受到的数据
ByteBuf buf = (ByteBuf) msg;
//打印客户端发送过来的数据
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
客户端代码
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
Channel channel = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//添加处理器:将字符串转为ByteBuf在channel中进行传输
socketChannel.pipeline().addLast(new StringEncoder());
}
})
.connect(new InetSocketAddress("localhost", 8080))
.sync()//阻塞直到建立连接
.channel();//获得当前客户端和服务器传输的channel
System.out.println(channel);
// 此处打断点调试,调用 channel.writeAndFlush(...);
System.in.read();
}
}
添加断点之后,向服务器发送数据,会发现服务器端收不到消息,这是因为IDEA中的断点默认会阻塞所有的线程,客户端代码中的main线程和NioEventLoopGroup中的线程都会被阻塞,服务器自然就收不到服务器端发送的数据了。正确的做法应该是可以阻塞main线程,其他的线程可以正常执行。
按照下图修改断点的设置,使其只能阻塞当前main线程,其他线程正常执行:
Breakpoint Suspend 参数介绍:在创建断点时有一个重要参数是Suspend。
Suspend:未勾选,程序运行到断点处并不会阻塞,而会继续执行后面的逻辑。
Suspend:勾选,代表程序运行到断点处会阻塞。
+ All:勾选,代表断点会阻塞所有线程。
+ Thread:勾选,代表断点只会阻塞当前线程。
[Suspend勾选,All勾选] 是默认值,所以才会出现 “Stop The World” 的可怕情况。
所以说,在多线程调试时,若你希望阻塞程序,最好选择 Thread 当前线程阻塞策略,这样就不会影响到其他线程的工作。
打开三个客户端分别给服务器发送数据,查看服务器端接收到的数据如下:
nioEventLoopGroup-2-3 我是客户端1的数据
nioEventLoopGroup-2-4 我是客户端2的数据
nioEventLoopGroup-2-5 我是客户端3的数据
//三个客户端分别被不同的线程处理IO事件,当服务器端的线程数用完之后,就会实现轮询,实现一个线程操作多个客户端,配合线程的selector会管理多个客户端的SocketChannel
nioEventLoopGroup-2-3 我是客户端1的数据
nioEventLoopGroup-2-3 我是客户端1的数据
nioEventLoopGroup-2-5 我是客户端3的数据
nioEventLoopGroup-2-5 我是客户端3的数据
nioEventLoopGroup-2-4 我是客户端2的数据
nioEventLoopGroup-2-4 我是客户端2的数据
//线程和客户端实现了绑定,对于某一个客户端,第一次由哪个线程处理的IO事件,以后的IO事件都会由该线程进行处理,保证了数据安全性
3、实现分工
ServerBootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件。
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
...
}
}
打开四个客户端分别给服务器发送数据,查看服务器端接收到的数据如下:
nioEventLoopGroup-3-1 客户端1的数据
nioEventLoopGroup-3-2 客户端2的数据
nioEventLoopGroup-3-1 客户端3的数据
nioEventLoopGroup-3-2 客户端4的数据
//group中的第2个参数设置了两个线程,该线程负责读写事件,并且在多客户端下实现了轮询,而第一个参数是负责客户端的accept事件
nioEventLoopGroup-3-1 客户端1的数据
nioEventLoopGroup-3-1 客户端3的数据
//实现了线程和客户端的绑定,保证了数据安全性
可以看出,一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件。
4、更换EventLoopGroup
当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理。
public class MyServer {
public static void main(String[] args) {
// 增加自定义的非NioEventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup(2);
new ServerBootstrap()
//一个Boss 负责accept事件 两个Worker 负责读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 增加两个handler,第一个使用NioEventLoopGroup处理,第二个使用自定义EventLoopGroup处理
socketChannel.pipeline().addLast("NioHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
// 调用下一个handler
ctx.fireChannelRead(msg);
}
})
// 该handler绑定自定义的Group
.addLast(group, "MyDefaultHandler", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
System.out.println(Thread.currentThread().getName() + " " + buf.toString(StandardCharsets.UTF_8));
}
});
}
})
.bind(8080);
}
}
打开四个客户端分别给服务器发送数据,查看服务器端接收到的数据如下:
nioEventLoopGroup-4-2 客户端1的数据
defaultEventLoopGroup-2-2 客户端1的数据
//客户端1
nioEventLoopGroup-4-1 客户端2的数据
defaultEventLoopGroup-2-1 客户端2的数据
//客户端2
nioEventLoopGroup-4-2 客户端3的数据
defaultEventLoopGroup-2-2 客户端3的数据
//客户端3
nioEventLoopGroup-4-1 客户端4的数据
defaultEventLoopGroup-2-1 客户端4的数据
//客户端4
nioEventLoopGroup-4-2 客户端1的数据
defaultEventLoopGroup-2-2 客户端1的数据
//客户端1
可以看出,客户端与服务器之间的read事件中,不同的handler被nioEventLoopGroup和defaultEventLoopGroup分别处理,下一次客户端继续发送消息,仍然由原来的线程操作IO事件和相应任务,保证了数据的安全性。
5、换工人原理
不同的EventLoopGroup切换的实现原理如下:当handler中绑定的Group不同时,需要切换Group来执行不同的任务。
关键代码 io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead()
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor();
// 如果下一个EventLoop 在当前的 EventLoopGroup中
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 如果两个 handler 绑定的是同一个EventLoopGroup,那么就直接调用
- 否则,把要调用的代码封装为一个任务对象,由下一个 handler 的 EventLoopGroup 来调用
2、Channel
Channel 的常用方法:
- close() 可以用来关闭Channel
- closeFuture() 用来处理 Channel 的关闭
- sync 方法作用是同步等待 Channel 关闭
- 而 addListener 方法是异步等待 Channel 关闭
- pipeline() 方法用于添加处理器
- write() 方法将数据写入
- 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
- 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
- writeAndFlush() 方法将数据写入并立即发送(刷出)
1、ChannelFuture
连接问题
客户端获得SocketChannel和发送数据的操作是在main线程中执行还是在NIO线程(NioEventLoop 中的线程)中执行?
拆分客户端代码
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 该方法用于等待连接真正建立
channelFuture.sync();
// 获取客户端-服务器之间的Channel对象
Channel channel = channelFuture.channel();
channel.writeAndFlush(Thread.currentThread().getName() + "->客户端数据");
System.in.read();
}
}
如果去掉channelFuture.sync()方法,会服务器无法收到main->客户端数据,这是因为建立连接(connect)的过程是异步非阻塞的,这就意味着不等连接建立,方法执行就返回了。若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端。所以需要通过channelFuture.sync()方法,阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程。
下面还有一种addListener方法,用于异步获取建立连接后的 Channel 和发送数据,使得执行这些操作的线程是 NIO 线程:
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));
// 当connect方法执行完毕后,也就是连接真正建立后
// 会在NIO线程中调用operationComplete方法
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush(Thread.currentThread().getName() + "->客户端数据");
}
});
System.in.read();
}
}
通过这种方法可以**在NIO线程中获取 Channel 并发送数据,**而不是在主线程中执行这些操作。
2、CloseFuture
处理关闭
public class ReadClient {
public static void main(String[] args) throws InterruptedException {
// 创建EventLoopGroup,使用完毕后关闭
NioEventLoopGroup group = new NioEventLoopGroup();
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//将字符串转为ByteBuf进行传输
socketChannel.pipeline().addLast(new StringEncoder());
}
})
//建立连接是在NIO线程中执行
.connect(new InetSocketAddress("localhost", 8080));
//阻塞->等待连接建立
channelFuture.sync();
//获得客户端->服务器的SocketChannel
Channel channel = channelFuture.channel();
//系统输入
Scanner scanner = new Scanner(System.in);
// 创建一个线程用于输入并向服务器发送
new Thread(() -> {
while (true) {
String msg = scanner.next();
if ("q".equals(msg)) {
// 关闭操作是异步的,在NIO线程中执行
channel.close();
break;
}
channel.writeAndFlush(Thread.currentThread().getName() + "--->" + msg);
}
}, "InputThread").start();
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
System.out.println("waiting close...");
// 同步等待NIO线程执行完close操作
closeFuture.sync();
// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
System.out.println("SocketChannel关闭之后需要执行的操作...");
// 关闭EventLoopGroup
group.shutdownGracefully();
}
}
当我们要关闭channel时,可以调用channel.close()方法进行关闭。但是该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作。如果想在channel真正关闭以后,执行一些额外的操作,可以选择以下两种方法来实现:
- 通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作,这些其他操作是在main线程中执行的。
// 获得closeFuture对象
ChannelFuture closeFuture = channel.closeFuture();
// 同步等待NIO线程执行完close操作
closeFuture.sync();
// 关闭之后执行一些操作,可以保证执行的操作一定是在channel关闭以后执行的
System.out.println(Thread.currentThread().getName());
- 调用closeFuture.addListener方法,添加close的后续操作,这些操作是在NIO线程中完成的。
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
// 等待channel关闭后才执行的操作
System.out.println(Thread.currentThread().getName());
// 关闭EventLoopGroup
group.shutdownGracefully();
}
});
3、Future&Promise
1、概念
Netty 中的 Future 与 JDK 中的 Future 同名,但是是两个接口,Netty 的 Future 继承自 JDK 的 Future,而 Promise 又对 Netty Future 进行了扩展。
- JDK Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- Netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- Netty Promise 不仅有 Netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
| 功能/名称 | JDK Future | Netty Future | Netty Promise |
|---|---|---|---|
| cancel | 取消任务 | - | - |
| isCanceled | 任务是否取消 | - | - |
| isDone | 任务是否完成,不能区分成功失败 | - | - |
| get | 获取任务结果,阻塞等待 | - | - |
| getNow | - | 获取任务结果,非阻塞,还未产生结果时返回 null | - |
| await | - | 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 | - |
| sync | - | 等待任务结束,如果任务失败,抛出异常 | - |
| isSuccess | - | 判断任务是否成功 | - |
| cause | - | 获取失败信息,非阻塞,如果没有失败,返回null | - |
| addLinstener | - | 添加回调,异步接收结果 | - |
| setSuccess | - | - | 设置成功结果 |
| setFailure | - | - | 设置失败结果 |
2、JDK Future
public class JdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
ThreadFactory factory = new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "JdkFuture");
}
};
// 创建线程池
ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10), factory);
// 获得Future对象
Future<Integer> future = executor.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
TimeUnit.SECONDS.sleep(1);
return 50;
}
});
// 通过阻塞的方式,获得运行结果
System.out.println(future.get());
}
}
3、Netty Future
public class NettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
// 获得 EventLoop 对象
EventLoop eventLoop = group.next();
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return 50;
}
});
// 主线程中获取结果
future.sync();//等待任务结束,同步结果
System.out.println(Thread.currentThread().getName() + "->获取结果");
System.out.println("getNow->" + future.getNow());
System.out.println("get->" + future.get());
// NIO线程中异步获取结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
System.out.println(Thread.currentThread().getName() + "->获取结果");
System.out.println("getNow->" + future.getNow());
System.out.println("get->" + future.get());
}
});
}
}
Netty中的Future对象,可以通过EventLoop的sumbit()方法得到
- 可以通过Future对象的get方法,阻塞地获取返回结果
- 也可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
- 还可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果
4、Netty Promise
Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果。
public class NettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 创建EventLoop
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
// 创建Promise对象,用于存放结果
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
// 自定义线程向Promise中存放结果
promise.setSuccess(50);
}).start();
// 主线程从Promise中获取结果
System.out.println(Thread.currentThread().getName() + "->" + promise.get());
}
}
4、Handler&Pipeline
1、Pipeline
public class PipeLineServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 在socketChannel的pipeline中添加handler
// pipeline中handler是带有head与tail节点的双向链表,其实际结构为
// head <-> handler1 <-> ... <-> handler4 <->tail
// Inbound主要处理入站操作,一般为读操作,发生入站操作时会触发Inbound方法
// 入站时,handler是从head向后调用的
socketChannel.pipeline().addLast("handler1", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(Thread.currentThread().getName() + " Inbound handler 1");
// 父类该方法内部会调用fireChannelRead
// 将数据传递给下一个handler
super.channelRead(ctx, msg);
}
});
socketChannel.pipeline().addLast("handler2", new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(Thread.currentThread().getName() + " Inbound handler 2");
// 执行write操作,使得Outbound的方法能够得到调用
socketChannel.writeAndFlush(ctx.alloc().buffer().writeBytes("Server...".getBytes(StandardCharsets.UTF_8)));
super.channelRead(ctx, msg);
}
});
// Outbound主要处理出站操作,一般为写操作,发生出站操作时会触发Outbound方法
// 出站时,handler的调用是从tail向前调用的
socketChannel.pipeline().addLast("handler3", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(Thread.currentThread().getName() + " Outbound handler 1");
super.write(ctx, msg, promise);
}
});
socketChannel.pipeline().addLast("handler4", new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println(Thread.currentThread().getName() + " Outbound handler 2");
super.write(ctx, msg, promise);
}
});
}
})
.bind(8080);
}
}
运行结果:
nioEventLoopGroup-2-2 Inbound handler 1
nioEventLoopGroup-2-2 Inbound handler 2
nioEventLoopGroup-2-2 Outbound handler 2
nioEventLoopGroup-2-2 Outbound handler 1
通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler
handler需要放入channel的pipeline中,才能根据放入顺序来使用handler
- pipeline的结构是一个带有head与tail指针的双向链表,其中的节点为handler
- 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
- 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
- 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止
具体结构如下:
调用顺序如下:
2、OutboundHandler
- socketChannel.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler
- ctx.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler
3、EmbeddedChannel
EmbeddedChannel可以用于测试各个handler,通过其构造函数按顺序传入需要测试的handler,然后调用对应的Inbound和Outbound方法即可。
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter() {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("4");
super.write(ctx, msg, promise);
}
};
// 用于测试Handler的Channel
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
// 执行Inbound操作
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("测试Inbound".getBytes(StandardCharsets.UTF_8)));
// 执行Outbound操作
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("测试Outbound".getBytes(StandardCharsets.UTF_8)));
}
}
5、ByteBuf
1、调试工具类
public class ByteBufUtil {
public static void log(ByteBuf buffer) {
int length = buffer.readableBytes();
int rows = length / 16 + (length % 15 == 0 ? 0 : 1) + 4;
StringBuilder buf = new StringBuilder(rows * 80 * 2)
.append("read index:").append(buffer.readerIndex())
.append(" write index:").append(buffer.writerIndex())
.append(" capacity:").append(buffer.capacity())
.append(NEWLINE);
appendPrettyHexDump(buf, buffer);
System.out.println(buf.toString());
}
}
通过该工具类可以更为详细地查看ByteBuf中的内容。
2、创建
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 20; i++) {
sb.append("a");
}
buffer.writeBytes(sb.toString().getBytes(StandardCharsets.UTF_8));
// 查看写入结果
ByteBufUtil.log(buffer);
}
}
运行结果:
read index:0 write index:0 capacity:16
read index:0 write index:20 capacity:64
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 61 |aaaaaaaaaaaaaaaa|
|00000010| 61 61 61 61 |aaaa |
+--------+-------------------------------------------------+----------------+
ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小。当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作。
如果在handler中创建ByteBuf,建议使用
ChannelHandlerContext ctx.alloc().buffer()来创建
3、直接内存与堆内存
通过该方法创建的ByteBuf,使用的是基于池化直接内存的ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
可以使用下面的代码来创建基于池化堆的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
也可以使用下面的代码来创建基于池化直接内存的 ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
- 直接内存创建和销毁的代价昂贵,但读写性能高(少一次内存复制),适合配合池化功能一起用。
- 直接内存对 GC 压力小,因为这部分内存不受 JVM 垃圾回收的管理,但也要注意及时主动释放,否则会造成内存泄漏,最终可能会导致内存溢出。
验证
public class ByteBufStudy {
public static void main(String[] args) {
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16);
System.out.println(buffer.getClass());
buffer = ByteBufAllocator.DEFAULT.heapBuffer(16);
System.out.println(buffer.getClass());
buffer = ByteBufAllocator.DEFAULT.directBuffer(16);
System.out.println(buffer.getClass());
}
}
// 使用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf
// 使用池化的堆内存
class io.netty.buffer.PooledUnsafeHeapByteBuf
// 使用池化的直接内存
class io.netty.buffer.PooledUnsafeDirectByteBuf
4、池化与非池化
池化的最大意义在于可以重用 ByteBuf,优点有
- 没有池化,则每次都得创建新的 ByteBuf 实例,这个操作对直接内存代价昂贵,就算是堆内存,也会增加 GC 压力
- 有了池化,则可以重用池中 ByteBuf 实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
- 高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能是否开启,可以通过下面的系统环境变量来设置:
-Dio.netty.allocator.type={unpooled|pooled}
- 4.1 以后,非 Android 平台默认启用池化实现,Android 平台启用非池化实现
- 4.1 之前,池化功能还不成熟,默认是非池化实现
5、组成
ByteBuf主要有以下几个组成部分:
- 最大容量与当前容量
- 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为
Integer.MAX_VALUE - 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出
java.lang.IndexOutOfBoundsException异常
- 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为
- 读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制,进行读写操作时,无需进行模式的切换
- 读指针前的部分被称为废弃部分,是已经读过的内容
- 读指针与写指针之间的空间称为可读部分
- 写指针与当前容量之间的空间称为可写部分
6、写入
| 方法签名 | 含义 | 备注 |
|---|---|---|
| writeBoolean(boolean value) | 写入 boolean 值 | 用一字节 01|00 代表 true|false |
| writeByte(int value) | 写入 byte 值 | |
| writeShort(int value) | 写入 short 值 | |
| writeInt(int value) | 写入 int 值 | Big Endian,即 0x250,写入后 00 00 02 50 |
| writeIntLE(int value) | 写入 int 值 | Little Endian,即 0x250,写入后 50 02 00 00 |
| writeLong(long value) | 写入 long 值 | |
| writeChar(int value) | 写入 char 值 | |
| writeFloat(float value) | 写入 float 值 | |
| writeDouble(double value) | 写入 double 值 | |
| writeBytes(ByteBuf src) | 写入 netty 的 ByteBuf | |
| writeBytes(byte[] src) | 写入 byte[] | |
| writeBytes(ByteBuffer src) | 写入 nio 的 ByteBuffer | |
| int writeCharSequence(CharSequence sequence, Charset charset) | 写入字符串 |
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
- 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);
buffer.writeInt(5);
ByteBufUtil.log(buffer);
buffer.writeIntLE(6);
ByteBufUtil.log(buffer);
buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}
read index:0 write index:0 capacity:16
read index:0 write index:4 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
read index:0 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 |............ |
+--------+-------------------------------------------------+----------------+
read index:0 write index:20 capacity:20
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 06 00 00 00 00 00 00 00 |................|
|00000010| 00 00 00 07 |.... |
+--------+-------------------------------------------------+----------------+
还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置。
7、扩容
当ByteBuf中的容量无法容纳写入的数据时,会进行扩容操作。
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(8);
ByteBufUtil.log(buffer);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);
//空间正好够8个字节
buffer.writeInt(5);
ByteBufUtil.log(buffer);
//继续添加会实现自动扩容,主要不指定最大容量,就会默认是Integer.MAX_VALUE
buffer.writeInt(5);
ByteBufUtil.log(buffer);
}
}
read index:0 write index:0 capacity:8
read index:0 write index:4 capacity:8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 |.... |
+--------+-------------------------------------------------+----------------+
read index:0 write index:8 capacity:8
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
read index:0 write index:12 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 00 00 00 05 |............ |
+--------+-------------------------------------------------+----------------+
扩容规则
不指定初始化容量大小,默认值是256字节(
ByteBufAllocator.DEFAULT.directBuffer())如果写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
- 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
如果写入后数据大小超过 512 字节,则选择下一个 2的N次方的整数进行扩容
- 例如写入后大小为 513 字节,则扩容后 capacity 是 2的十次方,也就是1024 字节
扩容不能超过 maxCapacity,否则会抛出
java.lang.IndexOutOfBoundsException异常
8、读取
读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针。如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置。
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);
ByteBufUtil.log(buffer);
// 读取4个字节
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
ByteBufUtil.log(buffer);
// 通过mark与reset实现重复读取
buffer.markReaderIndex();
System.out.println(buffer.readInt());
ByteBufUtil.log(buffer);
// 恢复到mark标记处
buffer.resetReaderIndex();
ByteBufUtil.log(buffer);
}
}
//初始状态
read index:0 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 02 03 04 00 00 00 05 |........ |
+--------+-------------------------------------------------+----------------+
//读取前4个字节的数据
1
2
3
4
//读取后Bytebuf的状态->源代码中对此状态进行了标记(快照)
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
//读取出4个字节的数据
5
//打印当前ByteBuf的状态->已经没有数据可读了
read index:8 write index:8 capacity:16
//源代码对标记进行恢复,可以实现重复读
read index:4 write index:8 capacity:16
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 05 |.... |
+--------+-------------------------------------------------+----------------+
还有以 get 开头的一系列方法,这些方法不会改变读指针的位置。
9、释放
10、切片
11、优势
四、应用
1、粘包与半包
1、粘包现象
服务器端代码
public class StudyServer {
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(2);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("---channelActive---");
// 连接建立时会执行该方法
super.channelActive(ctx);
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("---channelInactive---");
// 连接断开时会执行该方法
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new StudyServer().start();
}
}
客户端代码
public class StudyClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//连接建立之后立即执行
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 每次发送16个字节的数据,共发送10次
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
//通过channel向服务器发送消息
ctx.writeAndFlush(buffer);
}
//关闭连接
ctx.channel().close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
服务器端接收到的数据
10:53:12.970 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3805c5dc, L:/127.0.0.1:8080 - R:/127.0.0.1:59105] REGISTERED
10:53:12.970 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3805c5dc, L:/127.0.0.1:8080 - R:/127.0.0.1:59105] ACTIVE
//服务器连接建立之后执行了channelActive
---channelActive---
//一次性读取了160B的数据
10:53:12.993 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3805c5dc, L:/127.0.0.1:8080 - R:/127.0.0.1:59105] READ: 160B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000010| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000020| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000030| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000040| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000050| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000060| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000070| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000080| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
|00000090| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
//读取完成
10:53:12.994 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3805c5dc, L:/127.0.0.1:8080 - R:/127.0.0.1:59105] READ COMPLETE
//客户端关闭连接
10:53:12.996 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3805c5dc, L:/127.0.0.1:8080 ! R:/127.0.0.1:59105] INACTIVE
//服务器执行channelInactive
---channelInactive---
10:53:12.996 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x3805c5dc, L:/127.0.0.1:8080 ! R:/127.0.0.1:59105] UNREGISTERED
可见虽然客户端是分别以16字节为单位,通过channel向服务器发送了10次数据,可是服务器端却只接收了一次,接收数据的大小为160B,即客户端发送的数据总大小,这就是粘包现象(将多个消息粘到了一起)。
2、半包现象
调整服务器端接收缓冲区的大小:
// 调整系统的接收缓冲区(滑动窗口)->一般不需要调整,因为其影响的是Netty读取的最小单位,一般都是修改Netty的接收缓冲区大小,默认是1024字节
// serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
// 调整 Netty 的接收缓冲区(byteBuf)
// serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
在测试粘包现象时,在服务器端添加了执行器:
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { System.out.println("---channelActive---"); // 连接建立时会执行该方法 super.channelActive(ctx); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { System.out.println("---channelInactive---"); // 连接断开时会执行该方法 super.channelInactive(ctx); } });该执行器在测试半包及粘包现象时并没有帮助作用,只是方便在客户端建立连接之后立即发送数据,了解两个方法的作用之后可以在服务器代码中删掉。
服务器端代码
public class StudyServer {
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(2);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
//修改Netty 的接收缓冲区(byteBuf)大小
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new StudyServer().start();
}
}
客户端代码
public class StudyClient {
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//每次发送18B数据,发送3次
for (int i = 0; i < 3; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
服务器端接收到的数据:
11:51:02.438 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa7da10d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52238] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
11:51:02.438 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa7da10d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52238] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 |.. |
+--------+-------------------------------------------------+----------------+
11:51:02.439 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa7da10d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52238] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
11:51:02.440 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa7da10d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52238] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d |................|
+--------+-------------------------------------------------+----------------+
11:51:02.440 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xa7da10d5, L:/127.0.0.1:8080 - R:/127.0.0.1:52238] READ: 4B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 0e 0f 10 11 |.... |
+--------+-------------------------------------------------+----------------+
客户端每次发送18字节的数据,但是由于服务器端接收缓冲区的大小为16字节,所以只能将剩下的数据在下一次发送过来,这样就将一条18字节完整的消息拆分成两次进行发送,这就是半包现象。
注意:
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10)影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 Netty 读取的最小单位,但是Netty 实际每次读取的一般是16的整数倍,Netty默认的缓冲区大小是1024字节,可以通过serverBootstrap.childOption进行设置。
3、原理分析
粘包
- 现象(多个消息被粘合到一起接收)
- 发送 abc def,接收 abcdef
- 原因
- 应用层
- 接收方 ByteBuf 设置太大(Netty 默认 1024)
- 传输层-网络层
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接收方处理不及时且窗口大小足够大(大于256 bytes),这 256 bytes 字节就会缓冲在接收方的滑动窗口中,当滑动窗口中缓冲了多个报文就会粘包
- Nagle 算法:会造成粘包
- 应用层
半包
- 现象(一个消息被分开接收)
- 发送 abcdef,接收 abc def
- 原因
- 应用层
- 接收方 ByteBuf 小于实际发送数据量
- 传输层-网络层
- 滑动窗口:假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
- 数据链路层
- MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
- 应用层
本质
发生粘包与半包现象的本质是因为 TCP 是流式协议,消息无边界。
4、解决方案
短链接方式即建立一次连接,发送一个消息,然后断开连接,这样连接建立到连接断开之间就是消息的边界,缺点效率太低
每一条消息采用固定长度,缺点浪费空间
每一条消息采用分隔符,例如 \n,缺点需要转义
每一条消息分为 head 和 body,head 中包含 body 的长度
1、短连接
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。此时如果服务器端接收缓冲区足够大,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象。
服务器端代码
public class StudyServer {
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(2);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new StudyServer().start();
}
}
客户端代码
public class StudyClient {
public static void main(String[] args) {
for (int i = 0; i < 4; i++) {
//将发送消息和断开连接封装成一个方法
sendMessage();
}
}
private static void sendMessage() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//连接建立发送消息
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
//发送消息完毕直接关闭当前连接
ctx.channel().close();
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
服务器端接收到的数据
12:34:19.748 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc66b0759, L:/127.0.0.1:8080 - R:/127.0.0.1:59142] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
12:34:19.748 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xab6e04cf, L:/127.0.0.1:8080 - R:/127.0.0.1:59141] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
12:34:19.752 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0xc0916bf1, L:/127.0.0.1:8080 - R:/127.0.0.1:59144] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
12:34:19.753 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x9d1db727, L:/127.0.0.1:8080 - R:/127.0.0.1:59143] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
但是如果客户端一次性发送的数据过多,服务端的接收缓冲区较小,仍然会产生半包现象。
客户端代码修改
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//连接建立发送消息
ByteBuf buffer = ctx.alloc().buffer();
//一次性发送18B的数据
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17});
ctx.writeAndFlush(buffer);
//发送消息完毕直接关闭当前连接
ctx.channel().close();
}
});
}
服务器端代码修改
//Netty的缓冲区只能接收16字节的数据
serverBootstrap.childOption(ChannelOption.RCVBUF_ALLOCATOR, new AdaptiveRecvByteBufAllocator(16, 16, 16));
运行结果
//62253连接
12:53:58.783 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x86e77db1, L:/127.0.0.1:8080 - R:/127.0.0.1:62253] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
//62254连接
12:53:58.783 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x293260ad, L:/127.0.0.1:8080 - R:/127.0.0.1:62254] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
//62254连接剩余数据
12:53:58.785 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x293260ad, L:/127.0.0.1:8080 - R:/127.0.0.1:62254] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 |.. |
+--------+-------------------------------------------------+----------------+
//62253连接剩余数据
12:53:58.785 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x86e77db1, L:/127.0.0.1:8080 - R:/127.0.0.1:62253] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 |.. |
+--------+-------------------------------------------------+----------------+
12:53:58.786 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x5dddab8c, L:/127.0.0.1:8080 - R:/127.0.0.1:62255] READ: 16B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 01 02 03 04 05 06 07 08 09 0a 0b 0c 0d 0e 0f |................|
+--------+-------------------------------------------------+----------------+
12:53:58.788 [nioEventLoopGroup-3-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x5dddab8c, L:/127.0.0.1:8080 - R:/127.0.0.1:62255] READ: 2B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 10 11 |.. |
+--------+-------------------------------------------------+----------------+
从上面代码可以看出,数据并没有丢失,在一次连接中产生了半包现象,即一次连接中两次发送数据给服务器,这样服务器才得到了完整的数据。
2、固定长度消息解码器
io.netty.handler.codec.FixedLengthFrameDecoder
客户端和服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若客户端发送数据长度不足则需要补齐至该长度,服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码。
服务器端代码
public class StudyServer {
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(2);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
//固定长度消息解码器-固定长度为10
ch.pipeline().addLast(new FixedLengthFrameDecoder(10));
//将解码器放在日志打印前面,确保消息被正常的拆分
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new StudyServer().start();
}
}
客户端代码
public class StudyClient {
public static void main(String[] args) {
sendMessage();
}
public static byte[] fill10Bytes(char c, int len) {
//约定消息固定长度为10
byte[] bytes = new byte[10];
Arrays.fill(bytes, (byte) '_');
for (int i = 0; i < len; i++) {
bytes[i] = (byte) c;
}
return bytes;
}
private static void sendMessage() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
//发送消息
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接 channel 建立成功后,会触发 active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
char c = '0';
Random r = new Random();
for (int i = 0; i < 3; i++) {
byte[] bytes = fill10Bytes(c, r.nextInt(10) + 1);
c++;
buf.writeBytes(bytes);
}
//一次性发送
ctx.writeAndFlush(buf);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
服务器端接收到的数据
13:16:45.326 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x523feaa1, L:/127.0.0.1:8080 - R:/127.0.0.1:49480] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 30 5f 5f 5f 5f 5f 5f 5f 5f |00________ |
+--------+-------------------------------------------------+----------------+
13:16:45.326 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x523feaa1, L:/127.0.0.1:8080 - R:/127.0.0.1:49480] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 31 31 31 31 5f 5f 5f 5f 5f |11111_____ |
+--------+-------------------------------------------------+----------------+
13:16:45.326 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x523feaa1, L:/127.0.0.1:8080 - R:/127.0.0.1:49480] READ: 10B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 32 32 32 32 32 5f 5f 5f 5f 5f |22222_____ |
+--------+-------------------------------------------------+----------------+
客户端发送的数据
//出现了粘包,但是在服务器端可以正常划分出消息
13:16:45.323 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x8bf2c23f, L:/127.0.0.1:49480 - R:/127.0.0.1:8080] WRITE: 30B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 30 5f 5f 5f 5f 5f 5f 5f 5f 31 31 31 31 31 5f |00________11111_|
|00000010| 5f 5f 5f 5f 32 32 32 32 32 5f 5f 5f 5f 5f |____22222_____ |
+--------+-------------------------------------------------+----------------+
3、基于分隔符的消息解码器
io.netty.handler.codec.LineBasedFrameDecoder
io.netty.handler.codec.DelimiterBasedFrameDecoder
通过分隔符对数据进行拆分来解决粘包半包问题,可以通过LineBasedFrameDecoder(int maxLength)来拆分以\n或者\r\n为分隔符的数据,也可以通过DelimiterBasedFrameDecoder(int maxFrameLength, ByteBuf... delimiters)来自定义分隔符从而实现拆分数据(可以传入多个分隔符)。
两种解码器都需要传入数据的最大长度,若超出最大长度,会抛出
TooLongFrameException异常。
服务器端代码
public class StudyServer {
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup(2);
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
//基于分隔符的消息解码器-如果读取了1024个字节还没有发现分隔符就会报异常
ch.pipeline().addLast(new LineBasedFrameDecoder(1024));
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
channelFuture.sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
public static void main(String[] args) {
new StudyServer().start();
}
}
客户端代码
public class StudyClient {
public static void main(String[] args) {
sendMessage();
}
//构造含有换行符的字符串
public static String makeString(char c, int len) {
StringBuilder sb = new StringBuilder(len + 2);
for (int i = 0; i < len; i++) {
sb.append(c);
}
sb.append("\n");
return sb.toString();
}
private static void sendMessage() {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
//发送消息
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// 会在连接 channel 建立成功后,会触发 active 事件
@Override
public void channelActive(ChannelHandlerContext ctx) {
ByteBuf buf = ctx.alloc().buffer();
char c = 'a';
Random r = new Random();
for (int i = 0; i < 3; i++) {
String sb = makeString(c, r.nextInt(5) + 1);
c++;
buf.writeBytes(sb.getBytes());
}
//一次性发送
ctx.writeAndFlush(buf);
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully();
}
}
}
服务器端接收到的数据
13:37:01.039 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x0787dfae, L:/127.0.0.1:8080 - R:/127.0.0.1:52708] READ: 3B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 |aaa |
+--------+-------------------------------------------------+----------------+
13:37:01.039 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x0787dfae, L:/127.0.0.1:8080 - R:/127.0.0.1:52708] READ: 3B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 62 62 62 |bbb |
+--------+-------------------------------------------------+----------------+
13:37:01.039 [nioEventLoopGroup-3-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x0787dfae, L:/127.0.0.1:8080 - R:/127.0.0.1:52708] READ: 5B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 63 63 63 63 63 |ccccc |
+--------+-------------------------------------------------+----------------+
客户端发送的数据
13:37:01.036 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x795fc01d, L:/127.0.0.1:52708 - R:/127.0.0.1:8080] WRITE: 14B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 61 61 61 0a 62 62 62 0a 63 63 63 63 63 0a |aaa.bbb.ccccc. |
+--------+-------------------------------------------------+----------------+
从运行结果可以看到一个换行符占据一个字节。
使用自定义分隔符的消息解码器:
//固定长度消息解码器-固定长度为10 ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16); ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, buffer.writeBytes("\n".getBytes())));
4、基于长度字段的消息解码器
io.netty.handler.codec.LengthFieldBasedFrameDecoder
在传送数据时可以在数据中添加一个用于表示有用数据长度的字段。LengthFieldBasedFrameDecoder解码器可以提供更为丰富的拆分方法,其构造方法有五个参数:
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength,
int lengthAdjustment, int initialBytesToStrip)
//具体参数的设置应该根据不同的协议进行分析
参数解析
maxFrameLength数据最大长度- 表示数据的最大长度(包括附加信息、长度标识等内容)
lengthFieldOffset长度字段的起始偏移量- 用于指明数据第几个字节开始是用于标识有用数据字节长度的,因为前面可能还有其他附加信息
lengthFieldLength长度字段所占字节数(用于指明有用数据的长度)- 数据中用于表示有用数据长度的标识所占的字节数
lengthAdjustment长度表示与有用数据的偏移量- 用于指明数据长度标识和有用数据之间的距离,因为两者之间还可能有附加信息
initialBytesToStrip数据读取起点- 读取起点,不读取0 到
initialBytesToStrip之间的数据
- 读取起点,不读取0 到
例子
//例子1->长度值仅代表有用数据的长度
<pre>
<b>lengthFieldOffset</b> = <b>0</b>
<b>lengthFieldLength</b> = <b>2</b>
lengthAdjustment = 0
initialBytesToStrip = 0 (= do not strip header)
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000C | "HELLO, WORLD" | | 0x000C | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
</pre>
//例子2
<pre>
lengthFieldOffset = 0
lengthFieldLength = 2
lengthAdjustment = 0
<b>initialBytesToStrip</b> = <b>2</b> (= the length of the Length field)
BEFORE DECODE (14 bytes) AFTER DECODE (12 bytes)
+--------+----------------+ +----------------+
| Length | Actual Content |----->| Actual Content |
| 0x000C | "HELLO, WORLD" | | "HELLO, WORLD" |
+--------+----------------+ +----------------+
</pre>
//例子3->长度值仅代表所有数据的长度
<pre>
lengthFieldOffset = 0
lengthFieldLength = 2
<b>lengthAdjustment</b> = <b>-2</b> (= the length of the Length field)
initialBytesToStrip = 0
BEFORE DECODE (14 bytes) AFTER DECODE (14 bytes)
+--------+----------------+ +--------+----------------+
| Length | Actual Content |----->| Length | Actual Content |
| 0x000E | "HELLO, WORLD" | | 0x000E | "HELLO, WORLD" |
+--------+----------------+ +--------+----------------+
</pre>
//例子4
<pre>
<b>lengthFieldOffset</b> = <b>2</b> (= the length of Header 1)
<b>lengthFieldLength</b> = <b>3</b>
lengthAdjustment = 0
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Header 1 | Length | Actual Content |----->| Header 1 | Length | Actual Content |
| 0xCAFE | 0x00000C | "HELLO, WORLD" | | 0xCAFE | 0x00000C | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
</pre>
//例子5
<pre>
lengthFieldOffset = 0
lengthFieldLength = 3
<b>lengthAdjustment</b> = <b>2</b> (= the length of Header 1)
initialBytesToStrip = 0
BEFORE DECODE (17 bytes) AFTER DECODE (17 bytes)
+----------+----------+----------------+ +----------+----------+----------------+
| Length | Header 1 | Actual Content |----->| Length | Header 1 | Actual Content |
| 0x00000C | 0xCAFE | "HELLO, WORLD" | | 0x00000C | 0xCAFE | "HELLO, WORLD" |
+----------+----------+----------------+ +----------+----------+----------------+
</pre>
//例子6
<pre>
lengthFieldOffset = 1 (= the length of HDR1)
lengthFieldLength = 2
<b>lengthAdjustment</b> = <b>1</b> (= the length of HDR2)
<b>initialBytesToStrip</b> = <b>3</b> (= the length of HDR1 + LEN)
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x000C | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
</pre>
//例子7->长度值仅代表所有数据的长度
<pre>
lengthFieldOffset = 1
lengthFieldLength = 2
<b>lengthAdjustment</b> = <b>-3</b> (= the length of HDR1 + LEN, negative)
<b>initialBytesToStrip</b> = <b> 3</b>
BEFORE DECODE (16 bytes) AFTER DECODE (13 bytes)
+------+--------+------+----------------+ +------+----------------+
| HDR1 | Length | HDR2 | Actual Content |----->| HDR2 | Actual Content |
| 0xCA | 0x0010 | 0xFE | "HELLO, WORLD" | | 0xFE | "HELLO, WORLD" |
+------+--------+------+----------------+ +------+----------------+
</pre>
关于lengthAdjustment为什么可以是负数?参考:https://www.cnblogs.com/motianlong/p/14465098.html
图解
使用
public class TestLengthFieldDecoder {
public static void main(String[] args) {
EmbeddedChannel channel = new EmbeddedChannel(
new LengthFieldBasedFrameDecoder(
1024, 0, 4, 1, 5),
new LoggingHandler(LogLevel.DEBUG)
);
// 4 个字节的内容长度+Header+实际内容
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
build(buffer, "Hello, world");
build(buffer, "Hi!");
channel.writeInbound(buffer);
}
private static void build(ByteBuf buffer, String content) {
byte[] bytes = content.getBytes(); // 实际内容
int length = bytes.length; // 实际内容长度
//长度字段
buffer.writeInt(length);
//Header字段
buffer.writeByte(1);
//有效数据字段
buffer.writeBytes(bytes);
}
}
14:52:00 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 12B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 65 6c 6c 6f 2c 20 77 6f 72 6c 64 |Hello, world |
+--------+-------------------------------------------------+----------------+
14:52:00 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 3B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 69 21 |Hi! |
+--------+-------------------------------------------------+----------------+
2、协议设计与解析
1、协议的作用
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则。
2、Redis中的协议
和Redis服务器通信,需要按照Redis协议向服务器发送命令,在Redis中,向Redis服务器发送一条set name Nyima的指令,需要遵守如下协议:
// 该指令一共有3部分,每条指令之后都要添加回车与换行符
// 将命令看作是一个数组,第一个参数代表的是数组长度
*3\r\n
// 第一个指令的长度是3
$3\r\n
// 第一个指令是set指令
set\r\n
// 第二个指令的长度是4
$4\r\n
// 第二个指令是name
name\r\n
// 第三个指令的长度是5
$5\r\n
// 第三个指令是Nyima
Nyima\r\n
客户端代码
public class RedisClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
try {
ChannelFuture channelFuture = new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 入站处理器
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//连接建立的时候立即执行
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 回车与换行符
final byte[] LINE = {'\r', '\n'};
// 获得ByteBuf
ByteBuf buffer = ctx.alloc().buffer();
// 连接建立后,向Redis中发送一条指令,注意添加回车与换行
// set name Nyima
buffer.writeBytes("*3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$3".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("set".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$4".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("name".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("$5".getBytes());
buffer.writeBytes(LINE);
buffer.writeBytes("Nyima".getBytes());
buffer.writeBytes(LINE);
//一次性发送数据
ctx.writeAndFlush(buffer);
}
});
}
})
.connect(new InetSocketAddress("localhost", 6379));
channelFuture.sync();
// 关闭channel
channelFuture.channel().close().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 关闭group
group.shutdownGracefully();
}
}
}
21:31:58.072 [nioEventLoopGroup-2-1] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x5f81273d, L:/127.0.0.1:56694 - R:localhost/127.0.0.1:6379] WRITE: 34B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 2a 33 0d 0a 24 33 0d 0a 73 65 74 0d 0a 24 34 0d |*3..$3..set..$4.|
|00000010| 0a 6e 61 6d 65 0d 0a 24 35 0d 0a 4e 79 69 6d 61 |.name..$5..Nyima|
|00000020| 0d 0a |.. |
+--------+-------------------------------------------------+----------------+
通过Redis客户端查看Redis中存储的数据如下:
通过上述客户端代码可以看出只要遵守Redis的协议向其服务器发送命令,服务器就可以解析该命令并执行。
3、HTTP协议
HTTP协议包含复杂的内容,自己实现较为困难,可以使用HttpServerCodec作为服务器端的解码器与编码器,来处理HTTP请求。
// HttpServerCodec 中既有请求的解码器 HttpRequestDecoder 又有响应的编码器 HttpResponseEncoder
// 一般带有Codec(CodeCombine) 就说明该类既作为 编码器 又作为 解码器
public final class HttpServerCodec extends CombinedChannelDuplexHandler<HttpRequestDecoder, HttpResponseEncoder>
implements HttpServerUpgradeHandler.SourceCodec
服务器端代码
public class HttpServer {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
new ServerBootstrap()
.group(group)
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
// 打印日志
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
// 作为服务器,使用 HttpServerCodec 作为编码器与解码器,既是入站又是出战
ch.pipeline().addLast(new HttpServerCodec());
// 解码器获得的是什么类型的数据(两种)
// ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
// @Override
// public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// System.out.println("---" + msg.getClass() + "---");
// //---class io.netty.handler.codec.http.DefaultHttpRequest---请求行,请求头
// //---class io.netty.handler.codec.http.LastHttpContent$1---请求体
// if (msg instanceof HttpRequest) {
// System.out.println("请求行、请求头的处理");
// } else if (msg instanceof HttpContent) {
// System.out.println("请求体的处理");
// }
// }
// });
// 服务器只处理HTTPRequest-使用SimpleChannelInboundHandler传入泛型
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) {
// 获得请求uri
System.out.println(msg.uri());
// 获得完整响应,设置版本号与状态码,进行返回
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
// 写回响应
ctx.writeAndFlush(response);
}
});
}
})
.bind(8080);
}
}
服务器负责处理请求并响应浏览器,所以只需要处理HTTP请求头和请求行即可:
// 服务器只处理HTTPRequest
ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>()
获得请求后,需要返回响应给浏览器。需要创建响应对象DefaultFullHttpResponse,设置HTTP版本号及状态码,为避免浏览器获得响应后,因为未设置CONTENT_LENGTH而一直空转接收响应内容,所以需要添加CONTENT_LENGTH字段,表明响应体中数据的具体长度。
// 获得完整响应,设置版本号与状态码
DefaultFullHttpResponse response = new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);
// 设置响应内容
byte[] bytes = "<h1>Hello, World!</h1>".getBytes(StandardCharsets.UTF_8);
// 设置响应体长度,避免浏览器一直接收响应内容
response.headers().setInt(CONTENT_LENGTH, bytes.length);
// 设置响应体
response.content().writeBytes(bytes);
运行结果:
//浏览器请求
00:48.091 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x393520e9, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:61142] READ: 698B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 20 48 54 54 50 2f 31 2e 31 0d 0a |GET / HTTP/1.1..|
|00000010| 48 6f 73 74 3a 20 6c 6f 63 61 6c 68 6f 73 74 3a |Host: localhost:|
//请求路径如下
/
//请求的响应内容
22:00:48.092 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x393520e9, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:61142] WRITE: 61B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e |, World!</h1> |
+--------+-------------------------------------------------+----------------+
//浏览器请求->浏览器自动对favicon的请求,与用户的主动请求无关,浏览器自动执行的
22:00:48.178 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x393520e9, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:61142] READ: 598B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 47 45 54 20 2f 66 61 76 69 63 6f 6e 2e 69 63 6f |GET /favicon.ico|
|00000010| 20 48 54 54 50 2f 31 2e 31 0d 0a 48 6f 73 74 3a | HTTP/1.1..Host:|
//请求路径如下
/favicon.ico
//请求的响应内容
22:00:48.179 [nioEventLoopGroup-2-2] DEBUG io.netty.handler.logging.LoggingHandler - [id: 0x393520e9, L:/0:0:0:0:0:0:0:1:8080 - R:/0:0:0:0:0:0:0:1:61142] WRITE: 61B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 48 54 54 50 2f 31 2e 31 20 32 30 30 20 4f 4b 0d |HTTP/1.1 200 OK.|
|00000010| 0a 63 6f 6e 74 65 6e 74 2d 6c 65 6e 67 74 68 3a |.content-length:|
|00000020| 20 32 32 0d 0a 0d 0a 3c 68 31 3e 48 65 6c 6c 6f | 22....<h1>Hello|
|00000030| 2c 20 57 6f 72 6c 64 21 3c 2f 68 31 3e |, World!</h1> |
+--------+-------------------------------------------------+----------------+
4、自定义协议
1、组成要素
- 魔数:用来在第一时间判定接收的数据是否为无效数据包
- 版本号:可以支持协议的升级
- 序列化算法:消息正文到底采用哪种序列化及反序列化方式。如:Json、Protobuf、Hessian、Jdk
- 指令类型:是登录、注册、单聊、群聊… 跟业务相关的消息类型
- 请求序号:为了双工通信,提供异步能力
- 正文长度:有效数据的长度
- 消息正文:有效数据
2、自定义编码器与解码器
public class MessageCodec extends ByteToMessageCodec<Message> {
/**
* @Description: 对Message进行编码,编码为ByteBuf类型
* @Author: Mr.Tong
*/
@Override
public void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {
// 设置魔数 4个字节
out.writeBytes(new byte[]{'1', '2', '3', '4'});
// 设置版本号 1个字节
out.writeByte(1);
// 设置序列化方式 1个字节 Json->0 Jdk->1
out.writeByte(1);
// 设置指令类型 1个字节
out.writeByte(msg.getMessageType());
// 设置请求序号 4个字节
out.writeInt(msg.getSequenceId());
// 为了补齐为16个字节,填充1个字节的数据
out.writeByte(0xff);
// 获得序列化后的msg
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 获得并设置正文长度 长度用4个字节标识
out.writeInt(bytes.length);
// 设置消息正文
out.writeBytes(bytes);
}
/**
* @Description: 对ByteBuf进行解码,解码为Message
* @Author: Mr.Tong
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
// 获取魔数
int magic = in.readInt();
// 获取版本号
byte version = in.readByte();
// 获得序列化方式
byte seqType = in.readByte();
// 获得指令类型
byte messageType = in.readByte();
// 获得请求序号
int sequenceId = in.readInt();
// 移除补齐字节
in.readByte();
// 获得正文长度
int length = in.readInt();
// 获得正文内容->进行反序列化
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
// 将信息放入List中,传递给下一个handler
out.add(message);
// 打印获得的信息正文
System.out.println("===========魔数===========");
System.out.println(magic);
System.out.println("===========版本号===========");
System.out.println(version);
System.out.println("===========序列化方法===========");
System.out.println(seqType);
System.out.println("===========指令类型===========");
System.out.println(messageType);
System.out.println("===========请求序号===========");
System.out.println(sequenceId);
System.out.println("===========正文长度===========");
System.out.println(length);
System.out.println("===========正文===========");
System.out.println(message);
}
}
- 编码器与解码器方法源于父类
ByteToMessageCodec,通过该类可以自定义编码器与解码器,泛型类型为被编码与被解码的类。此处使用了自定义类Message,代表消息。
//继承ByteToMessageCodec
public class MessageCodec extends ByteToMessageCodec<Message>
编码器负责将附加信息与正文信息写入到ByteBuf中,其中附加信息总字节数最好为2的N次方,不足需要补齐,正文内容如果为对象,需要通过序列化将其放入到ByteBuf中。
解码器负责将ByteBuf中的信息取出,并放入List中,该List用于将信息传递给下一个Handler。
编码测试类
public class TestMessageCodec {
public static void main(String[] args) throws Exception {
EmbeddedChannel channel = new EmbeddedChannel(
new LoggingHandler(),
//避免出现半包及粘包问题
new LengthFieldBasedFrameDecoder(
1024, 12, 4, 0, 0),
new MessageCodec()
);
LoginRequestMessage message = new LoginRequestMessage("tys", "2577297621");
//1、测试编码
channel.writeOutbound(message);
/* //2、测试解码
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);//编码消息放入buf中
//buf切片模拟半包
ByteBuf s1 = buf.slice(0, 100);//零拷贝
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数 2
channel.writeInbound(s1); // release 1 writeInbound方法会使得buf的引用计数为0,内存被释放掉
channel.writeInbound(s2);*/
}
}
测试编码执行结果:
22:52:14 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] WRITE: 216B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 01 01 00 00 00 00 00 ff 00 00 00 c8 |1234............|
|00000010| ac ed 00 05 73 72 00 25 63 6e 2e 69 74 63 61 73 |....sr.%cn.itcas|
|00000020| 74 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 69 6e 52 |t.message.LoginR|
|00000030| 65 71 75 65 73 74 4d 65 73 73 61 67 65 a0 3f 71 |equestMessage.?q|
|00000040| cb 31 45 b5 88 02 00 02 4c 00 08 70 61 73 73 77 |.1E.....L..passw|
|00000050| 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c 61 6e 67 |ordt..Ljava/lang|
|00000060| 2f 53 74 72 69 6e 67 3b 4c 00 08 75 73 65 72 6e |/String;L..usern|
|00000070| 61 6d 65 71 00 7e 00 01 78 72 00 19 63 6e 2e 69 |ameq.~..xr..cn.i|
|00000080| 74 63 61 73 74 2e 6d 65 73 73 61 67 65 2e 4d 65 |tcast.message.Me|
|00000090| 73 73 61 67 65 3d dd 19 a0 bc 07 47 cb 02 00 02 |ssage=.....G....|
|000000a0| 49 00 0b 6d 65 73 73 61 67 65 54 79 70 65 49 00 |I..messageTypeI.|
|000000b0| 0a 73 65 71 75 65 6e 63 65 49 64 78 70 00 00 00 |.sequenceIdxp...|
|000000c0| 00 00 00 00 00 74 00 0a 32 35 37 37 32 39 37 36 |.....t..25772976|
|000000d0| 32 31 74 00 03 74 79 73 |21t..tys |
+--------+-------------------------------------------------+----------------+
//16个字节的附加信息,其后是跟着的有效数据信息,共200个字节,0XC8是十六进制,换算为十进制就是200个字节
测试解码执行结果:
22:54:43 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 100B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 31 32 33 34 01 01 00 00 00 00 00 ff 00 00 00 c8 |1234............|
|00000010| ac ed 00 05 73 72 00 25 63 6e 2e 69 74 63 61 73 |....sr.%cn.itcas|
|00000020| 74 2e 6d 65 73 73 61 67 65 2e 4c 6f 67 69 6e 52 |t.message.LoginR|
|00000030| 65 71 75 65 73 74 4d 65 73 73 61 67 65 a0 3f 71 |equestMessage.?q|
|00000040| cb 31 45 b5 88 02 00 02 4c 00 08 70 61 73 73 77 |.1E.....L..passw|
|00000050| 6f 72 64 74 00 12 4c 6a 61 76 61 2f 6c 61 6e 67 |ordt..Ljava/lang|
|00000060| 2f 53 74 72 |/Str |
+--------+-------------------------------------------------+----------------+
22:54:43 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
22:54:43 [DEBUG] [main] i.n.h.l.LoggingHandler - [id: 0xembedded, L:embedded - R:embedded] READ: 116B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 69 6e 67 3b 4c 00 08 75 73 65 72 6e 61 6d 65 71 |ing;L..usernameq|
|00000010| 00 7e 00 01 78 72 00 19 63 6e 2e 69 74 63 61 73 |.~..xr..cn.itcas|
|00000020| 74 2e 6d 65 73 73 61 67 65 2e 4d 65 73 73 61 67 |t.message.Messag|
|00000030| 65 3d dd 19 a0 bc 07 47 cb 02 00 02 49 00 0b 6d |e=.....G....I..m|
|00000040| 65 73 73 61 67 65 54 79 70 65 49 00 0a 73 65 71 |essageTypeI..seq|
|00000050| 75 65 6e 63 65 49 64 78 70 00 00 00 00 00 00 00 |uenceIdxp.......|
|00000060| 00 74 00 0a 32 35 37 37 32 39 37 36 32 31 74 00 |.t..2577297621t.|
|00000070| 03 74 79 73 |.tys |
+--------+-------------------------------------------------+----------------+
===========魔数===========
825373492
===========版本号===========
1
===========序列化方法===========
1
===========指令类型===========
0
===========请求序号===========
0
===========正文长度===========
200
===========正文===========
LoginRequestMessage(super=Message(sequenceId=0, messageType=0), username=tys, password=2577297621)
//通过上述结果可以看到,已经避免了半包问题,同时结果也可以正常的被解码
3、@Sharable注解
为了提高handler的复用率,可以将handler创建为handler对象,然后在不同的channel中使用该handler对象进行处理操作。
LoggingHandler loggingHandler = new LoggingHandler(LogLevel.DEBUG);
// 不同的channel中使用同一个handler对象,提高复用率
channel1.pipeline().addLast(loggingHandler);
channel2.pipeline().addLast(loggingHandler);
但并不是所有的handler都能通过这种方法来提高复用率的,例如LengthFieldBasedFrameDecoder。如果多个channel中使用同一个LengthFieldBasedFrameDecoder对象,则可能发生如下问题:
- channel1中收到了一个半包,LengthFieldBasedFrameDecoder发现不是一条完整的数据,则没有继续向下传播
- 此时channel2中也收到了一个半包,因为两个channel使用了同一个LengthFieldBasedFrameDecoder,存入其中的数据刚好拼凑成了一个完整的数据包。LengthFieldBasedFrameDecoder让该数据包继续向下传播,最终引发错误
所以可以看到LengthFieldBasedFrameDecoder存在线程安全问题,对于LoggingHandler并不存在线程安全问题,只是打印详细数据日志。为了提高handler的复用率,同时又避免出现一些并发问题,Netty中原生的handler中用@Sharable注解来标明该handler能否在多个channel中共享。只有带有该注解,才能通过对象的方式被共享,否则无法被共享。
源码:
//LoggingHandler源码->Sharable注解
@Sharable
@SuppressWarnings({ "StringConcatenationInsideStringBufferAppend", "StringBufferReplaceableByString" })
public class LoggingHandler extends ChannelDuplexHandler {
//LengthFieldBasedFrameDecoder源码->没有Sharable注解
public class LengthFieldBasedFrameDecoder extends ByteToMessageDecoder {
一个Handler如果是线程安全的,不会记录上个状态信息,即无状态的,就可以使用Sharable注解,反之,不能使用该注解。一个Handler是不是无状态的,要根据这个Handler的功能考虑。
4、使用@Sharable注解
自定义编解码器能否使用@Sharable注解,这需要根据自定义的handler的处理逻辑进行分析。
我们的MessageCodec本身接收的是LengthFieldBasedFrameDecoder处理之后的数据,那么数据肯定是完整的,按分析来说是可以添加@Sharable注解的,但是实际情况我们并不能添加该注解,会抛出异常信息。
Exception in thread "main" java.lang.IllegalStateException: ChannelHandler cn.itcast.protocol.MessageCodec is not allowed to be shared
这是Netty的默认保护机制,会认为我们自定义的Handler不能在各个channel中进行共享,担心我们无法处理好线程安全问题,分析源码(查看父类ByteToMessageCodec)如下:
/**
* A Codec for on-the-fly encoding/decoding of bytes to messages and vise-versa.
*
* This can be thought of as a combination of {@link ByteToMessageDecoder} and {@link MessageToByteEncoder}.
*
* Be aware that sub-classes of {@link ByteToMessageCodec} <strong>MUST NOT</strong>
* annotated with {@link @Sharable}.
*/
public abstract class ByteToMessageCodec<I> extends ChannelDuplexHandler
//上面的注释明确说明该类的子类不能使用该注解
这就意味着ByteToMessageCodec不能被多个channel所共享的,因为该类的目标是:将ByteBuf转化为Message,意味着传进该handler的数据还未被处理过,所以传过来的ByteBuf可能并不是完整的数据,如果共享则会出现问题。
如果想要共享,需要怎么办呢?继承MessageToMessageDecoder即可。该类的目标是:将已经被处理的完整数据再次被处理。传过来的Message如果是被处理过的完整数据,那么被共享也就不会出现问题了,也就可以使用@Sharable注解了。实现方式与ByteToMessageCodec类似。
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageSharableCodec extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> out) throws Exception {
...
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf msg, List<Object> out) throws Exception {
...
}
}
@Slf4j
@ChannelHandler.Sharable
/**
* 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的
*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {
@Override
protected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {
ByteBuf out = ctx.alloc().buffer();
// 1. 4 字节的魔数
out.writeBytes(new byte[]{1, 2, 3, 4});
// 2. 1 字节的版本,
out.writeByte(1);
// 3. 1 字节的序列化方式 jdk 0 , json 1
out.writeByte(0);
// 4. 1 字节的指令类型
out.writeByte(msg.getMessageType());
// 5. 4 个字节
out.writeInt(msg.getSequenceId());
// 无意义,对齐填充
out.writeByte(0xff);
// 6. 获取内容的字节数组
ByteArrayOutputStream bos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(bos);
oos.writeObject(msg);
byte[] bytes = bos.toByteArray();
// 7. 长度
out.writeInt(bytes.length);
// 8. 写入内容
out.writeBytes(bytes);
outList.add(out);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
int magicNum = in.readInt();
byte version = in.readByte();
byte serializerType = in.readByte();
byte messageType = in.readByte();
int sequenceId = in.readInt();
in.readByte();
int length = in.readInt();
byte[] bytes = new byte[length];
in.readBytes(bytes, 0, length);
ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));
Message message = (Message) ois.readObject();
log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);
log.debug("{}", message);
out.add(message);
}
}
3、在线聊天室
1、聊天室业务
用户管理-UserService
public interface UserService {
/**
* 登录
* @param username 用户名
* @param password 密码
* @return 登录成功返回 true, 否则返回 false
*/
boolean login(String username, String password);
}
聊天组会话管理-GroupSession
public interface GroupSession {
/**
* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
* @param name 组名
* @param members 成员
* @return 成功时返回组对象, 失败返回 null
*/
Group createGroup(String name, Set<String> members);
/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);
/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);
/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);
/**
* 获取组成员
* @param name 组名
* @return 成员集合, 没有成员会返回 empty set
*/
Set<String> getMembers(String name);
/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);
}
会话管理-Session
public interface Session {
/**
* 绑定会话
* @param channel 哪个 channel 要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel, String username);
/**
* 解绑会话
* @param channel 哪个 channel 要解绑会话
*/
void unbind(Channel channel);
/**
* 获取属性
* @param channel 哪个 channel
* @param name 属性名
* @return 属性值
*/
Object getAttribute(Channel channel, String name);
/**
* 设置属性
* @param channel 哪个 channel
* @param name 属性名
* @param value 属性值
*/
void setAttribute(Channel channel, String name, Object value);
/**
* 根据用户名获取 channel
* @param username 用户名
* @return channel
*/
Channel getChannel(String username);
}
客户端代码
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
//日志
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
//消息编解码器
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//处理粘包-半包问题,确保获得的是完整的每一条消息
ch.pipeline().addLast(new ProcotolFrameDecoder());
//日志
ch.pipeline().addLast(LOGGING_HANDLER);
//消息编解码器
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
基于长度字段的消息解码器
//继承LengthFieldBasedFrameDecoder
public class ProcotolFrameDecoder extends LengthFieldBasedFrameDecoder {
public ProcotolFrameDecoder() {
this(1024, 12, 4, 0, 0);
}
public ProcotolFrameDecoder(int maxFrameLength, int lengthFieldOffset, int lengthFieldLength, int lengthAdjustment, int initialBytesToStrip) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength, lengthAdjustment, initialBytesToStrip);
}
}
服务器端代码
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
//可以共享-日志-消息编解码器
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获得的是完整的一条消息记录-解决粘包及半包问题
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
2、登陆
客户端代码
思路-新开辟一个线程实现建立连接之后发送登陆请求及后续操作,登陆请求消息体构建之后发送到服务器,服务器进行验证,此时客户端处于阻塞状态。当读取到服务器的响应之后,关闭新开辟线程的阻塞状态,向下执行,这里的线程同步可以使用CountDownLatch。
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
//使用CountDownLatch实现线程通信
CountDownLatch WAIT_FOR_LOGIN = new CountDownLatch(1);
//登陆状态标记-默认是未登陆状态
AtomicBoolean LOGIN = new AtomicBoolean(false);
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//获得一条完整的消息
ch.pipeline().addLast(new ProcotolFrameDecoder());
//打印日志
// ch.pipeline().addLast(LOGGING_HANDLER);
//消息编解码
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//读取服务器响应
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof LoginResponseMessage) {
LoginResponseMessage response = (LoginResponseMessage) msg;
if (response.isSuccess()) {
//登陆成功-设置登陆状态标记
LOGIN.set(true);
}
//登陆后-唤醒登陆线程
WAIT_FOR_LOGIN.countDown();
}
}
//连接成功进行登陆操作
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//新开辟一个线程执行登陆操作
new Thread(() -> {
//获取用户输入的用户名和密码
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
//构造登陆请求
LoginRequestMessage loginRequestMessage = new LoginRequestMessage(username, password);
//将登陆请求发送到channel中
ctx.writeAndFlush(loginRequestMessage);
//当前线程等待服务器验证用户名和密码
//阻塞-直到登陆成功后CountDownLatch被设置为0-即:WAIT_FOR_LOGIN.countDown();
try {
WAIT_FOR_LOGIN.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
//判断登陆状态
if (!LOGIN.get()) {//登陆失败
//关闭channel
ctx.channel().close();
return;
}
//登陆成功
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
try {
Thread.sleep(1000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "LoginThread").start();
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
服务器端代码
思路-自定义添加处理登陆请求的Handler。
@Slf4j
public class ChatServer {
public static void main(String[] args) {
//boss处理连接
NioEventLoopGroup boss = new NioEventLoopGroup();
//worker处理读写-两个线程
NioEventLoopGroup worker = new NioEventLoopGroup(2);
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
//完整消息
ch.pipeline().addLast(new ProcotolFrameDecoder());
//日志
// ch.pipeline().addLast(LOGGING_HANDLER);
//解码
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast("login-request", new LoginRequestMessageHandler());
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
自定义的登陆请求处理器
/**
* @Name: LoginRequestMessageHandler
* @Description: 处理登陆请求的handler
* @Author: Mr.Tong
*/
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
//验证登陆请求
String username = msg.getUsername();
String password = msg.getPassword();
UserService userService = UserServiceFactory.getUserService();
boolean login = userService.login(username, password);
LoginResponseMessage loginResponse;
//判断登陆状态
if (login) {
loginResponse = new LoginResponseMessage(true, "登陆成功");
} else {
loginResponse = new LoginResponseMessage(false, "登陆失败");
}
//将服务器的用户名密码验证响应写入channel中
ctx.writeAndFlush(loginResponse);
}
}
登陆成功:
请输入用户名:
zhangsan
请输入密码:
123
20:37:47 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - LoginResponseMessage(super=AbstractResponseMessage(super=Message(sequenceId=0, messageType=1), success=true, reason=登陆成功))
==================================
send [username] [content]
gsend [group name] [content]
gcreate [group name] [m1,m2,m3...]
gmembers [group name]
gjoin [group name]
gquit [group name]
quit
==================================
登陆失败:
请输入用户名:
zhangsan
请输入密码:
234
20:38:25 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - LoginResponseMessage(super=AbstractResponseMessage(super=Message(sequenceId=0, messageType=1), success=false, reason=登陆失败))
Process finished with exit code 0
3、单聊
客户端应该匹配用户输入的命令,从而实现不同的功能。
//登陆成功
while (true) {
System.out.println("==================================");
System.out.println("send [username] [content]");
System.out.println("gsend [group name] [content]");
System.out.println("gcreate [group name] [m1,m2,m3...]");
System.out.println("gmembers [group name]");
System.out.println("gjoin [group name]");
System.out.println("gquit [group name]");
System.out.println("quit");
System.out.println("==================================");
//获取用户输入的命令
String command = scanner.nextLine();
String[] messageData = command.split(" ");
//匹配用户的命令
switch (messageData[0]) {
//单聊发送消息
case "send":
ChatRequestMessage chatRequestMessage = new ChatRequestMessage(username, messageData[1], messageData[2]);
ctx.channel().writeAndFlush(chatRequestMessage);
break;
case "gsend":
GroupChatRequestMessage groupChatRequestMessage = new GroupChatRequestMessage(username, messageData[1], messageData[2]);
ctx.channel().writeAndFlush(groupChatRequestMessage);
break;
case "gcreate":
String[] split = messageData[2].split(",");
//小组成员-使用set集合,防止出现两个相同的成员
HashSet<String> members = new HashSet<>(Arrays.asList(split));
members.add(username);//加入当前用户自己
//建群请求
GroupCreateRequestMessage groupCreateRequestMessage = new GroupCreateRequestMessage(messageData[1], members);
ctx.writeAndFlush(groupCreateRequestMessage);
break;
case "gmembers":
GroupMembersRequestMessage groupMembersRequestMessage = new GroupMembersRequestMessage(messageData[1]);
ctx.writeAndFlush(groupMembersRequestMessage);
break;
case "gjoin":
GroupJoinRequestMessage groupJoinRequestMessage = new GroupJoinRequestMessage(username, messageData[1]);
ctx.writeAndFlush(groupJoinRequestMessage);
break;
case "gquit":
GroupQuitRequestMessage groupQuitRequestMessage = new GroupQuitRequestMessage(username, messageData[1]);
ctx.writeAndFlush(groupQuitRequestMessage);
break;
case "quit":
ctx.channel().close();
return;
}
}
服务器端首先应该记录channel和用户名的映射关系,这样A向B发送消息,就可以找到B的channel,从而实现写入消息。
应该在哪里记录?在LoginRequestMessageHandler中记录,channel和用户名的映射关系在登陆的时候就已经确定了,所以登陆成功之后就需要记录该关系。
@ChannelHandler.Sharable
public class LoginRequestMessageHandler extends SimpleChannelInboundHandler<LoginRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
//验证登陆请求
String username = msg.getUsername();
String password = msg.getPassword();
UserService userService = UserServiceFactory.getUserService();
boolean login = userService.login(username, password);
LoginResponseMessage loginResponse;
//判断登陆状态
if (login) {
loginResponse = new LoginResponseMessage(true, "登陆成功");
//记录channel和username的映射关系-通过会话管理器
SessionFactory.getSession().bind(ctx.channel(),username);
} else {
loginResponse = new LoginResponseMessage(false, "登陆失败");
}
//将服务器的用户名密码验证响应写入channel中
ctx.writeAndFlush(loginResponse);
}
}
添加单聊的Handler,代码如下:
@ChannelHandler.Sharable
public class ChatRequestMessageHandler extends SimpleChannelInboundHandler<ChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, ChatRequestMessage msg) throws Exception {
//服务器拿到客户端发送过来的聊天消息
String to = msg.getTo();
Channel channel = SessionFactory.getSession().getChannel(to);
if (channel != null) {//用户在线
//服务器给to用户写入消息
channel.writeAndFlush(new ChatResponseMessage(msg.getFrom(), msg.getContent()));
} else {//用户不在线或者不存在
//服务器给from用户写入消息
ctx.writeAndFlush(new ChatResponseMessage(false, "用户不在线或者不存在"));
}
}
}
服务端代码优化:将所有可以共享的Handler抽取出来优化代码。
@Slf4j
public class ChatServer {
public static void main(String[] args) {
//boss处理连接
NioEventLoopGroup boss = new NioEventLoopGroup();
//worker处理读写-两个线程
NioEventLoopGroup worker = new NioEventLoopGroup(2);
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
LoginRequestMessageHandler LOGIN_HANDLER = new LoginRequestMessageHandler();
ChatRequestMessageHandler CHAT_HANDLER = new ChatRequestMessageHandler();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast(LOGIN_HANDLER);
ch.pipeline().addLast(CHAT_HANDLER);
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}
注意:
上面自定义的Handler都使用了Sharable注解,什么情况下需要使用该注解呢?
标有@Sharable的Handler,代表了他是一个可以被分享的Handler,这就是说服务器注册了这个Handler后,可以分享给多个客户端使用,如果没有使用该注解,则每次客户端请求时,都必须重新创建一个Handler。
正常情况下同一个ChannelHandler的不同的实例会被添加到不同的Channel管理的管线里面的,但是如果你需要全局统计一些信息,比如所有连接报错次数等,这时候你可能需要使用单例的ChannelHandler,需要注意的是这时候ChannelHandler上需要添加@Sharable注解。
参考:
你真的了解Netty中@Sharable? - 知乎 (zhihu.com)
实现效果:
发送方-zhangsan
send lisi nihao!
接收方-lisi
09:07:48 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - ChatResponseMessage(super=AbstractResponseMessage(super=Message(sequenceId=0, messageType=3), success=false, reason=null), from=zhangsan, content=nihao!)
4、群聊创建
客户端代码:
case "gcreate":
String[] split = messageData[2].split(",");
//小组成员-使用set集合,防止出现两个相同的成员
HashSet<String> members = new HashSet<>(Arrays.asList(split));
members.add(username);//不要忘记将自己拉入到群聊中
GroupCreateRequestMessage groupCreateRequestMessage = new GroupCreateRequestMessage(messageData[1], members);
ctx.writeAndFlush(groupCreateRequestMessage);
break;
服务器端代码:
GroupCreateRequestMessageHandler GROUP_CREATE_HANDLER = new GroupCreateRequestMessageHandler();
ch.pipeline().addLast(GROUP_CREATE_HANDLER);
群组创建请求消息的Handler:
@ChannelHandler.Sharable
public class GroupCreateRequestMessageHandler extends SimpleChannelInboundHandler<GroupCreateRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupCreateRequestMessage msg) throws Exception {
//获取群组管理器
GroupSession groupSession = GroupSessionFactory.getGroupSession();
//创建群组-如果存在群名对应的群组,则直接返回群组,如果不存在就会创建群名对应的群组,并且返回null
Group group = groupSession.createGroup(msg.getGroupName(), msg.getMembers());
if (group == null) {//创建成功
//向创建者发送创建成功的消息
ctx.channel().writeAndFlush(new GroupCreateResponseMessage(true, msg.getGroupName() + "创建成功"));
//向群组的其他人发送消息-被拉入群中
List<Channel> membersChannel = groupSession.getMembersChannel(msg.getGroupName());
membersChannel.forEach(channel -> channel.writeAndFlush(new GroupCreateResponseMessage(true, "您已经被拉入" + msg.getGroupName())));
} else {
//向创建者发送创建失败的消息
ctx.channel().writeAndFlush(new GroupCreateResponseMessage(false, msg.getGroupName() + "已经存在"));
}
}
}
运行效果:
zhangsan
gcreate 群聊1 lisi,wangwu
10:51:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - GroupCreateResponseMessage(super=AbstractResponseMessage(super=Message(sequenceId=0, messageType=5), success=true, reason=群聊1创建成功))
10:51:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - GroupCreateResponseMessage(super=AbstractResponseMessage(super=Message(sequenceId=0, messageType=5), success=true, reason=您已经被拉入群聊1))
lisi
10:51:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - GroupCreateResponseMessage(super=AbstractResponseMessage(super=Message(sequenceId=0, messageType=5), success=true, reason=您已经被拉入群聊1))
wangwu
10:51:29 [DEBUG] [nioEventLoopGroup-2-1] c.i.p.MessageCodecSharable - GroupCreateResponseMessage(super=AbstractResponseMessage(super=Message(sequenceId=0, messageType=5), success=true, reason=您已经被拉入群聊1))
Map中的putIfAbsent方法:
default V putIfAbsent(K key, V value) { V v = get(key); if (v == null) { v = put(key, value); //此时的v经过put之后仍然是null值 } return v; }如果key对应的value存在,就会直接返回该value,如果不存在,会将新的key和value存入到map中,返回null值。
5、群聊消息发送
@ChannelHandler.Sharable
public class GroupChatRequestMessageHandler extends SimpleChannelInboundHandler<GroupChatRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupChatRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
String from = msg.getFrom();
String content = msg.getContent();
//获取群组成员的channel
GroupSession groupSession = GroupSessionFactory.getGroupSession();
List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
//遍历channel发送消息
membersChannel.forEach(channel -> channel.writeAndFlush(new GroupChatResponseMessage(from, content)));
//告诉用户发送成功
ctx.channel().writeAndFlush(new GroupChatResponseMessage(true, "发送消息成功"));
}
}
GroupChatRequestMessageHandler GROUP_CHAT_HANDLER = new GroupChatRequestMessageHandler();
ch.pipeline().addLast(GROUP_CHAT_HANDLER);
6、获取群聊内所有成员
@ChannelHandler.Sharable
public class GroupMembersRequestMessageHandler extends SimpleChannelInboundHandler<GroupMembersRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupMembersRequestMessage msg) throws Exception {
String groupName = msg.getGroupName();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Set<String> members = groupSession.getMembers(groupName);
ctx.channel().writeAndFlush(new GroupMembersResponseMessage(members));
//上述代码是所有的客户端都可以查看某一个群聊的所有成员
//业务逻辑来讲,只有群聊内的成员才可以查看群成员,后续可以进行优化
}
}
GroupMembersRequestMessageHandler GROUP_MEMBERS_HANDLER = new GroupMembersRequestMessageHandler();
ch.pipeline().addLast(GROUP_MEMBERS_HANDLER);
7、加入群聊
@ChannelHandler.Sharable
public class GroupJoinRequestMessageHandler extends SimpleChannelInboundHandler<GroupJoinRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupJoinRequestMessage msg) throws Exception {
//获取新加入的用户及群聊信息
String username = msg.getUsername();
String groupName = msg.getGroupName();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.joinMember(groupName, username);
if (group != null) {
//给用户发送加入成功的消息
ctx.channel().writeAndFlush(new GroupJoinResponseMessage(true, "加入群聊成功"));
//通知群聊的所有用户
List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
membersChannel.forEach(channel -> channel.writeAndFlush(new GroupJoinResponseMessage(true, username + "加入了群聊")));
}
}
}
GroupJoinRequestMessageHandler GROUP_JOIN_HANDLER = new GroupJoinRequestMessageHandler();
ch.pipeline().addLast(GROUP_JOIN_HANDLER);
8、退出群聊
@ChannelHandler.Sharable
public class GroupQuitRequestMessageHandler extends SimpleChannelInboundHandler<GroupQuitRequestMessage> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, GroupQuitRequestMessage msg) throws Exception {
String username = msg.getUsername();
String groupName = msg.getGroupName();
GroupSession groupSession = GroupSessionFactory.getGroupSession();
Group group = groupSession.removeMember(groupName, username);
if (group != null) {
ctx.writeAndFlush(new GroupQuitResponseMessage(true, "退出群聊成功"));
//通知其他人
List<Channel> membersChannel = groupSession.getMembersChannel(groupName);
membersChannel.forEach(channel -> channel.writeAndFlush(new GroupQuitResponseMessage(true, username + "退出群聊")));
}
}
}
GroupQuitRequestMessageHandler GROUP_QUIT_HANDLER = new GroupQuitRequestMessageHandler();
ch.pipeline().addLast(GROUP_QUIT_HANDLER);
9、退出聊天室
客户端退出分为两种(服务器需要捕捉到两种情况都进行处理):
1、正常退出-channel.close()
2、异常退出-客户端强制关闭channel
@Slf4j
@ChannelHandler.Sharable
public class QuitHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
//正常退出的情况
SessionFactory.getSession().unbind(ctx.channel());//解绑
log.debug("{} 已经断开", ctx.channel());
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
//异常退出的情况
SessionFactory.getSession().unbind(ctx.channel());//解绑
log.debug("{} 已经异常断开-异常信息{}", ctx.channel(), cause.getMessage());
}
}
强制退出客户端似乎不能触发
exceptionCaught的情况。关于
exceptionCaught可参考:Netty源码分析之ChannelPipeline(五)—异常事件的传播 - DaFanJoy - 博客园 (cnblogs.com)
10、空闲检测
连接假死
原因
- 网络设备出现故障,例如网卡,机房等,底层的 TCP 连接已经断开了,但应用程序没有感知到,仍然占用着资源
- 公网网络不稳定,出现丢包,如果连续出现丢包,这时现象就是客户端数据发不出去,服务端也一直收不到数据,会白白地消耗资源
- 应用程序线程阻塞,无法进行数据读写
问题
- 假死的连接占用的资源不能自动释放
- 向假死的连接发送数据,得到的反馈是发送超时
解决方法
可以添加IdleStateHandler对空闲时间进行检测,通过构造函数可以传入三个参数
- readerIdleTimeSeconds 读空闲经过的秒数
- writerIdleTimeSeconds 写空闲经过的秒数
- allIdleTimeSeconds 读和写空闲经过的秒数
当指定时间内未发生读或写事件时,会触发特定事件
- 读空闲会触发
READER_IDLE - 写空闲会触发
WRITE_IDLE - 读和写空闲会触发
ALL_IDEL
服务器端检测读空闲时间
// 用于空闲连接的检测,5s内未读到数据,会触发READ_IDLE事件
ch.pipeline().addLast(new IdleStateHandler(5, 0, 0));
// 添加双向处理器,负责处理READER_IDLE事件
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
// 获得事件
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.debug("建立连接之后【并不是登陆之后】-服务器已经5秒没有读到数据了...");
//关闭当前连接
ctx.channel().close();
}
}
});
- 使用
IdleStateHandler进行空闲检测 - 使用双向处理器
ChannelDuplexHandler对入站与出站事件进行处理IdleStateHandler中的事件为特殊事件,需要实现ChannelDuplexHandler的userEventTriggered方法,判断事件类型并自定义处理方式,来对事件进行处理
为避免因非网络等原因引发的READ_IDLE事件,比如网络情况良好,只是用户本身没有输入数据,这时发生READ_IDLE事件,直接让服务器断开连接是不可取的。为避免此类情况,需要在客户端向服务器发送心跳包,发送频率要小于服务器设置的IdleTimeSeconds,一般设置为其值的一半。
上面的代码中,客户端和服务器端建立连接之后的5秒,客户端并没有因为网络问题而向服务器发送不了消息,而是因为客户端在输入用户名和密码的时候阻塞了,这个时候服务器就直接把这个连接关掉了,误伤了客户端,所以需要在客户端建立连接之后(不是登陆之后),就要发送心跳包给服务器,用于证明自己不是因为网络问题而发不过去消息。
//连接建立
09:47:05 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xb8424f09, L:/127.0.0.1:8080 - R:/127.0.0.1:51412] REGISTERED
09:47:05 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xb8424f09, L:/127.0.0.1:8080 - R:/127.0.0.1:51412] ACTIVE
//5秒之后没有收到读消息
09:47:10 [DEBUG] [nioEventLoopGroup-3-1] c.i.s.ChatServer - 建立连接之后【并不是登陆之后】-服务器已经5秒没有读到数据了...
//关闭当前连接,客户端无法实现登陆
09:47:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xb8424f09, L:/127.0.0.1:8080 - R:/127.0.0.1:51412] CLOSE
09:47:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xb8424f09, L:/127.0.0.1:8080 ! R:/127.0.0.1:51412] INACTIVE
09:47:10 [DEBUG] [nioEventLoopGroup-3-1] i.n.h.l.LoggingHandler - [id: 0xb8424f09, L:/127.0.0.1:8080 ! R:/127.0.0.1:51412] UNREGISTERED
正确的处理方式应该在客户端添加写空闲时间的检测,连接建立后,到达指定时间如果没有写入数据,就会向服务器写入心跳数据包,代码如下:
// 发送心跳包,让服务器知道客户端在线
// 建立连接之后,3s未发生WRITER_IDLE,就像服务器发送心跳包
// 该值为服务器端设置的READER_IDLE触发时间的一半左右
ch.pipeline().addLast(new IdleStateHandler(0, 3, 0));
ch.pipeline().addLast(new ChannelDuplexHandler() {
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.WRITER_IDLE) {
// log.debug("建立连接之后,客户端3秒中没有发送消息了,发送心跳包...");
// 发送心跳包
ctx.writeAndFlush(new PingMessage());
}
}
});