概述

Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端

注意:Netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO


优势

image-20220921220936680

入门案例

添加依赖

1
2
3
4
5
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.39.Final</version>
</dependency>

服务器步骤

1、创建启动器类,添加组件(NioEventLoopGroup)

2、添加一个ServerSocketChannel的实现

3、添加处理器和初始化器

4、绑定监听端口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class HelloServer {
public static void main(String[] args) {
//1、启动器:负责组装 netty 组件,启动服务器
new ServerBootstrap()
//2、BossEventLoop,WorkEventLoop(selector,thread),group组
.group(new NioEventLoopGroup())
//3、选择服务器的 ServerSocketChannel 实现
.channel(NioServerSocketChannel.class)
/**
* 4、childHandler:决定了worker(child) 能执行哪些操作(handler)
* (boss 负责处理连接)
* (worker(child) 负责处理读写)
*/
.childHandler(
/**
* 5、负责添加别的handler(在initChannel方法里进行添加)
* Channel:代表和客户端进行数据读写的通道
* Initializer:初始化器
*/
new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
//负责解码,将ByteBuf转换为字符串
nioSocketChannel.pipeline().addLast(new StringDecoder());
//自定义handler
nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() {
//7、channelRead:在读事件发生后,执行该方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println(msg);
}
});
}
})
//8、服务器启动后绑定的监听端口
.bind(8080);
}
}

客户端步骤

1、创建启动器类,添加组件

2、添加一个SocketChannel的实现

3、添加处理器

4、连接到服务器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class HelloClient {
public static void main(String[] args) throws InterruptedException {
//1、启动器
new Bootstrap()
//2、添加 EventLoop
.group(new NioEventLoopGroup())
//3、选择客户端 channel 实现
.channel(NioSocketChannel.class)
//4、添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //将ByteBuf解码为字符串
}
})
//5、连接到服务器
.connect(new InetSocketAddress("localhost",8080))
.sync()//阻塞方法,知道连接建立
.channel()
//6、向服务器发送数据
.writeAndFlush("hello,world");
}
}

整体流程

Channel 可以理解为数据的通道

msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf

handler 可以理解为数据的处理工序

  • 工序有多道,合在一起就是 pipeline(传递途径)pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
  • pipeline 中有多个 handler,处理时会依次调用其中的 handler
  • handler 分 Inbound(入栈) 和 Outbound(出栈) 两类

EventLoop 可以理解为处理数据的工人

  • EventLoop 可以管理多个 channel 的 IO 操作,并且一旦 EventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 IO 操作都由该 EventLoop 负责
  • EventLoop 既可以执行 IO 操作,也可以进行任务处理,每个 EventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
  • EventLoop 按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 EventLoop

左侧为Client,右侧为server

image-20220921221000675


EventLoop相关

EventLoop本质上是一个单线程执行器(同时维护了一个Selector),里面由run方法处理Channel上源源不断的IO事件

继承关系

  • 一条线继承自:java.util.concurrent.ScheduledExecutorService 因此包含了线程池中所有的方法
  • 另外一条线继承自Netty本身的OrderedEventExecutor
    • 提供了 BooleaninEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
    • 提供了 EventLoopGroup parent() 方法来看看自身属于哪个 EventLoopGroup

EventLoopGroup

EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)

注意:EventLoopGroup继承自 Netty 自己的 EventExecutorGroup

  • 实现了 Iterable 接口提供遍历 EventLoop 的能力
  • 另有 next 方法获取集合中下一个 EventLoop

处理普通任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Slf4j
public class MyTestEventLoop {
public static void main(String[] args) {
//1、创建事件循环组
// EventLoopGroup group = new DefaultEventLoopGroup();//可以处理普通任务和定时任务
EventLoopGroup group = new NioEventLoopGroup();//可以处理IO事件,普通任务和定时任务

//2、获取下一个循环对象
group.next();

//3、执行普通任务
group.next().submit(()->{
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
log.debug("ok");
});
log.debug("main");
}
}

输出结果

image-20220921221010946

处理定时任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class MyTestEventLoop {
public static void main(String[] args) {
//1、创建事件循环组
EventLoopGroup group = new NioEventLoopGroup();//可以处理IO事件,普通任务和定时任务

//2、获取下一个循环对象
group.next();

/**
* 4、执行定时任务
* 参数一:任务对象
* 参数二:初始延迟时间,设置为0代表立刻执行
* 参数三:间隔时间
* 参数四:时间单位
*/
group.next().scheduleAtFixedRate(()->{
log.debug("ok");
},0,1, TimeUnit.SECONDS);
}
}

