Netty是一款用于快速开发高性能的网络应用程序的Java框架,正是因为有 Netty 的存在,网络编程领域 Java 才得以与 C++ 并肩而立。
- Netty 官网给出了有关 Netty 的整体功能模块结构
- Core核心层
提供底层网络通信抽象和实现,其中包括了可扩展的时间模型、通信API、支持零拷贝的buf等。
- Protocol Support 协议支持层
协议支持层基本上覆盖了主流协议的编解码实现,比如HTTP、SSL、WebSocket
- Transport Service 传输服务层
传输服务层提供了网络传输能力的定义和实现方法,支持 Socket、HTTP 隧道等。Netty 对 TCP、UDP 等数据传输做了抽象和封装
Netty流程
从功能上理解顺序
- 启动服务 -> 构建连接 -> 接受数据 -> 业务处理 -> 发送数据 -> 断开连接 -> 关闭服务
案例
- 初始化线程池
- 初始化channel
- 绑定端口并启动
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}
/**
* Echoes back any received data from a client.
*/
public final class EchoServer {
static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));
public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}
// Configure the server.
// 在官方的example中默认是Reactor主从多线程模式
// 1.配置线程池
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 2.初始化channel类型
.option(ChannelOption.SO_BACKLOG, 100) // 2.1.设置channel参数
.handler(new LoggingHandler(LogLevel.INFO))
// 注册channelhandler,在netty中通过ChannelPipeline 去注册多个 ChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
// 3.启动 通过bind() 方法会真正触发启动,sync() 方法则会阻塞
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
bind方法
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1.调用 initAndRegister()方法初始化Channel,接受返回到ChannelFuture。
final ChannelFuture regFuture = initAndRegister();
// 2.调用 ChannelFuture.channel方法,返回Channel。
final Channel channel = regFuture.channel();
// 3.调用 ChannelFuture.cause方法判断 initAndRegister()是否存在异常。存在异常则直接返回
if (regFuture.cause() != null) {
return regFuture;
}
// 4.调用 ChannelFuture.isDone方法,判断initAndRegister()方法是否执行完成。
// 如果执行完成则调用bind0方法,如果没有执行完成,ChannelFuture 添加一个ChannelFutureListener回调监听,当initAndRegister()方法执行完成后回调operationComplete方法,调用调用 ChannelFuture.cause()方法判断initAndRegister()是否存在异常,无异常在执行bind0方法。
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
Channel
Netty网络通信组件、用于执行网络I/O操作。
提供网络相关配置信息
Channel参数
SO_KEEPALIVE
- 设置为 true 代表启用了 TCP SO_KEEPALIVE 属性,TCP 会主动探测连接状态,即连接保活
SO_BACKLOG
- 已完成三次握手的请求队列最大长度,同一时刻服务端可能会处理多个连接,在高并发海量连接的场景下,该参数应适当调大
TCP_NODELAY
- Netty 默认是 true,表示立即发送数据。如果设置为 false 表示启用 Nagle 算法,该算法会将 TCP 网络数据包累积到一定量才会发送,虽然可以减少报文发送的数量,但是会造成一定的数据延迟。Netty 为了最小化数据传输的延迟,默认禁用了 Nagle 算法
SO_SNDBUF TCP
- 数据发送缓冲区大小
SO_RCVBUF
- TCP数据接收缓冲区大小,TCP数据接收缓冲区大小
SO_LINGER
- 设置延迟关闭的时间,等待缓冲区中的数据发送完成
CONNECT_TIMEOUT_MILLIS
- 连接超时时间
常用的Channel类型
NioSocketChannel,异步的客户端 TCP Socket 连接。
NioServerSocketChannel,异步的服务器端 TCP Socket 连接。
NioDatagramChannel,异步的 UDP 连接。
NioSctpChannel,异步的客户端 Sctp 连接。
NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。
EventLoop
EventLoop并不是Netty独有的,它是一种事件等待和处理的程序模型,可以解决多线程资源消耗高的问题。
- 在 Netty 中 EventLoop 可以理解为 Reactor 线程模型的事件处理引擎,每个 EventLoop 线程都维护一个 Selector 选择器和任务队列 taskQueue。它主要负责处理 I/O 事件、普通任务和定时任务。Netty 中推荐使用 NioEventLoop 作为实现类。
- 上图是EventLoop常见模型
- 每次出现事件的时候,Eventloop会将事件存放在Event Queue事件队列中,通过轮询取出事件执行或者将事件分发给响应的事件监听执行,事件执行的方式通常分为立即执行、延后执行、定期执行几种。
run方法
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: // 处理select事件
// 轮训io事件
select(wakenUp.getAndSet(false));
// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys(); // 处理io事件
} finally {
// Ensure we always run tasks.
runAllTasks(); // 处理所有任务
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys(); // 处理io事件
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
EventLoop良好的实践
- 网络连接建立过程中三次握手、安全认证的过程会消耗不少时间。建议使用Reactor混合模式采用 Boss 和 Worker 两个 EventLoopGroup。
- Reactor 线程模式适合处理耗时短的任务场景,耗时较长的 ChannelHandler 可以考虑维护一个业务线程池。避免 ChannelHandler 阻塞而造成 EventLoop 不可用。
- 如果业务逻辑执行时间较短,建议直接在 ChannelHandler 中执行。减少系统复杂度。
- 尽量少的ChannelHandler,尽量将业务独立出去。
ChannelPipeline
实际编码过程中,业务处理逻辑是 ChannelPipeline 中所定义的 ChannelHandler 完成的,Netty 服务编排层的核心组件 ChannelPipeline 和 ChannelHandler 为用户提供了 I/O 事件管理。
- 从ChannelPipeline注释上面抓的图
- channelPipeline采用责任链没事,Inbound入站、outbound出站。
- 每个 Channel 会绑定一个 ChannelPipeline,每一个 ChannelPipeline 都包含多个ChannelHandlerContext,所有 ChannelHandlerContext 之间组成了双向链表。ChannelPipeline 的双向链表分别维护了 HeadContext 和 TailContext 的头尾节点。自定义的 ChannelHandler 会插入到 Head 和 Tail 之间。
- ChannelHandlerContext包含了ChannelHandler 生命周期的所有事件。入站的时候是从headContext往tailContext执行,出站的时候tailContext往headContext执行。
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch){
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast("idleHandler", new ServerIdleCheckHandler());
channelPipeline.addLast("frameDecoder",new FrameDecoder());
channelPipeline.addLast("frameEncoder",new FrameEncoder());
channelPipeline.addLast("protocolDecoder",new ProtocolDecoder());
channelPipeline.addLast("protocolEncoder",new ProtocolEncoder());
channelPipeline.addLast("loggingHandler",new LoggingHandler(LogLevel.INFO));
channelPipeline.addLast("protocolProcessHandler",new ServerProtocolProcessHandler());
}
});
- 运行效果
22:10:57 [work-3-1] FrameDecoder: running
22:10:57 [work-3-1] FrameEncoder: running
22:10:57 [work-3-1] ProtocolDecoder: running
22:10:57 [work-3-1] ServerProtocolProcessHandler: running
22:10:57 [work-3-1] ProtocolEncoder: running
ChannelPipeline异常处理
- pipleline是责任链模式,推荐在自定义handler末端添加同一的异常处理器。根据异常信息处理逻辑。
ByteBuf
Netty中的数据容器,Java Nio中也提供了ByteBuffer但是使用复杂。
Zero-Copy
Netty 的
Zero-copy
,零拷贝是一个耳熟能详的词语,在 Linux、Kafka、RocketMQ 等知名的产品中都有使用,通常用于提升 I/O 性能。
- Netty 这样的类库,往往对性能有着偏执的追求。为了避免 Java 垃圾收集器不可预测的行为以及额外的性能开销,这些产品一般倾向于使用 JVM 之外的内存来存储和管理数据。这样的数据,就是我们常说的堆外数据(off-heap data)。
- 使用堆外存储最常用的办法,就是使用 ByteBuffer 这个类来分配直接存储空间(direct buffer)。JVM 虚拟机会尽最大努力直接在直接存储空间上执行 IO 操作,避免数据在本地和 JVM 之间的拷贝。
- 频繁的内存拷贝是性能的主要障碍之一。所以为了极致的性能,应用程序通常也会尽量避免内存的拷贝。理想的状况下,一份数据只需要一份内存空间,这就是我们常说的零拷贝。
Keepalive
keepalive就是心跳,一个人的心跳证明人还活着,那么在网络通信的双方如何证明对端还活着着,两个服务之间使用心跳来检测对方是否还活着。
Netty中的设计模式
- 单例 ReadTimeoutException#INSTANCE
- 工厂 ReflectiveChannelFactory
- 策略 EventExecutorChooserFactory
- 装饰器 WrappedByteBuf
- 模版 AbstractTrafficShapingHandler
- 责任链 ChannelPipeline ChannelHandler
- 观察者 ChannelFuture#addListener
扩展
良好的实践
- 通过修改
线程名
方便我们在debug或者异常追踪,通过NioEventLoopGroup的构造方法来修改名称
EventLoopGroup bossGroup = new NioEventLoopGroup(0,new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new NioEventLoopGroup(,new DefaultThreadFactory("work"));
- 修改
Handler
名称,如果不设置Handler的名称,会使用默认的类名。添加到pipleline到时候指定名字。
channelPipeline.addLast("idleHandler", new ServerIdleCheckHandler());
channelPipeline.addLast("frameDecoder",new FrameDecoder());
channelPipeline.addLast("frameEncoder",new FrameEncoder());
- 日志,这里从Netty自带的LoggingHandler跟踪源码,这里可以看到优先使用的是SLF4J。但是需要手动引入相应的jar依赖。
private static InternalLoggerFactory newDefaultFactory(String name) {
InternalLoggerFactory f;
try {
f = new Slf4JLoggerFactory(true);
f.newInstance(name).debug("Using SLF4J as the default logging framework");
} catch (Throwable ignore1) {
try {
f = Log4JLoggerFactory.INSTANCE;
f.newInstance(name).debug("Using Log4J as the default logging framework");
} catch (Throwable ignore2) {
try {
f = Log4J2LoggerFactory.INSTANCE;
f.newInstance(name).debug("Using Log4J2 as the default logging framework");
} catch (Throwable ignore3) {
f = JdkLoggerFactory.INSTANCE;
f.newInstance(name).debug("Using java.util.logging as the default logging framework");
}
}
}
return f;
}
- 在不同的平台切换Nio native实现,这里需要注意开发环境和生产环境。在生产环境还需要注意权限问题、并且准好号native的库。
// mac
EventLoopGroup bossGroup = new KQueueEventLoopGroup(0,new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new KQueueEventLoopGroup(0,new DefaultThreadFactory("work"));
// linxu
EventLoopGroup bossGroup = new EpollEventLoopGroup(0,new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new EpollEventLoopGroup(0,new DefaultThreadFactory("work"));
// 注意开发环境与生产环境,
为什么要使用netty而不使用Jdk自带的NIO
更好的api、更加稳定、可扩展性
- 相比nio,netty做的更多,做的更好。
- 使用jdk nio,我们使用 JDK NIO 编程需要了解很多复杂的概念,比如 Channels、Selectors、Sockets、Buffers 等、还需要解决沾包、半包的问题,还需要做大量的定制化需求。网络本身就很复杂,比如短线重连、心跳。
- netty解决了大量nio的bug,比如select空转导致cpu 100%,使用netty可以很好的规避大量存在的问题,毕竟jdk也是人写的。
- netty在扩展性上,在线程模型可以通过参数配置选择线程模型。
- JDK的NIO默认是水平触发,Netty是边缘触发(默认)和水平触发可以切换。
- Netty实现的垃圾回收更少、性能更好。
经典的三种I/O模式
BIO(阻塞I/O)、NIO(非阻塞I/O)、AIO(异步I/O)
- 阻塞于非阻塞
- 阻塞情况下,没有数据传输过来时,读操作会阻塞一直等待到有数据,缓冲区写满的时候,写做操也会阻塞。非阻塞遇到这种情况都是直接返回。
- 同步于异步
- 数据准备就绪需要自己去读取就是同步,数据就绪直接读取好回调给程序就是异步。
netty如何实现三种I/O模式
- BIO在netty中也叫OIO和AIO一起曾经都支持过。现在Netty仅支持NIO。
nio一定优于bio吗
- 在连接数少、并发低的场景下,BIO性能并不输NIO。
什么是水平触发、什么是边缘触发
边缘触发相当于高速模式,理论上效率更高,但是复杂度也高,所以现在大多应用(Redis等)还是默认水平触发,如果追求要更好的性能、同时有信心编码好,可以尝试使用边缘触发,例如nginx。边缘触发只支持非阻塞模式。
- 当被监控的文件有可读写事件发生时,epoll_wait()会通知处理程序去读写,如果这次没有把数据一次性全部读写完的话,水平触发:那么下次调用 epoll_wait()时通知你上次没读写完,如果一直不处理它会一直通知你;边缘触发:下次调用 epoll_wait()的时候不会通知你,也就是只通知一次,知道该文件上出现第二次可读写事件才会通知,效率比水平触发要高
Netty对Reactor的支持
nio在netty中的是由reactor实现的,Reactor是一种开发模式,在netty中Reactor有三种模式。
- 单线程模型:EventLoopGroup只包含一个EventLoop,Boss和Worker使用同一个EventLoopGroup
- 多线程模型:EventLoopGroup包含多个EventLoop,Boss和Worker使用同一个EventLoopGroup
- 主从多线程模型:EventLoopGroup包含多个EventLoop,Boss是主Reactor,Worker是从Reactor,它们分别使用不同的EventLoopGroup,主Reactor负责新的网络连接Channel创建,然后把Channel注册到从Reactor
Reactor单线程模式
- Reactor单线程模式所有的I/O操作都由一个线程完成。仅需要启动一个NioEventLoopGroup。
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(group)

- 一个线程支持的连接数是有限的,性能方面cpu容易跑满。
- 当多个事件同时触发的时候,容易出现柱塞的情况,会导致消息的积压、客户端请求超时。
- 线程在处理I/O事件,Select无法处理其他操作,比如建立连接、事件分发等操作。长时间的线程满负载,容易导致服务节点不可用。
非主从Reactor多线程模式
- 由于Reactor出现的严重的性能问题,因此出现了多线程模型,在创建NioEventLoopGroup的时候可以不指定大小,默认是2 * cpu的线程数量,也可以手动设置固定的线程数。
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group)

主从Reactor多线程模式
- 现在主流的开发主要采用主从多线程模式,Boos是主Reactor,worker是从Reactor。它们分别使用不同的NioEventLoopGroup,主Reactor负责处理ON_ACCEPT,然后把Channel注册到Reactor上面,从Reactor负责Channel生命周期内的IO事件。
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

c10k
它的意思是一台机器要维护 1 万个连接,就要创建 1 万个进程或者线程,那么操作系统是无法承受的。
select、poll、epoll
Select | Poll | Poll | |
---|---|---|---|
底层实现 | 数组 | 链表 | 哈希表 |
I/O | 每次调用线性遍历、时间复杂度O(n) | 每次调用线性遍历、时间复杂度O(n) | 时间通知方式、时间复杂度O(1) |
最大连接数 | 1024(x86)、2048(x64) | 无上限 | 无上限 |
Netty通用模式下NIO实现多路复用器是如何跨平台的
netty在不同平台单独实现了NIO,据说性能更好。windows iocp 、mac kqueue 、linux epool
- Netty通用模式一般指的是NioEnventLoopGroup、NioEventLoop、NioSelectSocketChannel、NioSocketChannel的使用场景下,实际上针对Linux和mac单独做了NIO的实现。
// 该方法在不同的平台上返回不同的类型。
sun.nio.ch.DefaultSelectorProvider.create()
package sun.nio.ch;
import java.nio.channels.spi.SelectorProvider;
public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}
public static SelectorProvider create() {
// 在mac/bsd的环境下返回KQueue
return new KQueueSelectorProvider();
}
}
Netty 是如何解决 epoll 空轮询的 Bug
在 JDK 中, Epoll 的实现是存在漏洞的,即使 Selector 轮询的事件列表为空,NIO 线程一样可以被唤醒,导致 CPU 100% 占用。这就是臭名昭著的 JDK epoll 空轮询的 Bug
// 每次执行 Select 操作之前记录当前时间 currentTimeNanos。
long time = System.nanoTime();
// time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,
// 如果事件轮询的持续时间大于等于 timeoutMillis,那么说明是正常的,否则表明阻塞时间并未达到预期,可能触发了空轮询的 Bug。
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 计数变量 selectCnt。在正常情况下,selectCnt 会重置,
// 否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 阈值时,
// 会触发重建 Selector 对象。
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;
TCP/IP中粘包、半包
TCP是流式协议,数据是无清晰边界的。拆包/粘包问题的存在,数据接收方很难界定数据包的边界在哪里,很难识别出一个完整的数据包。所以需要提供一种机制来识别数据包的界限,这也是解决拆包/粘包的唯一方法:定义应用层的通信协议。
粘包
- 数据发送方每次写入数据大小小于套接字缓存区大小。合并发送数据的时候。
半包
- 发送方时写入数据大小小于套接字缓存区大小。(发送的数据大于TCP协议中MTU最大传输单位)必须拆包。
如何解决
封装成帧(framing)
消息长度固定
- 每个数据报文都需要一个固定的长度。当接收方累计读取到固定长度的报文后,就认为已经获得一个完整的消息。当发送方的数据小于固定长度时,则需要空位补齐。
特定分隔符
- 发送报文时尾部需要添加特定分隔符,推荐的做法是将消息进行编码,例如 base64 编码,然后可以选择 64 个编码字符之外的字符作为特定分隔符。Redis 在通信过程中采用的就是换行分隔符
\n
、\r\n
。
- 发送报文时尾部需要添加特定分隔符,推荐的做法是将消息进行编码,例如 base64 编码,然后可以选择 64 个编码字符之外的字符作为特定分隔符。Redis 在通信过程中采用的就是换行分隔符
消息长度 + 消息内容
- 先解析固定长度的字段获取长度,然后读取后续的内容。长度理论上有限制,需要提前预知可能的最大长度。
Netty 对三种常用封帧方式的支持
Netty中解决粘包半包的编解码叫做一次解码器。
固定长度
FixedLengthFrameDecoder
分隔符
DelimiterBasedFrameDecoder
固定长度字段存个内容的长度信息
LengthFieldBasedFrameDecoder
LengthFieldPrepender
- LengthFieldBasedFrameDecoder按照指定的包长度偏移量对接受的数据进行编码。
- LengthFieldPrepender 在响应数据的时候从封装数据,在数据前面增加数据的长度。
二次编解码方式
Netty内置编解码器在实际工作中需要二次编解码,比如我们业务数据是json格式,就需要将原始数据转换成用户数据
- 在netty源码中内置了很多二次解码器java序列化、XML、JSON等等,都在codec包下。
- 比如使用protobuf二次解码器
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(PersonOuterClass.Person.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());
Linux系统参数
- 修改linux对单一进程打开的文件句柄数量限制
ulimit -n [xxx]
尽量不要做临时的设置。
有TCP的keepalive,为什么还需要应用层的keepalive
TCP中的keepalive默认是关闭,因此探测包在传递中可能丢失,默认的超时时间太长,默认是7200+9*75秒,也就是2个多小时。TCP是一个传输层的协议,传输层的数据畅通并不一定操作系统进程所对应的服务畅通。