作者:唐亚峰 | 出自:唐亚峰博客
TCP
以流的形式传输,在上一章,我们讲了粘包和拆包,以及LineBaseFrameDecoder
使用和源码探讨,接下来讲讲Netty
为我们实现的其它解码器…..
TCP
以流的方式进行数据传输,上层的应用为了对消息进行区分,往往采用如下方式
- 固定消息长度,累计读取到长度和定长LEN的报文后,就认为读取到了个完整的消息,然后将计数器位置重置在读取下一个报文内容
- 将回车换行符作为消息结束符\r\n,列如FTP协议,这种方式在文本中应用比较广泛
- 将特殊分隔符作为消息结束符标志位,回车换行符就是一个特殊结束分隔符(DelimiterBasedFrameDecoder)
- 通过在消息头定义一个长度字段来标示消息的总长度(FixedLengthFrameDecoder)
Netty
对以上4种做个统一抽象封装,提供了四种不同解码器来解决对应问题,使用起来也非常的方便,了解了它们,我们就不需要自己对读取的报文人工解码,也不需要考虑TCP
粘包和拆包的问题了…
Delimiter自定义分隔符
我将公共的部分做了一层抽离,定义成常量方便调用
public interface EchoConstant {
String SEPARATOR = "$_";//特殊分割符号,DelimiterBasedFrameDecoder使用
Integer ECHO_DELIMITER_PORT = 4040;
Integer ECHO_LENGTH_PORT = 5050;
String HOST = "127.0.0.1";
Integer FRAME_LENGTH = 10;//固定消息长度,FixedLengthFrameDecoder使用
}
定义
EchoDelimiterServer
,毫无疑问大部分代码和以前类似,区别是多了一个日志输出以及DelimiterBasedFrameDecoder
的使用
划重点:在做开发调试的时候,我们可以使用Netty
为我们提供的LoggingHandler
输出日志
public static void bind(int port) {
EventLoopGroup masterGroup = new NioEventLoopGroup();//线程组,含一组NIO线程,专门用来处理网络事件
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap();//NIO服务端启动辅助类
bootstrap.group(masterGroup, workerGroup).channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(EchoConstant.SEPARATOR.getBytes());
channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoServerHandler());
}
});
//绑定端口,同步等待成功,
System.out.println("绑定端口,同步等待成功......");
ChannelFuture future = bootstrap.bind(port).sync();
//等待服务端监听端口关闭
future.channel().closeFuture().sync();
System.out.println("等待服务端监听端口关闭......");
} catch (Exception e) {
e.printStackTrace();
} finally {
//优雅退出释放线程池
masterGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
System.out.println("优雅退出释放线程池......");
}
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("EchoDelimiterServer 接收到的消息 :" + body + "; 当前统计:" + ++counter);
body = body + EchoConstant.SEPARATOR;//在消息后面加上特殊分隔符
ByteBuf echo = Unpooled.copiedBuffer(body.getBytes());
ctx.writeAndFlush(echo);//消息写出
}
定义
EchoDelimiterClient
public static void connect(String host, int port) {
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
ChannelFuture future = null;
try {
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) throws Exception {
ByteBuf delimiter = Unpooled.copiedBuffer(EchoConstant.SEPARATOR.getBytes());
channel.pipeline().addLast(new DelimiterBasedFrameDecoder(1024, delimiter));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoClientHandler());
}
});
//发起异步请求
future = bootstrap.connect(host, port).sync();
//等待客户端链路关闭
future.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
}
}
创建
EchoClientHandler
继承ChannelHandlerAdapter
,重写读取和写出事件
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
for (int i = 0; i < 10; i++) {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("EchoDelimiterClient 接收到的消息 :" + body + "; 当前统计:" + ++counter);
}
试验一把
分别启动
EchoDelimiterServer
和EchoDelimiterClient
,输出如下日志
绑定端口,同步等待成功......
九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler channelRegistered
信息: [id: 0x3b1849e8] REGISTERED
九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler bind
信息: [id: 0x3b1849e8] BIND: 0.0.0.0/0.0.0.0:4040
九月 04, 2017 10:41:27 下午 io.netty.handler.logging.LoggingHandler channelActive
信息: [id: 0x3b1849e8, /0:0:0:0:0:0:0:0:4040] ACTIVE
九月 04, 2017 10:41:33 下午 io.netty.handler.logging.LoggingHandler channelRead
信息: [id: 0x3b1849e8, /0:0:0:0:0:0:0:0:4040] RECEIVED: [id: 0xa45511cd, /127.0.0.1:50226 => /127.0.0.1:4040]
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:1
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:2
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:3
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:4
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:5
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:6
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:7
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:8
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:9
EchoDelimiterServer 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:10
------------------------------------------------------------------------------------------------
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:1
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:2
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:3
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:4
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:5
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:6
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:7
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:8
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:9
EchoDelimiterClient 接收到的消息 :Hi , Levin .Welcome to Netty.; 当前统计:10
FixedLength指定消息长度
FixedLengthFrameDecoder
的方式比较极端,就是解析固定长度的报文消息,举个例子:假设我的报文长度为50
,解析长度为30
,那么这个数据包会被拆分成2
次来解析,反之亦然…..
创建EchoLengthServer
,重写初始化通道与读取事件
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new FixedLengthFrameDecoder(EchoConstant.FRAME_LENGTH));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoLengthServer.EchoServerHandler());
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("EchoLengthServer 接收到的消息 :" + body + "; 当前统计:" + ++counter);
}
创建EchoLengthClient
,重写初始化通道,写出与读取事件
@Override
protected void initChannel(SocketChannel channel) throws Exception {
channel.pipeline().addLast(new FixedLengthFrameDecoder(EchoConstant.FRAME_LENGTH));
channel.pipeline().addLast(new StringDecoder());
channel.pipeline().addLast(new EchoLengthServer.EchoServerHandler());
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_REQ.getBytes()));
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
String body = (String) msg;
System.out.println("EchoLengthClient 接收到的消息 :" + body + "; 当前统计:" + ++counter);
}
试验一把
分别启动
EchoLengthServer
和EchoLengthClient
,输出如下日志
绑定端口,同步等待成功......
EchoLengthServer 接收到的消息 :EchoLength; 当前统计:1
EchoLengthServer 接收到的消息 :Client Hi; 当前统计:2
EchoLengthServer 接收到的消息 : .Welcome ; 当前统计:3
警告: 前面说过,如果切割内容过长或者过断都会出现拆包或者粘包情况,所以这种方式需要根据具体业务需求来…..
总结
由此可以看到DelimiterBasedFrameDecoder
用于对使用分割符结尾的消息进行自动解码,FixedLengthFrameDecoder
用于对固定长度的消息进行自动解码,有了以上两种解码器在结合其它的解码器,如(StringDecoder
),可以轻松完成大部分消息的自动解码,且无需考虑TCP粘包/拆包
导致的半读问题,极大的提升了开发效率…..
– 说点什么
全文代码:https://git.oschina.net/battcn/battcn-netty/tree/master/Chapter5-1/battcn-netty-5-1-1