输出结果:每隔一秒打印一次

image-20220921221019654

处理IO事件

服务器代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
@Slf4j
public class MyEventLoopServer {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){
@Override //ByteBuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class MyEventLoopClient {
public static void main(String[] args) throws InterruptedException {
//1、启动器
Channel channel = new Bootstrap()
//2、添加 EventLoop
.group(new NioEventLoopGroup())
//3、选择客户端 channel 实现
.channel(NioSocketChannel.class)
//4、添加处理器
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //将ByteBuf解码为字符串
}
})
//5、连接到服务器
.connect(new InetSocketAddress("localhost", 8080))
.sync()
.channel();
// 此处打断点调试,调用 channel.writeAndFlush(...);
System.out.println(channel);
}
}

分工

Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件

1
2
3
4
5
6
7
8
public class MyServer {
public static void main(String[] args) {
new ServerBootstrap()
// 两个Group,分别为Boss 负责Accept事件,Worker 负责读写事件
.group(new NioEventLoopGroup(1), new NioEventLoopGroup(2))
...
}
}

多个客户端分别发送:

1
2
3
4
5
nioEventLoopGroup-3-1 hello1
nioEventLoopGroup-3-2 hello2
nioEventLoopGroup-3-1 hello3
nioEventLoopGroup-3-2 hello4
nioEventLoopGroup-3-2 hello4

总结:一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件

image-20220921221040084

增加自定义EventLoopGroup

当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
@Slf4j
public class MyEventLoopServer {
public static void main(String[] args) {
//细分2:创建一个独立的 EventLoopGroup
EventLoopGroup group = new DefaultEventLoopGroup();
new ServerBootstrap()
//细分1:boss只负责accept事件,worker只负责socketChannel上的读写
.group(new NioEventLoopGroup(),new NioEventLoopGroup(2))
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast("handler-1",new ChannelInboundHandlerAdapter(){
@Override //ByteBuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
//将消息传递给下一个Handler
ctx.fireChannelRead(msg);
}
}).addLast(group,"handler-2",new ChannelInboundHandlerAdapter(){
@Override //ByteBuf类型
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ByteBuf buf = (ByteBuf) msg;
log.debug(buf.toString(Charset.defaultCharset()));
}
});
}
})
.bind(8080);
}
}

启动四个客户端发送数据

1
2
3
4
5
6
7
8
nioEventLoopGroup-4-1 hello1
defaultEventLoopGroup-2-1 hello1
nioEventLoopGroup-4-2 hello2
defaultEventLoopGroup-2-2 hello2
nioEventLoopGroup-4-1 hello3
defaultEventLoopGroup-2-3 hello3
nioEventLoopGroup-4-2 hello4
defaultEventLoopGroup-2-4 hello4

总结:客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理

image-20220921221055645

切换线程

不同的EventLoopGroup切换的实现原理如下:

(当handler中绑定的Group不同时,需要切换Group来执行不同的任务)

image-20220921221109152

如果两个handler绑定的是同一个线程,那么就直接调用;否则把要调用的代码封装为一个任务对象,由下一个handler的线程来调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
// 获得下一个EventLoop, excutor 即为 EventLoopGroup
EventExecutor executor = next.executor(); //返回下一个handler的EventLoop

//判断当前handler中的线程是否和下一个handler的EventLoop是否是同一个线程
if (executor.inEventLoop()) {
// 使用当前 EventLoopGroup 中的 EventLoop 来处理任务
next.invokeChannelRead(m);
} else {
// 否则让另一个 EventLoopGroup 中的 EventLoop 来创建任务并执行
executor.execute(new Runnable() {
public void run() {
next.invokeChannelRead(m);
}
});
}
}

Channel相关

close() 可以用来关闭Channel

closeFuture() 用来处理 Channel 的关闭

  • sync 方法作用是同步等待 Channel 关闭
  • 而 addListener 方法是异步等待 Channel 关闭

pipeline() 方法用于添加处理器

write() 方法将数据写入到缓冲区中

  • 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
  • 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去

writeAndFlush() 方法将数据写入并立即发送(刷出)


连接问题

当我们注释了channelFuture.sync();后,服务器无法收到hello world

因为建立连接的过程是异步非阻塞的,若不通过sync()方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class MyEventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //将ByteBuf解码为字符串
}
})
/**
* 1、连接到服务器
* 异步非阻塞的方法
* 主线程发起了调用,真正执行connect方法的是另一个线程(NIO线程)
*/
.connect(new InetSocketAddress("localhost", 8080));

