博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
netty源码分析之-处理器详解(9)
阅读量:4290 次
发布时间:2019-05-27

本文共 5266 字,大约阅读时间需要 17 分钟。

Netty处理器重要的概念:

  • Netty的处理器可以分为两类:入站处理器与出站处理器
  • 入站处理器的顶层是ChannelInboundHandler,出站处理器的顶层是ChannelOutboundHandler
  • 数据处理时常用的各种编码器本质上都是处理器
  • 编解码器:无论我们向网络写入的数据是什么类型(int、char、String、二进制等),数据在网络中传递时,其都是以字节流的形式呈现的;将数据由原本的形式转换为字节流的操作称为编码(encode),将数据由字节转换为它原本的格式或是其他格式的操作称为解码(decode),编解码统一成为codec
  • 编码:本质上是一种出站处理器,因此,编码一定是一种ChannelOutboundHandler
  • 解码:本质上是一种入站处理器,因此,解码一定是一种ChannelInboundHandler
  • 在Netty中,编码器通常以XXXEncoder命名,解码器通常以XXXDdcoder命名

MessageToByteEncoder

对于编码器的顶层抽象类MessageToByteEncoder,像流式处理一样将一个消息转换成一个ByteBuf,encode是需要实现的重要方法:

public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdapter{
}
public class IntegerEncoder extends MessageToByteEncoder
{
@Override public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg); } }

ByteToMessageDecoder

对于解码器的顶层抽象类ByteToMessageDecoder,像流式处理一样将一个ByteBuf转换成需要的其他对象类型,decode是需要实现的重要方法:

public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{
}
public class SquareDecoder extends ByteToMessageDecoder {
@Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception { out.add(in.readBytes(in.readableBytes())); } }

在这里由于tcp传输数据的粘包与拆包,接受到的数据不一定是完整的,因此我们需要判断是否能够处理进而转换成我们需要的类型,需要类似以下的判断,实际上后续有ReplayingDecoder能够更加方便的来处理。

//整形举例if (in.readableBytes() >= 8) {      out.add(in.readLong());}

MessageToMessageDecoder

将消息从一个类型转换成另一个类型,我们可以在ChannelInitailizer中添加多个编码器或者解码器来同时处理网络中接受到到消息(或者写到网络),类似的编解码器都是类似的功能

public class StringToIntegerDecoder extends               MessageToMessageDecoder
{
@Override public void decode(ChannelHandlerContext ctx, String message, List
out) throws Exception { out.add(message.length()); } }

ReplayingDecoder

ReplayingDecoder是ByteToMessageDecoder的一个特殊变种,能够在阻塞I/O的范式中实现非阻塞解码。与ByteToMessageDecoder最大的不同是,ReplayingDecoder允许你直接实现decode或者decodeLast就好像所需要的字节数据已经接受到,而不用去监测是否有足够的字节数据能够用来解码

