概述
Netty 是一个异步的、基于事件驱动的网络应用框架,用于快速开发可维护、高性能的网络服务器和客户端
注意:Netty的异步还是基于多路复用的,并没有实现真正意义上的异步IO
优势
入门案例
添加依赖
1 2 3 4 5
| <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.39.Final</version> </dependency>
|
服务器步骤
1、创建启动器类,添加组件(NioEventLoopGroup)
2、添加一个ServerSocketChannel的实现
3、添加处理器和初始化器
4、绑定监听端口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class HelloServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class)
.childHandler(
new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception { nioSocketChannel.pipeline().addLast(new StringDecoder()); nioSocketChannel.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println(msg); } }); } }) .bind(8080); } }
|
客户端步骤
1、创建启动器类,添加组件
2、添加一个SocketChannel的实现
3、添加处理器
4、连接到服务器
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class HelloClient { public static void main(String[] args) throws InterruptedException { new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost",8080)) .sync() .channel() .writeAndFlush("hello,world"); } }
|
整体流程
Channel 可以理解为数据的通道
msg 理解为流动的数据,最开始输入是 ByteBuf,但经过 pipeline 中的各个 handler 加工,会变成其它类型对象,最后输出又变成 ByteBuf
handler 可以理解为数据的处理工序
- 工序有多道,合在一起就是 pipeline(传递途径)pipeline 负责发布事件(读、读取完成…)传播给每个 handler, handler 对自己感兴趣的事件进行处理(重写了相应事件处理方法)
- pipeline 中有多个 handler,处理时会依次调用其中的 handler
- handler 分 Inbound(入栈) 和 Outbound(出栈) 两类
EventLoop 可以理解为处理数据的工人
- EventLoop 可以管理多个 channel 的 IO 操作,并且一旦 EventLoop 负责了某个 channel,就会将其与channel进行绑定,以后该 channel 中的 IO 操作都由该 EventLoop 负责
- EventLoop 既可以执行 IO 操作,也可以进行任务处理,每个 EventLoop 有自己的任务队列,队列里可以堆放多个 channel 的待处理任务,任务分为普通任务、定时任务
- EventLoop 按照 pipeline 顺序,依次按照 handler 的规划(代码)处理数据,可以为每个 handler 指定不同的 EventLoop
左侧为Client,右侧为server
EventLoop相关
EventLoop本质上是一个单线程执行器(同时维护了一个Selector),里面由run方法处理Channel上源源不断的IO事件
继承关系
- 一条线继承自:java.util.concurrent.ScheduledExecutorService 因此包含了线程池中所有的方法
- 另外一条线继承自Netty本身的OrderedEventExecutor
- 提供了 BooleaninEventLoop(Thread thread) 方法判断一个线程是否属于此 EventLoop
- 提供了 EventLoopGroup parent() 方法来看看自身属于哪个 EventLoopGroup
EventLoopGroup
EventLoopGroup 是一组 EventLoop,Channel 一般会调用 EventLoopGroup 的 register 方法来绑定其中一个 EventLoop,后续这个 Channel 上的 IO 事件都由此 EventLoop 来处理(保证了 IO 事件处理时的线程安全)
注意:EventLoopGroup继承自 Netty 自己的 EventExecutorGroup
- 实现了 Iterable 接口提供遍历 EventLoop 的能力
- 另有 next 方法获取集合中下一个 EventLoop
处理普通任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| @Slf4j public class MyTestEventLoop { public static void main(String[] args) {
EventLoopGroup group = new NioEventLoopGroup();
group.next();
group.next().submit(()->{ try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } log.debug("ok"); }); log.debug("main"); } }
|
输出结果
处理定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Slf4j public class MyTestEventLoop { public static void main(String[] args) { EventLoopGroup group = new NioEventLoopGroup();
group.next();
group.next().scheduleAtFixedRate(()->{ log.debug("ok"); },0,1, TimeUnit.SECONDS); } }
|
输出结果:每隔一秒打印一次
处理IO事件
服务器代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| @Slf4j public class MyEventLoopServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); } }); } }) .bind(8080); } }
|
客户端代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class MyEventLoopClient { public static void main(String[] args) throws InterruptedException { Channel channel = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)) .sync() .channel(); System.out.println(channel); } }
|
分工
Bootstrap的group()方法可以传入两个EventLoopGroup参数,分别负责处理不同的事件
1 2 3 4 5 6 7 8
| public class MyServer { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup(1), new NioEventLoopGroup(2)) ... } }
|
多个客户端分别发送:
1 2 3 4 5
| nioEventLoopGroup-3-1 hello1 nioEventLoopGroup-3-2 hello2 nioEventLoopGroup-3-1 hello3 nioEventLoopGroup-3-2 hello4 nioEventLoopGroup-3-2 hello4
|
总结:一个EventLoop可以负责多个Channel,且EventLoop一旦与Channel绑定,则一直负责处理该Channel中的事件
增加自定义EventLoopGroup
当有的任务需要较长的时间处理时,可以使用非NioEventLoopGroup,避免同一个NioEventLoop中的其他Channel在较长的时间内都无法得到处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| @Slf4j public class MyEventLoopServer { public static void main(String[] args) { EventLoopGroup group = new DefaultEventLoopGroup(); new ServerBootstrap() .group(new NioEventLoopGroup(),new NioEventLoopGroup(2)) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast("handler-1",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); ctx.fireChannelRead(msg); } }).addLast(group,"handler-2",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf buf = (ByteBuf) msg; log.debug(buf.toString(Charset.defaultCharset())); } }); } }) .bind(8080); } }
|
启动四个客户端发送数据
1 2 3 4 5 6 7 8
| nioEventLoopGroup-4-1 hello1 defaultEventLoopGroup-2-1 hello1 nioEventLoopGroup-4-2 hello2 defaultEventLoopGroup-2-2 hello2 nioEventLoopGroup-4-1 hello3 defaultEventLoopGroup-2-3 hello3 nioEventLoopGroup-4-2 hello4 defaultEventLoopGroup-2-4 hello4
|
总结:客户端与服务器之间的事件,被nioEventLoopGroup和defaultEventLoopGroup分别处理
切换线程
不同的EventLoopGroup切换的实现原理如下:
(当handler中绑定的Group不同时,需要切换Group来执行不同的任务)
如果两个handler绑定的是同一个线程,那么就直接调用;否则把要调用的代码封装为一个任务对象,由下一个handler的线程来调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) { final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) { next.invokeChannelRead(m); } else { executor.execute(new Runnable() { public void run() { next.invokeChannelRead(m); } }); } }
|
Channel相关
close() 可以用来关闭Channel
closeFuture() 用来处理 Channel 的关闭
- sync 方法作用是同步等待 Channel 关闭
- 而 addListener 方法是异步等待 Channel 关闭
pipeline() 方法用于添加处理器
write() 方法将数据写入到缓冲区中
- 因为缓冲机制,数据被写入到 Channel 中以后,不会立即被发送
- 只有当缓冲满了或者调用了flush()方法后,才会将数据通过 Channel 发送出去
writeAndFlush() 方法将数据写入并立即发送(刷出)
连接问题
当我们注释了channelFuture.sync();
后,服务器无法收到hello world
因为建立连接的过程是异步非阻塞的,若不通过sync()
方法阻塞主线程,等待连接真正建立,这时通过 channelFuture.channel() 拿到的 Channel 对象,并不是真正与服务器建立好连接的 Channel,也就没法将信息正确的传输给服务器端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class MyEventLoopClient { public static void main(String[] args) throws InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } })
.connect(new InetSocketAddress("localhost", 8080));
Channel channel = channelFuture.channel(); channel.writeAndFlush("hello,world"); } }
|
解决方法
1、调用channelFuture.sync();
方法。阻塞主线程,同步处理结果,等待连接真正建立好以后,再去获得 Channel 传递数据。使用该方法,获取 Channel 和发送数据的线程都是主线程
2、使用 addListener
方法异步处理结果。通过这种方法可以在NIO线程中获取 Channel 并发送数据,而不是在主线程中执行这些操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class MyClient { public static void main(String[] args) throws IOException, InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { socketChannel.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)); channelFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture channelFuture) throws Exception { Channel channel = channelFuture.channel(); channel.writeAndFlush("hello world"); } }); System.in.read(); } }
|
关闭问题
要关闭channel时,可以调用channel.close()方法进行关闭。该方法也是一个异步方法。真正的关闭操作并不是在调用该方法的线程中执行的,而是在NIO线程中执行真正的关闭操作
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public class MyEventLoopClient { public static void main(String[] args) throws InterruptedException { ChannelFuture channelFuture = new Bootstrap() .group(new NioEventLoopGroup()) .channel(NioSocketChannel.class) .handler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel ch) throws Exception { ch.pipeline().addLast(new StringEncoder()); } }) .connect(new InetSocketAddress("localhost", 8080)); Channel channel = channelFuture.sync().channel(); new Thread(()->{ Scanner scanner = new Scanner(System.in); while (true) { String line = scanner.nextLine(); if ("q".equals(line)){ channel.close(); break; } channel.writeAndFlush(line); } },"input").start(); } }
|
解决方法
可以选择两种方法实现:在真正关闭channel后执行的一些额外操作
1、通过channel.closeFuture()方法获得对应的ChannelFuture对象,然后调用sync()方法阻塞执行操作的线程,等待channel真正关闭后,再执行其他操作
1 2 3 4 5 6 7 8
|
ChannelFuture closeFuture = channel.closeFuture(); closeFuture.sync();
|
2、调用closeFuture.addListener方法,添加close的后续操作
1 2 3 4 5 6 7 8 9
| ChannelFuture closeFuture = channel.closeFuture(); closeFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { group.shutdownGracefully(); } });
|
Future与Promise
netty 中的 Future 与 jdk 中的 Future 同名,但是是两个接口
netty 的 Future 继承自 jdk 的 Future,而 Promise 又对 netty Future 进行了扩展
- jdk Future 只能同步等待任务结束(或成功、或失败)才能得到结果
- netty Future 可以同步等待任务结束得到结果,也可以异步方式得到结果,但都是要等任务结束
- netty Promise 不仅有 netty Future 的功能,而且脱离了任务独立存在,只作为两个线程间传递结果的容器
功能/名称 |
jdk Future |
netty Future |
Promise |
cancel |
取消任务 |
- |
- |
isCanceled |
任务是否取消 |
- |
- |
isDone |
任务是否完成,不能区分成功失败 |
- |
- |
get |
获取任务结果,阻塞等待 |
- |
- |
getNow |
- |
获取任务结果,非阻塞,还未产生结果时返回 null |
- |
await |
- |
等待任务结束,如果任务失败,不会抛异常,而是通过 isSuccess 判断 |
- |
sync |
- |
等待任务结束,如果任务失败,抛出异常 |
- |
isSuccess |
- |
判断任务是否成功 |
- |
cause |
- |
获取失败信息,非阻塞,如果没有失败,返回null |
- |
addLinstener |
- |
添加回调,异步接收结果 |
- |
setSuccess |
- |
- |
设置成功结果 |
setFailure |
- |
- |
设置失败结果 |
JDK Future
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Slf4j public class TestJdkFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { ExecutorService service = Executors.newFixedThreadPool(2); Future<Integer> future = service.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("线程池中的线程执行计算......"); Thread.sleep(1000); return 50; } }); log.debug("等待结果......"); Integer res = future.get(); log.debug("结果是{}",res); } }
|
Netty Future
Netty中的Future对象,可以通过EventLoop的sumbit()方法得到
- 可以通过Future对象的get方法,阻塞地获取返回结果
- 可以通过getNow方法,获取结果,若还没有结果,则返回null,该方法是非阻塞的
- 可以通过future.addListener方法,在Callable方法执行的线程中,异步获取返回结果
同步方式获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Slf4j public class TestNettyFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("线程池中的线程执行计算......"); Thread.sleep(1000); return 50; } }); log.debug("等待结果......"); Integer res = future.get(); log.debug("结果是{}",res); } }
|
异步方式获取结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| @Slf4j public class TestNettyFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { NioEventLoopGroup group = new NioEventLoopGroup(); EventLoop eventLoop = group.next(); Future<Integer> future = eventLoop.submit(new Callable<Integer>() { @Override public Integer call() throws Exception { log.debug("线程池中的线程执行计算......"); Thread.sleep(1000); return 50; } }); future.addListener(new GenericFutureListener<Future<? super Integer>>() { @Override public void operationComplete(Future<? super Integer> future) throws Exception { log.debug("接收结果:{}",future.getNow()); } }); } }
|
Netty Promise
Promise相当于一个容器,可以用于存放各个线程中的结果,然后让其他线程去获取该结果
演示向promise填充正确结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| @Slf4j public class TestNettyPromise { public static void main(String[] args) throws ExecutionException, InterruptedException { EventLoop eventLoop = new NioEventLoopGroup().next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{ log.debug("开始计算..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } promise.setSuccess(80); }).start();
log.debug("等待结果..."); log.debug("结果是:{}",promise.get()); } }
|
演示向promise填充错误结果
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| @Slf4j public class TestNettyPromise { public static void main(String[] args) throws ExecutionException, InterruptedException { EventLoop eventLoop = new NioEventLoopGroup().next();
DefaultPromise<Integer> promise = new DefaultPromise<>(eventLoop);
new Thread(()->{ log.debug("开始计算..."); try { int i = 1 / 0; Thread.sleep(1000); promise.setSuccess(80); } catch (InterruptedException e) { promise.setFailure(e); e.printStackTrace(); } }).start();
log.debug("等待结果..."); log.debug("结果是:{}",promise.get()); } }
|
Handler与Pipeline
Pipeline代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78
| @Slf4j public class TestPipeline { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("handler-1",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler-1"); super.channelRead(ctx, msg); } }); pipeline.addLast("handler-2",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler-2"); super.channelRead(ctx, msg); } }); pipeline.addLast("handler-3",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler-3");
channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes())); } }); pipeline.addLast("handler-4",new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("handler-4"); super.write(ctx, msg, promise); } }); pipeline.addLast("handler-5",new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("handler-5"); super.write(ctx, msg, promise); channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes())); } }); pipeline.addLast("handler-6",new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("handler-6"); super.write(ctx, msg, promise); channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes())); } }); } }) .bind(8080); } }
|
运行结果
对channelRead传递的信息进行加工
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85
| @Slf4j public class TestPipeline { public static void main(String[] args) { new ServerBootstrap() .group(new NioEventLoopGroup()) .channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer<NioSocketChannel>() { @Override protected void initChannel(NioSocketChannel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast("handler-1",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler-1"); ByteBuf buf = (ByteBuf) msg; String name = buf.toString(Charset.defaultCharset()); super.channelRead(ctx, name); } });
pipeline.addLast("handler-2",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object name) throws Exception { log.debug("handler-2"); Student student = new Student(name.toString()); super.channelRead(ctx, student); } });
pipeline.addLast("handler-3",new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("handler-3,结果:{},class:{}",msg,msg.getClass()); channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes())); } });
pipeline.addLast("handler-4",new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("handler-4"); super.write(ctx, msg, promise); } });
pipeline.addLast("handler-5",new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("handler-5"); super.write(ctx, msg, promise); channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes())); } });
pipeline.addLast("handler-6",new ChannelOutboundHandlerAdapter() { @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("handler-6"); super.write(ctx, msg, promise); channel.writeAndFlush(ctx.alloc().buffer().writeBytes("server....".getBytes())); } }); } }) .bind(8080); } @Data @AllArgsConstructor static class Student { private String name; } }
|
总结
1、通过channel.pipeline().addLast(name, handler)
添加handler时,记得给handler取名字。这样可以调用pipeline的addAfter、addBefore等方法更灵活地向pipeline中添加handler
2、pipeline是结构是一个带有head与tail指针的双向链表,其中的节点为handler
要通过ctx.fireChannelRead(msg)等方法,将当前handler的处理结果传递给下一个handler
当有入站(Inbound)操作时,会从head开始向后调用handler,直到handler不是处理Inbound操作为止
当有出站(Outbound)操作时,会从tail开始向前调用handler,直到handler不是处理Outbound操作为止
如果不调用ctx.fireChannelRead(msg)
方法,就无法将当前handler的处理结果传递给下一个handler,从而无法进行下一个InboundHandler的调用。也可以使用ChannelRead内部的ctx.fireChannelRead(msg)
来进行结果的传递
socketChannel.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从tail向前寻找OutboundHandler
ctx.writeAndFlush()
当handler中调用该方法进行写操作时,会触发Outbound操作,此时是从当前handler向前寻找OutboundHandler
EmbeddedChannel进行测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| @Slf4j public class TestEmbeddedChannel { public static void main(String[] args) { ChannelInboundHandlerAdapter h1 = new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("h1"); super.channelRead(ctx, msg); } }; ChannelInboundHandlerAdapter h2 = new ChannelInboundHandlerAdapter(){ @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { log.debug("h2"); super.channelRead(ctx, msg); } }; ChannelOutboundHandlerAdapter h3 = new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("h3"); super.write(ctx, msg, promise); } }; ChannelOutboundHandlerAdapter h4 = new ChannelOutboundHandlerAdapter(){ @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { log.debug("h4"); super.write(ctx, msg, promise); } }; EmbeddedChannel channel = new EmbeddedChannel(h1, h2, h3, h4); channel.writeInbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("hello".getBytes()));
channel.writeOutbound(ByteBufAllocator.DEFAULT.buffer().writeBytes("world".getBytes())); } }
|
ByteBuf
创建
1 2 3 4 5 6 7 8 9 10 11 12
| public static void main(String[] args) { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(); System.out.println(buf.toString()); StringBuilder stringBuilder = new StringBuilder(); for (int i = 0; i < 300; i++) { stringBuilder.append("a"); } buf.writeBytes(stringBuilder.toString().getBytes()); System.out.println(buf); }
|
ByteBuf通过ByteBufAllocator
选择allocator并调用对应的buffer()方法来创建的,默认使用直接内存作为ByteBuf,容量为256个字节,可以指定初始容量的大小
当ByteBuf的容量无法容纳所有数据时,ByteBuf会进行扩容操作,如果在handler中创建ByteBuf,建议使用ChannelHandlerContext ctx.alloc().buffer()
来创建
直接内存与堆内存
ByteBuf支持两种内存分配,一种是直接内存,一种是堆内存,Netty默认使用的是直接内存
堆内存的分配效率比较高,但是读写效率比较低
直接内存分配效率比较低,读写效率比较高
1、直接内存使用的是系统内存,在磁盘中读取文件时,可以将文件直接读入系统内存,系统内存可以用直接内存的方式映射到JVM中,他们访问的都是同一块内存,可以减少一次内存的复制
2、堆内存收到垃圾回收的影响,垃圾回收会涉及到对象的复制和移动,会影响效率
池化与非池化
对于一些创建比较耗时的资源,可以使用池的思想进行优化,池化的最大意义在于可以重用ByteBuf
比如数据库连接创建很耗时,所以我们就可以用预先将连接对象创建好,在使用的过程中,就可以直接拿到连接,在使用完毕后,再将连接归还
池化优点:
1、如果没有池化,那么每次都得创建新的 ByteBuf 实例,这个操作堆直接内存代价很高,就算是堆内存,也会增加GC的压力
2、池化可以重用池中的ByteBuf实例,并且采用了与 jemalloc 类似的内存分配算法提升分配效率
3、高并发时,池化功能更节约内存,减少内存溢出的可能
池化功能默认是开启的,也可以通过系统环境变量来进行设置
1
| -Dio.netty.allocator.type={unpooled|pooled}
|
组成
- 在构造ByteBuf时,可传入两个参数,分别代表初始容量和最大容量,若未传入第二个参数(最大容量),最大容量默认为Integer.MAX_VALUE
- 当ByteBuf容量无法容纳所有数据时,会进行扩容操作,若超出最大容量,会抛出
java.lang.IndexOutOfBoundsException
异常
注意:最开始读写指针都在 0 位置
读写操作不同于ByteBuffer只用position进行控制,ByteBuf分别由读指针和写指针两个指针控制
进行读写操作时,无需进行模式的切换
读指针前的部分被称为废弃部分,是已经读过的内容
读指针与写指针之间的空间称为可读部分
写指针与当前容量之间的空间称为可写部分
写入
方法 |
含义 |
备注 |
writeBoolean(boolean value) |
写入 boolean 值 |
用一个字节 01|00 代表 true|false |
writeByte(int value) |
写入 byte 值 |
|
writeShort(int value) |
写入 short 值 |
|
writeInt(int value) |
写入 int 值 |
Big Endian(大端写入)如 0x250,写入后 00 00 02 50 |
writeIntLE(int value) |
写入 int 值 |
Little Endian(小端写入),即 0x250,写入后 50 02 00 00 |
writeLong(long value) |
写入 long 值 |
|
writeChar(int value) |
写入 char 值 |
|
writeFloat(float value) |
写入 float 值 |
|
writeDouble(double value) |
写入 double 值 |
|
writeBytes(ByteBuf src) |
写入 netty 的 ByteBuf |
|
writeBytes(byte[] src) |
写入 byte[] |
|
writeBytes(ByteBuffer src) |
写入 nio 的 ByteBuffer |
|
int writeCharSequence(CharSequence sequence, Charset charset) |
写入字符串 |
CharSequence为字符串类的父类,第二个参数为对应的字符集 |
注意
- 这些方法的未指明返回值的,其返回值都是 ByteBuf,意味着可以链式调用来写入不同的数据
- 网络传输中,默认习惯是 Big Endian,使用 writeInt(int value)
- 还有一类方法是 set 开头的一系列方法,也可以写入数据,但不会改变写指针位置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class ByteBufStudy { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20); ByteBufUtil.log(buffer);
buffer.writeBytes(new byte[]{1, 2, 3, 4}); ByteBufUtil.log(buffer);
buffer.writeInt(5); ByteBufUtil.log(buffer);
buffer.writeIntLE(6); ByteBufUtil.log(buffer);
buffer.writeLong(7); ByteBufUtil.log(buffer); } }
|
运行结果
扩容
当ByteBuf中的容量无法容纳写入的数据时(初始容量是10),会进行扩容操作
1 2
| buffer.writeInt(6); log(buffer);
|
扩容规则
如果写入后数据大小未超过 512 字节,则选择下一个 16 的整数倍进行扩容
- 例如写入后大小为 12 字节,则扩容后 capacity 是 16 字节
如果写入后数据大小超过 512 字节,则选择下一个 2^n
- 例如写入后大小为 513 字节,则扩容后 capacity 是 2^10=1024 字节(2^9=512 已经不够了)
扩容不能超过 maxCapacity,否则会抛出java.lang.IndexOutOfBoundsException
异常
读取
读取主要是通过一系列read方法进行读取,读取时会根据读取数据的字节数移动读指针
注意:读过的内容,属于废弃部分,再次读取只能读取那些尚未读取的部分
如果需要重复读取,需要调用buffer.markReaderIndex()
对读指针进行标记,并通过buffer.resetReaderIndex()
将读指针恢复到mark标记的位置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class ByteBufStudy { public static void main(String[] args) { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(16, 20);
buffer.writeBytes(new byte[]{1, 2, 3, 4}); buffer.writeInt(5);
System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); System.out.println(buffer.readByte()); ByteBufUtil.log(buffer);
buffer.markReaderIndex(); System.out.println(buffer.readInt()); ByteBufUtil.log(buffer);
buffer.resetReaderIndex(); ByteBufUtil.log(buffer); } }
|
释放
由于 Netty 中有堆外内存的 ByteBuf 实现,堆外内存最好是通过手动来进行释放,而不是等待垃圾回收
- UnpooledHeapByteBuf 使用的是 JVM 内存,只需等 GC 回收内存即可
- UnpooledDirectByteBuf 使用的就是直接内存了,需要特殊的方法来回收内存
- PooledByteBuf 和它的子类使用了池化机制,需要更复杂的规则来回收内存
Netty采用了引用计数法来控制回收内存,每个 ByteBuf 都实现了 ReferenceCounted 接口
- 每个 ByteBuf 对象的初始计数为1
- 调用 release 方法计数减1,如果计数为0,ByteBuf 内存被回收
- 调用 retain 方法计数加1,表示调用者没用完之前,其他 handler 即使调用了 release 也不会造成回收
- 当计数为 0 时,底层内存会被回收,这时即使 ByteBuf 对象还存在,其各个方法均无法正常使用
因为 pipeline 的存在,一般需要将 ByteBuf 传递给下一个 ChannelHandler,如果传递到最后,则总是由tail和head进行释放。在这个过程中,如果 ByteBuf 进行了转换,tail或head拿到的就不是原本的 ByteBuf 了,也就无法进行释放。所以一般谁最后使用了,谁负责 release
1、起点,对于 NIO 实现来讲,在 io.netty.channel.nio.AbstractNioByteChannel.NioByteUnsafe.read
方法中首次创建 ByteBuf 放入 pipeline(line 163 pipeline.fireChannelRead(byteBuf))
2、入站 ByteBuf 处理原则
- 对原始 ByteBuf 不做处理,调用 ctx.fireChannelRead(msg) 向后传递,这时无须 release
- 将原始 ByteBuf 转换为其它类型的 Java 对象,这时 ByteBuf 就没用了,必须 release
- 如果不调用 ctx.fireChannelRead(msg) 向后传递,那么也必须 release
- 注意各种异常,如果 ByteBuf 没有成功传递到下一个 ChannelHandler,必须 release
- 假设消息一直向后传,那么 TailContext 会负责释放未处理消息(原始的 ByteBuf)
3、出站 ByteBuf 处理原则
- 出站消息最终都会转为 ByteBuf 输出,一直向前传,由 HeadContext flush 后 release
4、异常处理原则:不清楚 ByteBuf 被引用了多少次,但又必须彻底释放,可以循环调用 release 直到返回 true
1
| while (!buffer.release()) {}
|
当ByteBuf被传到了pipeline的head与tail时,ByteBuf会被其中的方法彻底释放,但前提是ByteBuf被传递到了head与tail中
TailConext中释放ByteBuf的源码
1 2 3 4 5 6 7 8
| protected void onUnhandledInboundMessage(Object msg) { try { logger.debug("Discarded inbound message {} that reached at the tail of the pipeline. Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }
|
判断传过来的是否为ByteBuf,是的话才需要释放
1 2 3
| public static boolean release(Object msg) { return msg instanceof ReferenceCounted ? ((ReferenceCounted)msg).release() : false; }
|
零拷贝
slice
是零拷贝的体现之一,对原始的 ByteBuf 进行切片成多个 ByteBuf(逻辑上并非物理上),切片后的 ByteBuf 并没有发生内存复制,还是使用原始 ByteBuf 的内存,切片后的 ByteBuf 维护独立的 read,write指针
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public static void main(String[] args) { ByteBuf buf = ByteBufAllocator.DEFAULT.buffer(10); buf.writeBytes(new byte[]{'a','b','c','d','e','f','g','h','i','j'}); log(buf);
ByteBuf f1 = buf.slice(0, 5);log(f1); ByteBuf f2 = buf.slice(5, 5);log(f2);
}
|
当原有的ByteBuf调用release释放的时候,为了避免切片出来的f1和f2受到影响,一般在slice后,调用retain方法
duplicate
也是零拷贝的体现之一,截取了原始ByteBuf的所有内容,并且没有 max capacity 的限制,也是和原来的ByteBuf 使用同一块底层内存,只是读写指针是独立的
copy
会将底层内存数据进行深拷贝,因此无论读写,都与原始 ByteBuf 无关
CompositeByteBuf
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public static void main(String[] args) { ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(); buf1.writeBytes(new byte[]{1,2,3,4,5});
ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(); buf2.writeBytes(new byte[]{6,7,8,9,10});
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
CompositeByteBuf compositeByteBuf = ByteBufAllocator.DEFAULT.compositeBuffer(); compositeByteBuf.addComponents(true,buf1,buf2); log(compositeByteBuf); }
|
Unpooled工具类
提供了非池化的 ByteBuf 创建,组合,复制等操作
例子:wrappedBuffer方法,包装 ByteBuf
1 2 3 4 5 6 7 8
| ByteBuf buf1 = ByteBufAllocator.DEFAULT.buffer(5); buf1.writeBytes(new byte[]{1, 2, 3, 4, 5}); ByteBuf buf2 = ByteBufAllocator.DEFAULT.buffer(5); buf2.writeBytes(new byte[]{6, 7, 8, 9, 10});
ByteBuf buf3 = Unpooled.wrappedBuffer(buf1, buf2); System.out.println(ByteBufUtil.prettyHexDump(buf3));
|
也可以用来包装普通字节数组,底层也不会有拷贝操作
1 2 3
| ByteBuf buf4 = Unpooled.wrappedBuffer(new byte[]{1, 2, 3}, new byte[]{4, 5, 6}); System.out.println(buf4.getClass()); System.out.println(ByteBufUtil.prettyHexDump(buf4));
|
总结优势
粘包现象
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public class TestServer { static final Logger log = LoggerFactory.getLogger(TestServer.class); void start() { NioEventLoopGroup boss = new NioEventLoopGroup(1); NioEventLoopGroup worker = new NioEventLoopGroup(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG)); ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("connected {}", ctx.channel()); super.channelActive(ctx); }
@Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { log.debug("disconnect {}", ctx.channel()); super.channelInactive(ctx); } }); } }); ChannelFuture channelFuture = serverBootstrap.bind(8080); log.debug("{} binding...", channelFuture.channel()); channelFuture.sync(); log.debug("{} bound...", channelFuture.channel()); channelFuture.channel().closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); log.debug("stoped"); } }
public static void main(String[] args) { new TestServer().start(); } }
|
客户端
客户端代码希望发送 10 个消息,每个消息是 16 字节
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public class TestClient { static final Logger log = LoggerFactory.getLogger(TestClient.class); public static void main(String[] args) { NioEventLoopGroup worker = new NioEventLoopGroup(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(worker); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ChannelInboundHandlerAdapter() { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { for (int i = 0; i < 10; i++) { ByteBuf buffer = ctx.alloc().buffer(); buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}); ctx.writeAndFlush(buffer); } } }); } }); ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 8080).sync(); channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) { log.error("client error", e); } finally { worker.shutdownGracefully(); } } }
|
服务器端一次就接收了 160 个字节,而非分 10 次接收
半包现象
客户端
客户端代码希望发送 1 个消息,这个消息是 160 字节,代码改为
1 2 3 4 5
| ByteBuf buffer = ctx.alloc().buffer(); for (int i = 0; i < 10; i++) { buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}); } ctx.writeAndFlush(buffer);
|
服务端
服务端修改一下接收缓冲区,其它代码不变
注意:serverBootstrap.option(ChannelOption.SO_RCVBUF, 10)
影响的底层接收缓冲区(即滑动窗口)大小,仅决定了 netty 读取的最小单位,netty 实际每次读取的一般是它的整数倍
1
| serverBootstrap.option(ChannelOption.SO_RCVBUF, 10);
|
服务器端输出可以看到,接收的消息被分为两节,第一次 20 字节,第二次 140 字节
现象分析
滑动窗口
TCP 以一个段(segment)为单位,每发送一个段就需要进行一次确认应答(ack)处理,但如果这么做,缺点是包的往返时间越长,性能就越差
为了解决此问题,引入了窗口概念,窗口大小即决定了无需等待应答而可以继续发送的数据最大值
- 窗口实际就起到一个缓冲区的作用,同时也能起到流量控制的作用
- 窗口内的数据才允许被发送,当应答未到达前,窗口必须停止滑动
- 如果数据 ack 返回了,窗口就可以向前滑动
- 接收方也会维护一个窗口,只有落在窗口内的数据才能允许接收
粘包
现象:发送 abc def,接受为 abcdef
原因
- 应用层:接受方 ByteBuf 设置太大(Netty默认为1024)
- 滑动窗口:假设发送方 256 bytes 表示一个完整报文,但由于接受方处理不及时且窗口大小足够大,这部分256 bytes 字节就会缓冲在接受方的滑动窗口中,当滑动窗口中缓冲了多个报文就会造成粘包
- Nagle算法:TCP-IP详解:Nagle算法
半包
现象:
原因
- 应用层:接收方 ByteBuf 小于实际发送数据量
- 传输层-网络层:滑动窗口的问题,假设接收方的窗口只剩了 128 bytes,发送方的报文大小是 256 bytes,这时接收方窗口中无法容纳发送方的全部报文,发送方只能先发送前 128 bytes,等待 ack 后才能发送剩余部分,这就造成了半包
- 数据链路层:MSS 限制:当发送的数据超过 MSS 限制后,会将数据切分发送,就会造成半包
粘包、拆包发生原因滑动窗口、MSS/MTU限制、Nagle算法(博客园)
解决粘包半包现象
短链接
可以解决粘包的现象,但是不能解决半包的现象
客户端每次向服务器发送数据以后,就与服务器断开连接,此时的消息边界为连接建立到连接断开。这时便无需使用滑动窗口等技术来缓冲数据,则不会发生粘包现象。但如果一次性数据发送过多,接收方无法一次性容纳所有数据,还是会发生半包现象,所以短链接无法解决半包现象
客户端代码改进
修改channelActive方法
1 2 3 4 5 6 7 8
| public void channelActive(ChannelHandlerContext ctx) throws Exception { log.debug("sending..."); ByteBuf buffer = ctx.alloc().buffer(16); buffer.writeBytes(new byte[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}); ctx.writeAndFlush(buffer); ctx.channel().close(); }
|
将发送步骤整体封装为send()方法,调用10次send()方法,模拟发送10次数据
1 2 3 4 5 6
| public static void main(String[] args) { for (int i = 0; i < 10; i++) { send(); } }
|
定长解码器
客户端于服务器约定一个最大长度,保证客户端每次发送的数据长度都不会大于该长度。若发送数据长度不足则需要补齐至该长度
服务器接收数据时,将接收到的数据按照约定的最大长度进行拆分,即使发送过程中产生了粘包,也可以通过定长解码器将数据正确地进行拆分。服务端需要用到FixedLengthFrameDecoder
对数据进行定长解码,具体使用方法如下
1
| ch.pipeline().addLast(new FixedLengthFrameDecoder(16));
|
待补充……
行解码器
待补充……
长度字段解码器
待补充……
协议设计
待补充……
聊天室案例
业务概述
用户登录接口
1 2 3 4 5 6 7 8 9
| public interface UserService {
boolean login(String username, String password); }
|
用户会话接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| public interface Session {
void bind(Channel channel, String username);
void unbind(Channel channel);
Object getAttribute(Channel channel, String name);
void setAttribute(Channel channel, String name, Object value);
Channel getChannel(String username); }Copy
|
群聊会话接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| public interface GroupSession {
Group createGroup(String name, Set<String> members);
Group joinMember(String name, String member);
Group removeMember(String name, String member);
Group removeGroup(String name);
Set<String> getMembers(String name);
List<Channel> getMembersChannel(String name);
boolean isCreated(String name); }
|
登录
客户端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| @Slf4j public class ChatClient { public static void main(String[] args) { NioEventLoopGroup group = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); try { Bootstrap bootstrap = new Bootstrap(); bootstrap.channel(NioSocketChannel.class); bootstrap.group(group); bootstrap.handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast("client handler",new ChannelInboundHandlerAdapter(){
@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { new Thread(()->{ Scanner scanner = new Scanner(System.in); System.out.println("请输入用户名:"); String username = scanner.nextLine(); System.out.println("请输入密码:"); String password = scanner.nextLine(); LoginRequestMessage message = new LoginRequestMessage(username, password); ctx.writeAndFlush(message);
System.out.println("等待后续操作"); try { System.in.read(); } catch (IOException e) { e.printStackTrace(); } },"system in").start(); super.channelActive(ctx); } }); } }); Channel channel = bootstrap.connect("localhost", 8080).sync().channel(); channel.closeFuture().sync(); } catch (Exception e) { log.error("client error", e); } finally { group.shutdownGracefully(); } } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48
| @Slf4j public class ChatServer { public static void main(String[] args) { NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); LoggingHandler LOGGING_HANDLER = new LoggingHandler(LogLevel.DEBUG); MessageCodecSharable MESSAGE_CODEC = new MessageCodecSharable(); try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.group(boss, worker); serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ProcotolFrameDecoder()); ch.pipeline().addLast(LOGGING_HANDLER); ch.pipeline().addLast(MESSAGE_CODEC); ch.pipeline().addLast(new SimpleChannelInboundHandler<LoginRequestMessage>() { @Override protected void channelRead0(ChannelHandlerContext ctx, LoginRequestMessage msg) throws Exception { String username = msg.getUsername(); String password = msg.getPassword(); boolean login = UserServiceFactory.getUserService().login(username, password); LoginResponseMessage responseMessage; if (login){ responseMessage = new LoginResponseMessage(true, "登录成功"); }else { responseMessage = new LoginResponseMessage(false, "登录失败"); } ctx.writeAndFlush(responseMessage); } }); } }); Channel channel = serverBootstrap.bind(8080).sync().channel(); channel.closeFuture().sync(); } catch (InterruptedException e) { log.error("server error", e); } finally { boss.shutdownGracefully(); worker.shutdownGracefully(); } } }
|