//channelFuture.sync();
/**
* 无阻塞的向下执行,获取channel
* 此时获取到的channel是一个还没完全建立好连接的channel
* 此时发送数据就会失败
*/
Channel channel = channelFuture.channel();
//2、向服务器发送数据
channel.writeAndFlush("hello,world");
}
}

解决方法

1、调用channelFuture.sync();方法。阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程

2、使用 addListener 方法异步处理结果。通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class MyClient {
public static void main(String[] args) throws IOException, InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new StringEncoder());
}
})
// 该方法为异步非阻塞方法,主线程调用后不会被阻塞,真正去执行连接操作的是NIO线程
// NIO线程:NioEventLoop 中的线程
.connect(new InetSocketAddress("localhost", 8080));

// 当connect方法执行完毕后,也就是连接真正建立后
// 会在NIO线程中调用operationComplete方法
channelFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture channelFuture) throws Exception {
Channel channel = channelFuture.channel();
channel.writeAndFlush("hello world");
}
});
System.in.read();
}
}

关闭问题

要关闭channel时,可以调用channel.close()方法进行关闭。该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class MyEventLoopClient {
public static void main(String[] args) throws InterruptedException {
ChannelFuture channelFuture = new Bootstrap()
.group(new NioEventLoopGroup())
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override //在连接建立后被调用
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline().addLast(new StringEncoder()); //将ByteBuf解码为字符串
}
})
.connect(new InetSocketAddress("localhost", 8080));

Channel channel = channelFuture.sync().channel();
new Thread(()->{
Scanner scanner = new Scanner(System.in);
while (true) {
String line = scanner.nextLine();
if ("q".equals(line)){
channel.close();
break;
}
channel.writeAndFlush(line);
}
},"input").start();

}
}

解决方法

可以选择两种方法实现:在真正关闭channel后执行的一些额外操作

1、通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作

1
2
3
4
5
6
7
8
/**
* 获取 CloseFuture 对象
* 1)同步模式处理关闭
* 2)异步模式处理关闭
*/
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.sync();
//......sync()方法可以保证关闭操作执行完

2、调用closeFuture.addListener方法,添加close的后续操作

1
2
3
4
5
6
7
8
9
ChannelFuture closeFuture = channel.closeFuture();
closeFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
//......此处可以写处理关闭后的相关操作
// 关闭EventLoopGroup
group.shutdownGracefully(); //优雅地停止:先拒绝接受新的任务,将运行中的线程执行完毕后才停止
}
});

Future与Promise

netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口

netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展

  • jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
  • netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
  • netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 jdk Future netty Future Promise
cancel 取消任务 - -
isCanceled 任务是否取消 - -
isDone 任务是否完成,不能区分成功失败 - -
get 获取任务结果,阻塞等待 - -
getNow - 获取任务结果,非阻塞,还未产生结果时返回 null -
await - 等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 -
sync - 等待任务结束,如果任务失败,抛出异常 -
isSuccess - 判断任务是否成功 -
cause - 获取失败信息,非阻塞,如果没有失败,返回null -
addLinstener - 添加回调,异步接收结果 -
setSuccess - - 设置成功结果
setFailure - - 设置失败结果

JDK Future

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class TestJdkFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1、线程池
ExecutorService service = Executors.newFixedThreadPool(2);
//2、提交任务
Future<Integer> future = service.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("线程池中的线程执行计算......");
Thread.sleep(1000);
return 50;
}
});
//3、主线程通过 future 来获取结果
log.debug("等待结果......");
Integer res = future.get();
log.debug("结果是{}",res);
}
}
image-20220921221129167

Netty Future

Netty中的Future对象,可以通过EventLoop的sumbit()方法得到

  • 可以通过Future对象的get方法,阻塞地获取返回结果
  • 可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
  • 可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果

同步方式获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
//提交任务
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("线程池中的线程执行计算......");
Thread.sleep(1000);
return 50;
}
});
//主线程通过 future 来获取结果
log.debug("等待结果......");
Integer res = future.get();
log.debug("结果是{}",res);
}
}
image-20220921221138285

异步方式获取结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Slf4j
public class TestNettyFuture {
public static void main(String[] args) throws ExecutionException, InterruptedException {
NioEventLoopGroup group = new NioEventLoopGroup();
EventLoop eventLoop = group.next();
//提交任务
Future<Integer> future = eventLoop.submit(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
log.debug("线程池中的线程执行计算......");
Thread.sleep(1000);
return 50;
}
});
// NIO线程中异步获取结果
future.addListener(new GenericFutureListener<Future<? super Integer>>() {
@Override
public void operationComplete(Future<? super Integer> future) throws Exception {
log.debug("接收结果:{}",future.getNow());
}
});
}
}
image-20220921221150212

