本文共 5266 字,大约阅读时间需要 17 分钟。
Netty处理器重要的概念:
对于编码器的顶层抽象类MessageToByteEncoder,像流式处理一样将一个消息转换成一个ByteBuf,encode是需要实现的重要方法:
public abstract class MessageToByteEncoder extends ChannelOutboundHandlerAdapter{ }
public class IntegerEncoder extends MessageToByteEncoder{ @Override public void encode(ChannelHandlerContext ctx, Integer msg, ByteBuf out) throws Exception { out.writeInt(msg); } }
对于解码器的顶层抽象类ByteToMessageDecoder,像流式处理一样将一个ByteBuf转换成需要的其他对象类型,decode是需要实现的重要方法:
public abstract class ByteToMessageDecoder extends ChannelInboundHandlerAdapter{ }
public class SquareDecoder extends ByteToMessageDecoder { @Override public void decode(ChannelHandlerContext ctx, ByteBuf in, List
在这里由于tcp传输数据的粘包与拆包,接受到的数据不一定是完整的,因此我们需要判断是否能够处理进而转换成我们需要的类型,需要类似以下的判断,实际上后续有ReplayingDecoder能够更加方便的来处理。
//整形举例if (in.readableBytes() >= 8) { out.add(in.readLong());}
将消息从一个类型转换成另一个类型,我们可以在ChannelInitailizer中添加多个编码器或者解码器来同时处理网络中接受到到消息(或者写到网络),类似的编解码器都是类似的功能
public class StringToIntegerDecoder extends MessageToMessageDecoder{ @Override public void decode(ChannelHandlerContext ctx, String message, List
ReplayingDecoder是ByteToMessageDecoder的一个特殊变种,能够在阻塞I/O的范式中实现非阻塞解码。与ByteToMessageDecoder最大的不同是,ReplayingDecoder允许你直接实现decode或者decodeLast就好像所需要的字节数据已经接受到,而不用去监测是否有足够的字节数据能够用来解码
例如,对于ByteToMessageDecoder解码可能会有如下操作:public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List
如果第一次读取的header,能够在接下来的读取中全部接受则进行解码处理,否则返回到标记处,也就是无法解码,等待接受更多的数据
而对于ReplayingDecoder的解码,可能会更简洁:
public class IntegerHeaderFrameDecoder extends ReplayingDecoder{ protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception { out.add(buf.readBytes(buf.readInt())); } }
ReplayingDecoder对ByteBuf进行了特殊的实现,当没有足够的数据来解码成特定类型的时候会抛错,在上述方法IntegerHeaderFrameDecoder实现中,当接受到4个或者更多字节的时候能够正常返回整型header,否则将抛错,如果ReplayingDecoder捕获到了异常,将会重新设置readerIndex到初始位置,然后重复调用decode来接受更多的字节数据。
ReplayingDecoder如此简洁也会有一些代价,如:public class MyDecoder extends ReplayingDecoder{ private final Queue values = new LinkedList (); @Override public void decode(.., ByteBuf buf, List
当调用第二次values.offer(buf.readInt());的时候可能没有足够多的字节数据,则会重复调用decode,但是此时queue中可能不止2条数据。
正确的做法应该是以下方式:
public class MyDecoder extends ReplayingDecoder{ private final Queue values = new LinkedList (); @Override public void decode(.., ByteBuf buf, List
当数据不够需要重新调用decode的时候,做一次清除queue的操作来保证队列正确的数据
但是我们可以通过调用checkpoint()方法来更新buffer的初始位置,让readerIndex能够重新回到该起始位置。从而避免多次重复的调用decode来对同一个消息多次解码
下面的这些类处理更加复杂的用例:
自定义协议编解码来解决粘包与拆包:
关于Netty编解码器的重要结论:
同样对于编码器和解码器来说,引用计数也需要特别注意,一旦消息被编码或者解码,它就会被ReferenceCountUtil.release(message)调用自动释放,如果需要保留引用以便稍后使用,那么可以调用referenceCountUtil.retain(message)方法来增加该引用计数,从而防止该消息被释放
转载地址:http://ehrgi.baihongyu.com/