例如,对于ByteToMessageDecoder解码可能会有如下操作:

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List out) throws Exception { if (buf.readableBytes() < 4) { return; } buf.markReaderIndex(); int length = buf.readInt(); //header if (buf.readableBytes() < length) { //body buf.resetReaderIndex(); return; } out.add(buf.readBytes(length)); } }

如果第一次读取的header,能够在接下来的读取中全部接受则进行解码处理,否则返回到标记处,也就是无法解码,等待接受更多的数据

而对于ReplayingDecoder的解码,可能会更简洁:

public class IntegerHeaderFrameDecoder        extends ReplayingDecoder
{
protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { out.add(buf.readBytes(buf.readInt())); } }

ReplayingDecoder对ByteBuf进行了特殊的实现,当没有足够的数据来解码成特定类型的时候会抛错,在上述方法IntegerHeaderFrameDecoder实现中,当接受到4个或者更多字节的时候能够正常返回整型header,否则将抛错,如果ReplayingDecoder捕获到了异常,将会重新设置readerIndex到初始位置,然后重复调用decode来接受更多的字节数据。

ReplayingDecoder如此简洁也会有一些代价,如:

  • 一些ByteBuf的操作可能被禁止
  • 如果网络很差或者消息格式很复杂,那么会导致性能比较差。因为当接受到的字节数据不够会重复的调用多次decode来获取更多的消息;而不会像ByteToMessageDecoder一样直接返回
  • 可能需要注意decode方法会背调用多次为了解码一条消息
public class MyDecoder extends ReplayingDecoder
{
private final Queue
values = new LinkedList
(); @Override public void decode(.., ByteBuf buf, List
out) throws Exception { // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // This assertion will fail intermittently since values.offer() // can be called more than two times! assert values.size() == 2; out.add(values.poll() + values.poll()); } }

当调用第二次values.offer(buf.readInt());的时候可能没有足够多的字节数据,则会重复调用decode,但是此时queue中可能不止2条数据。

正确的做法应该是以下方式:

public class MyDecoder extends ReplayingDecoder
{
private final Queue
values = new LinkedList
(); @Override public void decode(.., ByteBuf buf, List
out) throws Exception { // Revert the state of the variable that might have been changed // since the last partial decode. values.clear(); // A message contains 2 integers. values.offer(buf.readInt()); values.offer(buf.readInt()); // Now we know this assertion will never fail. assert values.size() == 2; out.add(values.poll() + values.poll()); } }

当数据不够需要重新调用decode的时候,做一次清除queue的操作来保证队列正确的数据

但是我们可以通过调用checkpoint()方法来更新buffer的初始位置,让readerIndex能够重新回到该起始位置。从而避免多次重复的调用decode来对同一个消息多次解码

下面的这些类处理更加复杂的用例:

  • io.netty.handler.codec.LineBasedFrameDecoder— 这个类在 Netty 内部也有使
    用,它使用了行尾控制字符(\n 或者\r\n)来解析消息数据;
  • io.netty.handler.codec.http.HttpObjectDecoder— 一个 HTTP 数据的解码器。

自定义协议编解码来解决粘包与拆包:


关于Netty编解码器的重要结论:

  • 无论是编码器还是解码器,其所接受的消息类型必须要与待处理的参数类型一直,否则该编码器或解码器并不会被执行
  • 在解码器进行数据解码时,一定要记得判断缓冲(ByteBuf)中的数据是否足够,否则将会产生一些问题

同样对于编码器和解码器来说,引用计数也需要特别注意,一旦消息被编码或者解码,它就会被ReferenceCountUtil.release(message)调用自动释放,如果需要保留引用以便稍后使用,那么可以调用referenceCountUtil.retain(message)方法来增加该引用计数,从而防止该消息被释放

转载地址:http://ehrgi.baihongyu.com/

你可能感兴趣的文章
SpringBoot2.0 基础案例(12):基于转账案例,演示事务管理操作
查看>>
高性能负载均衡:nginx搭建tomcat集群
查看>>
Spring切面中的正则表达式
查看>>
一直再说高并发,多少QPS才算高并发?
查看>>
Git恢复之前版本的两种方法reset、revert(图文详解)
查看>>
Maven打包的三种方式
查看>>
电商场景:并发扣库存,怎么保证不超卖又不影响并发性能
查看>>
分布式事务处理方式总结
查看>>
延迟队列有哪些实现方案?说说你的看法
查看>>
厉害了!我们老大半小时把我的springboot项目并发提升几倍
查看>>
Spring 中Bean 的生命周期
查看>>
为什么要用枚举实现单例模式(避免反射、序列化问题)
查看>>
微服务架构下的分布式限流方案思考
查看>>
全网最详细的一篇SpringCloud总结
查看>>
消息中间件中的有序消息,其实是排队但是不能插队
查看>>
mysql为什么使用B+树作为索引的结构
查看>>
mysql索引总结(1)-mysql 索引类型以及创建(文章写的不错!!!)
查看>>
聊聊CAS - 面试官最喜欢问的并发编程专题
查看>>
Spring Boot 中使用一个注解轻松将 List 转换为 Excel 下载
查看>>
高并发环境下,先操作数据库还是先操作缓存?
查看>>