Netty Promise

Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果

演示向promise填充正确结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.准备一个EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();

//2.主动创建 promise 对象
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

new Thread(()->{
//3.任意一个线程执行计算,计算完毕后向promise填充结果
log.debug("开始计算...");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
promise.setSuccess(80);
}).start();

//4.接受结果的线程
log.debug("等待结果...");
log.debug("结果是:{}",promise.get());
}
}
image-20220921221159397

演示向promise填充错误结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
@Slf4j
public class TestNettyPromise {
public static void main(String[] args) throws ExecutionException, InterruptedException {
//1.准备一个EventLoop对象
EventLoop eventLoop = new NioEventLoopGroup().next();

//2.主动创建 promise 对象
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);

new Thread(()->{
//3.任意一个线程执行计算,计算完毕后向promise填充结果
log.debug("开始计算...");
try {
int i = 1 / 0;
Thread.sleep(1000);
promise.setSuccess(80);
} catch (InterruptedException e) {
promise.setFailure(e);
e.printStackTrace();
}
}).start();

//4.接受结果的线程
log.debug("等待结果...");
log.debug("结果是:{}",promise.get());
}
}
image-20220921221209027

Handler与Pipeline

Pipeline代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
//1.通过channel拿到pipeline
ChannelPipeline pipeline = channel.pipeline();
/**
* 2.添加处理器
* addLast字面意思上是把 pipeline 加入到流水线的最后一个位置
* Netty在建立的时候会将自动加上两个handler
* 一个是head Handler,另外一个是tail Handler
* 此处所加的handler是加在tail之前
*
* 此处添加了四个处理器:head -> h1 -> h2 -> h3 -> h4 ->h5 -> h6 ->tail
*/
pipeline.addLast("handler-1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("handler-1");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("handler-2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("handler-2");
super.channelRead(ctx, msg);
}
});
pipeline.addLast("handler-3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("handler-3");
/**
* 此处可以不需要调用channelRead方法
* 因为channelRead方法目的是为了唤醒下一个入站的handler
* 此时下一个是出站的,所以没必要进行唤醒
*/
//super.channelRead(ctx, msg);

//分配了一个ByteBuffer对象,然后往ByteBuffer里面写入字节,目的为了触发下面三个出站处理器
//只有执行该方法才会触发出栈动作,如果没有写出,那么只会触发读或者写
channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes()));
}
});
pipeline.addLast("handler-4",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("handler-4");
super.write(ctx, msg, promise);
}
});
pipeline.addLast("handler-5",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("handler-5");
super.write(ctx, msg, promise);
channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes()));
}
});
pipeline.addLast("handler-6",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("handler-6");
super.write(ctx, msg, promise);
channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes()));
}
});
}
})
.bind(8080);
}
}

运行结果

image-20220921221220729

对channelRead传递的信息进行加工

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
@Slf4j
public class TestPipeline {
public static void main(String[] args) {
new ServerBootstrap()
.group(new NioEventLoopGroup())
.channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel channel) throws Exception {
//1.通过channel拿到pipeline
ChannelPipeline pipeline = channel.pipeline();
/**
* 2.添加处理器
* addLast字面意思上是把 pipeline 加入到流水线的最后一个位置
* Netty在建立的时候会将自动加上两个handler
* 一个是head Handler,另外一个是tail Handler
* 此处所加的handler是加在tail之前
*
* 此处添加了四个处理器:head -> h1 -> h2 -> h3 -> h4 -> tail
*/
pipeline.addLast("handler-1",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("handler-1");
//将byteBuf加工成字符串,传递到下一个handler中
ByteBuf buf = (ByteBuf) msg;
String name = buf.toString(Charset.defaultCharset());
super.channelRead(ctx, name);
}
});

pipeline.addLast("handler-2",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception {
log.debug("handler-2");
//拿到上一个handler中的字符串,然后加工成Student对象,然后进行传递
Student student = new Student(name.toString());
super.channelRead(ctx, student);
}
});

pipeline.addLast("handler-3",new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("handler-3,结果:{},class:{}",msg,msg.getClass());
//分配了一个ByteBuffer对象,然后往ByteBuffer里面写入字节,目的为了触发下面三个出站处理器
channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes()));
}
});

pipeline.addLast("handler-4",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("handler-4");
super.write(ctx, msg, promise);
}
});

pipeline.addLast("handler-5",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("handler-5");
super.write(ctx, msg, promise);
channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes()));
}
});

pipeline.addLast("handler-6",new ChannelOutboundHandlerAdapter() {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("handler-6");
super.write(ctx, msg, promise);
channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes()));
}
});
}
})
.bind(8080);
}
@Data
@AllArgsConstructor
static class Student {
private String name;
}
}

总结

1、通过channel.pipeline().addLast(name, handler)添加handler时,记得给handler取名字这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler

2、pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler

image-20220921221240384 image-20220921221249204
  • 要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler

  • 当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止

  • 当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止

  • 如果不调用ctx.fireChannelRead(msg)方法,就无法将当前handler的处理结果传递给下一个handler,从而无法进行下一个InboundHandler的调用。也可以使用ChannelRead内部的ctx.fireChannelRead(msg)来进行结果的传递

image-20220921221259899

socketChannel.writeAndFlush()

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler

image-20220921221307743

ctx.writeAndFlush()

当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler

image-20220921221315793

EmbeddedChannel进行测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Slf4j
public class TestEmbeddedChannel {
public static void main(String[] args) {
ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h1");
super.channelRead(ctx, msg);
}
};
ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
log.debug("h2");
super.channelRead(ctx, msg);
}
};
ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h3");
super.write(ctx, msg, promise);
}
};
ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
log.debug("h4");
super.write(ctx, msg, promise);
}
};
EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4);
//模拟入站输入数据
channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));

//模拟出战写出数据
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes()));
}
}
image-20220921221323440

ByteBuf

创建

1
2
3
4
5
6
7
8
9
10
11
12
public static void main(String[] args) {
//不指定容量,默认是256
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
System.out.println(buf.toString());
StringBuilder stringBuilder = new StringBuilder();
//测试ByteBuf容量的可变性
for (int i = 0; i < 300; i++) {
stringBuilder.append("a");
}
buf.writeBytes(stringBuilder.toString().getBytes());
System.out.println(buf);
}
image-20220921221331119

ByteBuf通过ByteBufAllocator选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小

当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作,如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()来创建


直接内存与堆内存

ByteBuf支持两种内存分配,一种是直接内存,一种是堆内存,Netty默认使用的是直接内存

堆内存的分配效率比较高,但是读写效率比较低

直接内存分配效率比较低,读写效率比较高

1、直接内存使用的是系统内存,在磁盘中读取文件时,可以将文件直接读入系统内存,系统内存可以用直接内存的方式映射到JVM中,他们访问的都是同一块内存,可以减少一次内存的复制

2、堆内存收到垃圾回收的影响,垃圾回收会涉及到对象的复制和移动,会影响效率


池化与非池化

对于一些创建比较耗时的资源,可以使用池的思想进行优化,池化的最大意义在于可以重用ByteBuf

比如数据库连接创建很耗时,所以我们就可以用预先将连接对象创建好,在使用的过程中,就可以直接拿到连接,在使用完毕后,再将连接归还

池化优点:

1、如果没有池化,那么每次都得创建新的 ByteBuf 实例,这个操作堆直接内存代价很高,就算是堆内存,也会增加GC的压力

2、池化可以重用池中的ByteBuf实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率

3、高并发时,池化功能更节约内存,减少内存溢出的可能

池化功能默认是开启的,也可以通过系统环境变量来进行设置

1
-Dio.netty.allocator.type={unpooled|pooled}

组成

  • 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
  • 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出java.lang.IndexOutOfBoundsException异常

注意:最开始读写指针都在 0 位置

读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制

  • 进行读写操作时,无需进行模式的切换

  • 读指针前的部分被称为废弃部分,是已经读过的内容

  • 读指针与写指针之间的空间称为可读部分

  • 写指针与当前容量之间的空间称为可写部分

image-20220921221346623

写入

方法 含义 备注
writeBoolean(boolean value) 写入 boolean 值 用一个字节 01|00 代表 true|false
writeByte(int value) 写入 byte 值
writeShort(int value) 写入 short 值
writeInt(int value) 写入 int 值 Big Endian(大端写入)如 0x250,写入后 00 00 02 50
writeIntLE(int value) 写入 int 值 Little Endian(小端写入),即 0x250,写入后 50 02 00 00
writeLong(long value) 写入 long 值
writeChar(int value) 写入 char 值
writeFloat(float value) 写入 float 值
writeDouble(double value) 写入 double 值
writeBytes(ByteBuf src) 写入 netty 的 ByteBuf
writeBytes(byte[] src) 写入 byte[]
writeBytes(ByteBuffer src) 写入 nio 的 ByteBuffer
int writeCharSequence(CharSequence sequence, Charset charset) 写入字符串 CharSequence为字符串类的父类,第二个参数为对应的字符集

注意

  • 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
  • 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)
  • 还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
ByteBufUtil.log(buffer);

// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
ByteBufUtil.log(buffer);

//大端写入
buffer.writeInt(5);
ByteBufUtil.log(buffer);

//小端写入
buffer.writeIntLE(6);
ByteBufUtil.log(buffer);

//写入Long值
buffer.writeLong(7);
ByteBufUtil.log(buffer);
}
}

运行结果

image-20220921221404096

扩容

当ByteBuf中的容量无法容纳写入的数据时(初始容量是10),会进行扩容操作

1
2
buffer.writeInt(6);
log(buffer);
image-20220921221415418

扩容规则

如果写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容

  • 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节

如果写入后数据大小超过 512 字节,则选择下一个 2^n

  • 例如写入后大小为 513 字节,则扩容后 capacity 是 2^10=1024 字节(2^9=512 已经不够了)

扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException异常


读取

读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针

注意:读过的内容,属于废弃部分,再次读取只能读取那些尚未读取的部分

如果需要重复读取,需要调用buffer.markReaderIndex()对读指针进行标记,并通过buffer.resetReaderIndex()将读指针恢复到mark标记的位置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
public class ByteBufStudy {
public static void main(String[] args) {
// 创建ByteBuf
ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);

// 向buffer中写入数据
buffer.writeBytes(new byte[]{1, 2, 3, 4});
buffer.writeInt(5);

// 读取4个字节
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
System.out.println(buffer.readByte());
ByteBufUtil.log(buffer);

// 通过mark与reset实现重复读取
buffer.markReaderIndex();
System.out.println(buffer.readInt());
ByteBufUtil.log(buffer);

// 恢复到mark标记处
buffer.resetReaderIndex();
ByteBufUtil.log(buffer);
}
}
image-20220921221426655

释放

由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是通过手动来进行释放,而不是等待垃圾回收

  • UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
  • UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
  • PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存

Netty采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口

  • 每个 ByteBuf 对象的初始计数为1
  • 调用 release 方法计数减1,如果计数为0,ByteBuf 内存被回收
  • 调用 retain 方法计数加1,表示调用者没用完之前,其他 handler 即使调用了 release 也不会造成回收
  • 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还存在,其各个方法均无法正常使用

因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果传递到最后,则总是由tail和head进行释放。在这个过程中,如果 ByteBuf 进行了转换,tail或head拿到的就不是原本的 ByteBuf 了,也就无法进行释放。所以一般谁最后使用了,谁负责 release

1、起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read 方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))

2、入站 ByteBuf 处理原则

  • 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
  • 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
  • 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
  • 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
  • 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)

3、出站 ByteBuf 处理原则

  • 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release

4、异常处理原则:不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true

1
while (!buffer.release()) {}

当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中

TailConext中释放ByteBuf的源码

1
2
3
4
5
6
7
8
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg);
} finally {
// 具体的释放方法
ReferenceCountUtil.release(msg);
}
}

判断传过来的是否为ByteBuf,是的话才需要释放

1
2
3
public static boolean release(Object msg) {
return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false;
}

零拷贝

slice

是零拷贝的体现之一,对原始的 ByteBuf 进行切片成多个 ByteBuf(逻辑上并非物理上),切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write指针

image-20220921215859047
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static void main(String[] args) {
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10);
buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'});
log(buf);
/**
* 参数一:切片的起点
* 参数二:切片的长度
* 在切片的过程中并没有出现数据的复制
*/
ByteBuf f1 = buf.slice(0, 5);log(f1);
ByteBuf f2 = buf.slice(5, 5);log(f2);
/**
* slice方法对切片出来的ByteBuf做了限制
* UnpooledSlicedByteBuf(ridx: 0, widx: 5, cap: 5/5
* unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 10, cap: 10))
* f1.writeByte('x');
*/
}
image-20220921220124685

当原有的ByteBuf调用release释放的时候,为了避免切片出来的f1和f2受到影响,一般在slice后,调用retain方法

image-20220921221450641

duplicate

也是零拷贝的体现之一,截取了原始ByteBuf的所有内容,并且没有 max capacity 的限制,也是和原来的ByteBuf 使用同一块底层内存,只是读写指针是独立的

image-20220921221501863

copy

会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关


CompositeByteBuf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer();
buf1.writeBytes(new byte[]{1,2,3,4,5});

ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer();
buf2.writeBytes(new byte[]{6,7,8,9,10});

ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
//此处会发生两次数据复制,将 buf1 和 buf2 的数据分别复制到 buf 中
// buf.writeBytes(buf1).writeBytes(buf2);log(buf);

CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer();
compositeByteBuf.addComponents(true,buf1,buf2);
log(compositeByteBuf);
}
image-20220921221513171

Unpooled工具类

提供了非池化的 ByteBuf 创建,组合,复制等操作

例子:wrappedBuffer方法,包装 ByteBuf

1
2
3
4
5
6
7
8
ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5);
buf1.writeBytes(new byte[]{1, 2, 3, 4, 5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5);
buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});

// 当包装 ByteBuf 个数超过一个时, 底层使用了 CompositeByteBuf
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2);
System.out.println(ByteBufUtil.prettyHexDump(buf3));
image-20220921221524022

也可以用来包装普通字节数组,底层也不会有拷贝操作

1
2
3
ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6});
System.out.println(buf4.getClass());
System.out.println(ByteBufUtil.prettyHexDump(buf4));
image-20220921221533537

总结优势

image-20220921221542304

粘包现象

服务端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public class TestServer {
static final Logger log = LoggerFactory.getLogger(TestServer.class);
void start() {
NioEventLoopGroup boss = new NioEventLoopGroup(1);
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("connected {}", ctx.channel());
super.channelActive(ctx);
}

@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
log.debug("disconnect {}", ctx.channel());
super.channelInactive(ctx);
}
});
}
});
ChannelFuture channelFuture = serverBootstrap.bind(8080);
log.debug("{} binding...", channelFuture.channel());
channelFuture.sync();
log.debug("{} bound...", channelFuture.channel());
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
log.debug("stoped");
}
}

public static void main(String[] args) {
new TestServer().start();
}
}

客户端

客户端代码希望发送 10 个消息,每个消息是 16 字节

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public class TestClient {
static final Logger log = LoggerFactory.getLogger(TestClient.class);
public static void main(String[] args) {
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(worker);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
/**
* 加入自定义的入站 handler
* channelActive:在channel连接建立好后触发该事件
*/
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//每循环一次,就往 ByteBuf 添加16个byte的数据,并发送
for (int i = 0; i < 10; i++) {
ByteBuf buffer = ctx.alloc().buffer();
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
}
}
});
}
});
ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync();
channelFuture.channel().closeFuture().sync();

} catch (InterruptedException e) {
log.error("client error", e);
} finally {
worker.shutdownGracefully();
}
}
}

服务器端一次就接收了 160 个字节,而非分 10 次接收

image-20220921221600029

半包现象

客户端

客户端代码希望发送 1 个消息,这个消息是 160 字节,代码改为

1
2
3
4
5
ByteBuf buffer = ctx.alloc().buffer();
for (int i = 0; i < 10; i++) {
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
}
ctx.writeAndFlush(buffer);

服务端

服务端修改一下接收缓冲区,其它代码不变

注意:serverBootstrap.option(ChannelOption.SO_RCVBUF, 10) 影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍

1
serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);

服务器端输出可以看到,接收的消息被分为两节,第一次 20 字节,第二次 140 字节

image-20220921221617581

现象分析

滑动窗口

TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长,性能就越差

image-20220921221629455

为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值

  • 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
  • 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
  • 如果数据 ack 返回了,窗口就可以向前滑动
  • 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收

粘包

现象:发送 abc def,接受为 abcdef

原因

  • 应用层:接受方 ByteBuf 设置太大(Netty默认为1024)
  • 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接受方处理不及时且窗口大小足够大,这部分256 bytes 字节就会缓冲在接受方的滑动窗口中,当滑动窗口中缓冲了多个报文就会造成粘包
  • Nagle算法:TCP-IP详解:Nagle算法

半包

现象:

  • 发送 abcdef,接收 abc def

原因

  • 应用层:接收方 ByteBuf 小于实际发送数据量
  • 传输层-网络层:滑动窗口的问题,假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
  • 数据链路层:MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包

粘包、拆包发生原因滑动窗口、MSS/MTU限制、Nagle算法(博客园)


解决粘包半包现象

短链接

可以解决粘包的现象,但是不能解决半包的现象

客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象

客户端代码改进

修改channelActive方法

1
2
3
4
5
6
7
8
public void channelActive(ChannelHandlerContext ctx) throws Exception {
log.debug("sending...");
ByteBuf buffer = ctx.alloc().buffer(16);
buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15});
ctx.writeAndFlush(buffer);
// 使用短链接,每次发送完毕后就断开连接
ctx.channel().close();
}

将发送步骤整体封装为send()方法,调用10次send()方法,模拟发送10次数据

1
2
3
4
5
6
public static void main(String[] args) {
// 发送10次
for (int i = 0; i < 10; i++) {
send();
}
}

定长解码器

客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度

服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder对数据进行定长解码,具体使用方法如下

1
ch.pipeline().addLast(new FixedLengthFrameDecoder(16));

待补充……


行解码器

待补充……

长度字段解码器

待补充……

协议设计

待补充……

聊天室案例

业务概述

用户登录接口

1
2
3
4
5
6
7
8
9
public interface UserService {
/**
* 登录
* @param username 用户名
* @param password 密码
* @return 登录成功返回 true, 否则返回 false
*/
boolean login(String username, String password);
}

用户会话接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
public interface Session {

/**
* 绑定会话
* @param channel 哪个 channel 要绑定会话
* @param username 会话绑定用户
*/
void bind(Channel channel, String username);

/**
* 解绑会话
* @param channel 哪个 channel 要解绑会话
*/
void unbind(Channel channel);

/**
* 获取属性
* @param channel 哪个 channel
* @param name 属性名
* @return 属性值
*/
Object getAttribute(Channel channel, String name);

/**
* 设置属性
* @param channel 哪个 channel
* @param name 属性名
* @param value 属性值
*/
void setAttribute(Channel channel, String name, Object value);

/**
* 根据用户名获取 channel
* @param username 用户名
* @return channel
*/
Channel getChannel(String username);
}Copy

群聊会话接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
public interface GroupSession {

/**
* 创建一个聊天组, 如果不存在才能创建成功, 否则返回 null
* @param name 组名
* @param members 成员
* @return 成功时返回组对象, 失败返回 null
*/
Group createGroup(String name, Set<String> members);

/**
* 加入聊天组
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group joinMember(String name, String member);

/**
* 移除组成员
* @param name 组名
* @param member 成员名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeMember(String name, String member);

/**
* 移除聊天组
* @param name 组名
* @return 如果组不存在返回 null, 否则返回组对象
*/
Group removeGroup(String name);

/**
* 获取组成员
* @param name 组名
* @return 成员集合, 如果群不存在或没有成员会返回 empty set
*/
Set<String> getMembers(String name);

/**
* 获取组成员的 channel 集合, 只有在线的 channel 才会返回
* @param name 组名
* @return 成员 channel 集合
*/
List<Channel> getMembersChannel(String name);

/**
* 判断群聊是否一被创建
* @param name 群聊名称
* @return 是否存在
*/
boolean isCreated(String name);
}

登录

客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Slf4j
public class ChatClient {
public static void main(String[] args) {
NioEventLoopGroup group = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.channel(NioSocketChannel.class);
bootstrap.group(group);
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
ch.pipeline().addLast("client handler",new ChannelInboundHandlerAdapter(){
/**
* 在链接建立后触发 active 事件
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
//在负责接收用户在控制台的输入,向服务器发送消息
new Thread(()->{
Scanner scanner = new Scanner(System.in);
System.out.println("请输入用户名:");
String username = scanner.nextLine();
System.out.println("请输入密码:");
String password = scanner.nextLine();
//构造消息对象
LoginRequestMessage message = new LoginRequestMessage(username, password);
//发送消息
ctx.writeAndFlush(message);

System.out.println("等待后续操作");
try {
System.in.read();
} catch (IOException e) {
e.printStackTrace();
}
},"system in").start();
super.channelActive(ctx);
}
});
}
});
Channel channel = bootstrap.connect("localhost", 8080).sync().channel();
channel.closeFuture().sync();
} catch (Exception e) {
log.error("client error", e);
} finally {
group.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
@Slf4j
public class ChatServer {
public static void main(String[] args) {
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG);
MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable();
try {
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.channel(NioServerSocketChannel.class);
serverBootstrap.group(boss, worker);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ProcotolFrameDecoder());
ch.pipeline().addLast(LOGGING_HANDLER);
ch.pipeline().addLast(MESSAGE_CODEC);
//处理 LoginRequestMessage
ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() {
@Override
protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception {
String username = msg.getUsername();
String password = msg.getPassword();
boolean login = UserServiceFactory.getUserService().login(username, password);
LoginResponseMessage responseMessage;
//登录成功
if (login){
responseMessage = new LoginResponseMessage(true, "登录成功");
}else {
//登录失败
responseMessage = new LoginResponseMessage(false, "登录失败");
}
//返回消息给客户端
ctx.writeAndFlush(responseMessage);
}
});
}
});
Channel channel = serverBootstrap.bind(8080).sync().channel();
channel.closeFuture().sync();
} catch (InterruptedException e) {
log.error("server error", e);
} finally {
boss.shutdownGracefully();
worker.shutdownGracefully();
}
}
}