0%

Java-Netty

Netty是一款用于快速开发高性能的网络应用程序的Java框架,正是因为有 Netty 的存在,网络编程领域 Java 才得以与 C++ 并肩而立。

  • Netty 官网给出了有关 Netty 的整体功能模块结构

  • Core核心层

提供底层网络通信抽象和实现,其中包括了可扩展的时间模型、通信API、支持零拷贝的buf等。

  • Protocol Support 协议支持层

协议支持层基本上覆盖了主流协议的编解码实现,比如HTTP、SSL、WebSocket

  • Transport Service 传输服务层

传输服务层提供了网络传输能力的定义和实现方法,支持 Socket、HTTP 隧道等。Netty 对 TCP、UDP 等数据传输做了抽象和封装

Netty流程

从功能上理解顺序

  • 启动服务 -> 构建连接 -> 接受数据 -> 业务处理 -> 发送数据 -> 断开连接 -> 关闭服务

案例

  • 初始化线程池
  • 初始化channel
  • 绑定端口并启动
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79

@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ctx.write(msg);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}


/**
* Echoes back any received data from a client.
*/
public final class EchoServer {

static final boolean SSL = System.getProperty("ssl") != null;
static final int PORT = Integer.parseInt(System.getProperty("port", "8007"));

public static void main(String[] args) throws Exception {
// Configure SSL.
final SslContext sslCtx;
if (SSL) {
SelfSignedCertificate ssc = new SelfSignedCertificate();
sslCtx = SslContextBuilder.forServer(ssc.certificate(), ssc.privateKey()).build();
} else {
sslCtx = null;
}

// Configure the server.
// 在官方的example中默认是Reactor主从多线程模式
// 1.配置线程池
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // 2.初始化channel类型
.option(ChannelOption.SO_BACKLOG, 100) // 2.1.设置channel参数
.handler(new LoggingHandler(LogLevel.INFO))
// 注册channelhandler,在netty中通过ChannelPipeline 去注册多个 ChannelHandler
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
if (sslCtx != null) {
p.addLast(sslCtx.newHandler(ch.alloc()));
}
//p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});

// Start the server.
// 3.启动 通过bind() 方法会真正触发启动,sync() 方法则会阻塞
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

bind方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private ChannelFuture doBind(final SocketAddress localAddress) {
// 1.调用 initAndRegister()方法初始化Channel,接受返回到ChannelFuture。
final ChannelFuture regFuture = initAndRegister();
// 2.调用 ChannelFuture.channel方法,返回Channel。
final Channel channel = regFuture.channel();
// 3.调用 ChannelFuture.cause方法判断 initAndRegister()是否存在异常。存在异常则直接返回
if (regFuture.cause() != null) {
return regFuture;
}
// 4.调用 ChannelFuture.isDone方法,判断initAndRegister()方法是否执行完成。
// 如果执行完成则调用bind0方法,如果没有执行完成,ChannelFuture 添加一个ChannelFutureListener回调监听,当initAndRegister()方法执行完成后回调operationComplete方法,调用调用 ChannelFuture.cause()方法判断initAndRegister()是否存在异常,无异常在执行bind0方法。
if (regFuture.isDone()) {
// At this point we know that the registration was complete and successful.
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// Registration future is almost always fulfilled already, but just in case it's not.
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
// Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an
// IllegalStateException once we try to access the EventLoop of the Channel.
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
// See https://github.com/netty/netty/issues/2586
promise.registered();

doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}

Channel

Netty网络通信组件、用于执行网络I/O操作。

  • 提供网络相关配置信息

  • Channel参数

    • SO_KEEPALIVE

      • 设置为 true 代表启用了 TCP SO_KEEPALIVE 属性,TCP 会主动探测连接状态,即连接保活
    • SO_BACKLOG

      • 已完成三次握手的请求队列最大长度,同一时刻服务端可能会处理多个连接,在高并发海量连接的场景下,该参数应适当调大
    • TCP_NODELAY

      • Netty 默认是 true,表示立即发送数据。如果设置为 false 表示启用 Nagle 算法,该算法会将 TCP 网络数据包累积到一定量才会发送,虽然可以减少报文发送的数量,但是会造成一定的数据延迟。Netty 为了最小化数据传输的延迟,默认禁用了 Nagle 算法
    • SO_SNDBUF TCP

      • 数据发送缓冲区大小
    • SO_RCVBUF

      • TCP数据接收缓冲区大小,TCP数据接收缓冲区大小
    • SO_LINGER

      • 设置延迟关闭的时间,等待缓冲区中的数据发送完成
    • CONNECT_TIMEOUT_MILLIS

      • 连接超时时间
  • 常用的Channel类型

    • NioSocketChannel,异步的客户端 TCP Socket 连接。

    • NioServerSocketChannel,异步的服务器端 TCP Socket 连接。

    • NioDatagramChannel,异步的 UDP 连接。

    • NioSctpChannel,异步的客户端 Sctp 连接。

    • NioSctpServerChannel,异步的 Sctp 服务器端连接,这些通道涵盖了 UDP 和 TCP 网络 IO 以及文件 IO。

EventLoop

EventLoop并不是Netty独有的,它是一种事件等待和处理的程序模型,可以解决多线程资源消耗高的问题。

  • 在 Netty 中 EventLoop 可以理解为 Reactor 线程模型的事件处理引擎,每个 EventLoop 线程都维护一个 Selector 选择器和任务队列 taskQueue。它主要负责处理 I/O 事件、普通任务和定时任务。Netty 中推荐使用 NioEventLoop 作为实现类。

  • 上图是EventLoop常见模型
    • 每次出现事件的时候,Eventloop会将事件存放在Event Queue事件队列中,通过轮询取出事件执行或者将事件分发给响应的事件监听执行,事件执行的方式通常分为立即执行、延后执行、定期执行几种。
run方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
@Override
protected void run() {
for (;;) {
try {
try {
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;

case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT: // 处理select事件
// 轮训io事件
select(wakenUp.getAndSet(false));

// 'wakenUp.compareAndSet(false, true)' is always evaluated
// before calling 'selector.wakeup()' to reduce the wake-up
// overhead. (Selector.wakeup() is an expensive operation.)
//
// However, there is a race condition in this approach.
// The race condition is triggered when 'wakenUp' is set to
// true too early.
//
// 'wakenUp' is set to true too early if:
// 1) Selector is waken up between 'wakenUp.set(false)' and
// 'selector.select(...)'. (BAD)
// 2) Selector is waken up between 'selector.select(...)' and
// 'if (wakenUp.get()) { ... }'. (OK)
//
// In the first case, 'wakenUp' is set to true and the
// following 'selector.select(...)' will wake up immediately.
// Until 'wakenUp' is set to false again in the next round,
// 'wakenUp.compareAndSet(false, true)' will fail, and therefore
// any attempt to wake up the Selector will fail, too, causing
// the following 'selector.select(...)' call to block
// unnecessarily.
//
// To fix this problem, we wake up the selector again if wakenUp
// is true immediately after selector.select(...).
// It is inefficient in that it wakes up the selector for both
// the first case (BAD - wake-up required) and the second case
// (OK - no wake-up required).

if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
// If we receive an IOException here its because the Selector is messed up. Let's rebuild
// the selector and retry. https://github.com/netty/netty/issues/8566
rebuildSelector0();
handleLoopException(e);
continue;
}

cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys(); // 处理io事件
} finally {
// Ensure we always run tasks.
runAllTasks(); // 处理所有任务
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys(); // 处理io事件
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
EventLoop良好的实践
  • 网络连接建立过程中三次握手、安全认证的过程会消耗不少时间。建议使用Reactor混合模式采用 Boss 和 Worker 两个 EventLoopGroup。
  • Reactor 线程模式适合处理耗时短的任务场景,耗时较长的 ChannelHandler 可以考虑维护一个业务线程池。避免 ChannelHandler 阻塞而造成 EventLoop 不可用。
  • 如果业务逻辑执行时间较短,建议直接在 ChannelHandler 中执行。减少系统复杂度。
  • 尽量少的ChannelHandler,尽量将业务独立出去。

ChannelPipeline

实际编码过程中,业务处理逻辑是 ChannelPipeline 中所定义的 ChannelHandler 完成的,Netty 服务编排层的核心组件 ChannelPipeline 和 ChannelHandler 为用户提供了 I/O 事件管理。

  • 从ChannelPipeline注释上面抓的图

  • channelPipeline采用责任链没事,Inbound入站、outbound出站。

  • 每个 Channel 会绑定一个 ChannelPipeline,每一个 ChannelPipeline 都包含多个ChannelHandlerContext,所有 ChannelHandlerContext 之间组成了双向链表。ChannelPipeline 的双向链表分别维护了 HeadContext 和 TailContext 的头尾节点。自定义的 ChannelHandler 会插入到 Head 和 Tail 之间。
  • ChannelHandlerContext包含了ChannelHandler 生命周期的所有事件。入站的时候是从headContext往tailContext执行,出站的时候tailContext往headContext执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
serverBootstrap.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch){
ChannelPipeline channelPipeline = ch.pipeline();
channelPipeline.addLast("idleHandler", new ServerIdleCheckHandler());
channelPipeline.addLast("frameDecoder",new FrameDecoder());
channelPipeline.addLast("frameEncoder",new FrameEncoder());
channelPipeline.addLast("protocolDecoder",new ProtocolDecoder());
channelPipeline.addLast("protocolEncoder",new ProtocolEncoder());
channelPipeline.addLast("loggingHandler",new LoggingHandler(LogLevel.INFO));
channelPipeline.addLast("protocolProcessHandler",new ServerProtocolProcessHandler());
}
});
  • 运行效果
1
2
3
4
5
6

22:10:57 [work-3-1] FrameDecoder: running
22:10:57 [work-3-1] FrameEncoder: running
22:10:57 [work-3-1] ProtocolDecoder: running
22:10:57 [work-3-1] ServerProtocolProcessHandler: running
22:10:57 [work-3-1] ProtocolEncoder: running
ChannelPipeline异常处理
  • pipleline是责任链模式,推荐在自定义handler末端添加同一的异常处理器。根据异常信息处理逻辑。

ByteBuf

Netty中的数据容器,Java Nio中也提供了ByteBuffer但是使用复杂。

Zero-Copy

NettyZero-copy ,零拷贝是一个耳熟能详的词语,在 Linux、Kafka、RocketMQ 等知名的产品中都有使用,通常用于提升 I/O 性能。

  • Netty 这样的类库,往往对性能有着偏执的追求。为了避免 Java 垃圾收集器不可预测的行为以及额外的性能开销,这些产品一般倾向于使用 JVM 之外的内存来存储和管理数据。这样的数据,就是我们常说的堆外数据(off-heap data)。
  • 使用堆外存储最常用的办法,就是使用 ByteBuffer 这个类来分配直接存储空间(direct buffer)。JVM 虚拟机会尽最大努力直接在直接存储空间上执行 IO 操作,避免数据在本地和 JVM 之间的拷贝。
  • 频繁的内存拷贝是性能的主要障碍之一。所以为了极致的性能,应用程序通常也会尽量避免内存的拷贝。理想的状况下,一份数据只需要一份内存空间,这就是我们常说的零拷贝。

Keepalive

keepalive就是心跳,一个人的心跳证明人还活着,那么在网络通信的双方如何证明对端还活着着,两个服务之间使用心跳来检测对方是否还活着。

Netty中的设计模式

  • 单例 ReadTimeoutException#INSTANCE
  • 工厂 ReflectiveChannelFactory
  • 策略 EventExecutorChooserFactory
  • 装饰器 WrappedByteBuf
  • 模版 AbstractTrafficShapingHandler
  • 责任链 ChannelPipeline ChannelHandler
  • 观察者 ChannelFuture#addListener

扩展

良好的实践

  • 通过修改线程名方便我们在debug或者异常追踪,通过NioEventLoopGroup的构造方法来修改名称
1
2
EventLoopGroup bossGroup = new NioEventLoopGroup(0,new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new NioEventLoopGroup(,new DefaultThreadFactory("work"));
  • 修改Handler名称,如果不设置Handler的名称,会使用默认的类名。添加到pipleline到时候指定名字。
1
2
3
 channelPipeline.addLast("idleHandler", new ServerIdleCheckHandler());
channelPipeline.addLast("frameDecoder",new FrameDecoder());
channelPipeline.addLast("frameEncoder",new FrameEncoder());
  • 日志,这里从Netty自带的LoggingHandler跟踪源码,这里可以看到优先使用的是SLF4J。但是需要手动引入相应的jar依赖。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private static InternalLoggerFactory newDefaultFactory(String name) {
InternalLoggerFactory f;
try {
f = new Slf4JLoggerFactory(true);
f.newInstance(name).debug("Using SLF4J as the default logging framework");
} catch (Throwable ignore1) {
try {
f = Log4JLoggerFactory.INSTANCE;
f.newInstance(name).debug("Using Log4J as the default logging framework");
} catch (Throwable ignore2) {
try {
f = Log4J2LoggerFactory.INSTANCE;
f.newInstance(name).debug("Using Log4J2 as the default logging framework");
} catch (Throwable ignore3) {
f = JdkLoggerFactory.INSTANCE;
f.newInstance(name).debug("Using java.util.logging as the default logging framework");
}
}
}
return f;
}
  • 在不同的平台切换Nio native实现,这里需要注意开发环境和生产环境。在生产环境还需要注意权限问题、并且准好号native的库。
1
2
3
4
5
6
7
// mac 
EventLoopGroup bossGroup = new KQueueEventLoopGroup(0,new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new KQueueEventLoopGroup(0,new DefaultThreadFactory("work"));
// linxu
EventLoopGroup bossGroup = new EpollEventLoopGroup(0,new DefaultThreadFactory("boss"));
EventLoopGroup workerGroup = new EpollEventLoopGroup(0,new DefaultThreadFactory("work"));
// 注意开发环境与生产环境,

为什么要使用netty而不使用Jdk自带的NIO

更好的api、更加稳定、可扩展性

  • 相比nio,netty做的更多,做的更好。
  • 使用jdk nio,我们使用 JDK NIO 编程需要了解很多复杂的概念,比如 Channels、Selectors、Sockets、Buffers 等、还需要解决沾包、半包的问题,还需要做大量的定制化需求。网络本身就很复杂,比如短线重连、心跳。
  • netty解决了大量nio的bug,比如select空转导致cpu 100%,使用netty可以很好的规避大量存在的问题,毕竟jdk也是人写的。
  • netty在扩展性上,在线程模型可以通过参数配置选择线程模型。
  • JDK的NIO默认是水平触发,Netty是边缘触发(默认)和水平触发可以切换。
  • Netty实现的垃圾回收更少、性能更好。

经典的三种I/O模式

BIO(阻塞I/O)、NIO(非阻塞I/O)、AIO(异步I/O)

  • 阻塞于非阻塞
    • 阻塞情况下,没有数据传输过来时,读操作会阻塞一直等待到有数据,缓冲区写满的时候,写做操也会阻塞。非阻塞遇到这种情况都是直接返回。
  • 同步于异步
    • 数据准备就绪需要自己去读取就是同步,数据就绪直接读取好回调给程序就是异步。
netty如何实现三种I/O模式
  • BIO在netty中也叫OIO和AIO一起曾经都支持过。现在Netty仅支持NIO。
nio一定优于bio吗
  • 在连接数少、并发低的场景下,BIO性能并不输NIO。
什么是水平触发、什么是边缘触发

边缘触发相当于高速模式,理论上效率更高,但是复杂度也高,所以现在大多应用(Redis等)还是默认水平触发,如果追求要更好的性能、同时有信心编码好,可以尝试使用边缘触发,例如nginx。边缘触发只支持非阻塞模式。

  • 当被监控的文件有可读写事件发生时,epoll_wait()会通知处理程序去读写,如果这次没有把数据一次性全部读写完的话,水平触发:那么下次调用 epoll_wait()时通知你上次没读写完,如果一直不处理它会一直通知你;边缘触发:下次调用 epoll_wait()的时候不会通知你,也就是只通知一次,知道该文件上出现第二次可读写事件才会通知,效率比水平触发要高

Netty对Reactor的支持

nio在netty中的是由reactor实现的,Reactor是一种开发模式,在netty中Reactor有三种模式。

  • 单线程模型:EventLoopGroup只包含一个EventLoop,Boss和Worker使用同一个EventLoopGroup
  • 多线程模型:EventLoopGroup包含多个EventLoop,Boss和Worker使用同一个EventLoopGroup
  • 主从多线程模型:EventLoopGroup包含多个EventLoop,Boss是主Reactor,Worker是从Reactor,它们分别使用不同的EventLoopGroup,主Reactor负责新的网络连接Channel创建,然后把Channel注册到从Reactor
Reactor单线程模式
  • Reactor单线程模式所有的I/O操作都由一个线程完成。仅需要启动一个NioEventLoopGroup。
1
2
3
EventLoopGroup group = new NioEventLoopGroup(1);
ServerBootstrap b = new ServerBootstrap();
b.group(group)

![image-20211120161222982](/Users/zengqingfeng/Library/Application Support/typora-user-images/image-20211120161222982.png)

  • 一个线程支持的连接数是有限的,性能方面cpu容易跑满。
  • 当多个事件同时触发的时候,容易出现柱塞的情况,会导致消息的积压、客户端请求超时。
  • 线程在处理I/O事件,Select无法处理其他操作,比如建立连接、事件分发等操作。长时间的线程满负载,容易导致服务节点不可用。
非主从Reactor多线程模式
  • 由于Reactor出现的严重的性能问题,因此出现了多线程模型,在创建NioEventLoopGroup的时候可以不指定大小,默认是2 * cpu的线程数量,也可以手动设置固定的线程数。
1
2
3
EventLoopGroup group = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(group)

![image-20211120161554909](/Users/zengqingfeng/Library/Application Support/typora-user-images/image-20211120161554909.png)

主从Reactor多线程模式
  • 现在主流的开发主要采用主从多线程模式,Boos是主Reactor,worker是从Reactor。它们分别使用不同的NioEventLoopGroup,主Reactor负责处理ON_ACCEPT,然后把Channel注册到Reactor上面,从Reactor负责Channel生命周期内的IO事件。
1
2
3
4
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)

![image-20211120161838884](/Users/zengqingfeng/Library/Application Support/typora-user-images/image-20211120161838884.png)

c10k

它的意思是一台机器要维护 1 万个连接,就要创建 1 万个进程或者线程,那么操作系统是无法承受的。

select、poll、epoll

Select Poll Poll
底层实现 数组 链表 哈希表
I/O 每次调用线性遍历、时间复杂度O(n) 每次调用线性遍历、时间复杂度O(n) 时间通知方式、时间复杂度O(1)
最大连接数 1024(x86)、2048(x64) 无上限 无上限

Netty通用模式下NIO实现多路复用器是如何跨平台的

netty在不同平台单独实现了NIO,据说性能更好。windows iocp 、mac kqueue 、linux epool

  • Netty通用模式一般指的是NioEnventLoopGroup、NioEventLoop、NioSelectSocketChannel、NioSocketChannel的使用场景下,实际上针对Linux和mac单独做了NIO的实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 该方法在不同的平台上返回不同的类型。
sun.nio.ch.DefaultSelectorProvider.create()


package sun.nio.ch;
import java.nio.channels.spi.SelectorProvider;

public class DefaultSelectorProvider {
private DefaultSelectorProvider() {
}

public static SelectorProvider create() {
// 在mac/bsd的环境下返回KQueue
return new KQueueSelectorProvider();
}
}

Netty 是如何解决 epoll 空轮询的 Bug

在 JDK 中, Epoll 的实现是存在漏洞的,即使 Selector 轮询的事件列表为空,NIO 线程一样可以被唤醒,导致 CPU 100% 占用。这就是臭名昭著的 JDK epoll 空轮询的 Bug

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 每次执行 Select 操作之前记录当前时间 currentTimeNanos。
long time = System.nanoTime();
// time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos,
// 如果事件轮询的持续时间大于等于 timeoutMillis,那么说明是正常的,否则表明阻塞时间并未达到预期,可能触发了空轮询的 Bug。
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
// 计数变量 selectCnt。在正常情况下,selectCnt 会重置,
// 否则会对 selectCnt 自增计数。当 selectCnt 达到 SELECTOR_AUTO_REBUILD_THRESHOLD(默认512) 阈值时,
// 会触发重建 Selector 对象。
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The code exists in an extra method to ensure the method is not too big to inline as this
// branch is not very likely to get hit very frequently.
selector = selectRebuildSelector(selectCnt);
selectCnt = 1;
break;
}
currentTimeNanos = time;

TCP/IP中粘包、半包

TCP是流式协议,数据是无清晰边界的。拆包/粘包问题的存在,数据接收方很难界定数据包的边界在哪里,很难识别出一个完整的数据包。所以需要提供一种机制来识别数据包的界限,这也是解决拆包/粘包的唯一方法:定义应用层的通信协议。

粘包
  • 数据发送方每次写入数据大小小于套接字缓存区大小。合并发送数据的时候。
半包
  • 发送方时写入数据大小小于套接字缓存区大小。(发送的数据大于TCP协议中MTU最大传输单位)必须拆包。
如何解决

封装成帧(framing)

  • 消息长度固定

    • 每个数据报文都需要一个固定的长度。当接收方累计读取到固定长度的报文后,就认为已经获得一个完整的消息。当发送方的数据小于固定长度时,则需要空位补齐。
  • 特定分隔符

    • 发送报文时尾部需要添加特定分隔符,推荐的做法是将消息进行编码,例如 base64 编码,然后可以选择 64 个编码字符之外的字符作为特定分隔符。Redis 在通信过程中采用的就是换行分隔符 \n \r\n
  • 消息长度 + 消息内容

    • 先解析固定长度的字段获取长度,然后读取后续的内容。长度理论上有限制,需要提前预知可能的最大长度。

Netty 对三种常用封帧方式的支持

Netty中解决粘包半包的编解码叫做一次解码器。

  • 固定长度 FixedLengthFrameDecoder

  • 分隔符 DelimiterBasedFrameDecoder

  • 固定长度字段存个内容的长度信息 LengthFieldBasedFrameDecoder LengthFieldPrepender

    • LengthFieldBasedFrameDecoder按照指定的包长度偏移量对接受的数据进行编码。

    • LengthFieldPrepender 在响应数据的时候从封装数据,在数据前面增加数据的长度。

二次编解码方式

Netty内置编解码器在实际工作中需要二次编解码,比如我们业务数据是json格式,就需要将原始数据转换成用户数据

  • 在netty源码中内置了很多二次解码器java序列化、XML、JSON等等,都在codec包下。

  • 比如使用protobuf二次解码器
1
2
3
4
ch.pipeline().addLast(new ProtobufVarint32FrameDecoder());
ch.pipeline().addLast(new ProtobufDecoder(PersonOuterClass.Person.getDefaultInstance()));
ch.pipeline().addLast(new ProtobufVarint32LengthFieldPrepender());
ch.pipeline().addLast(new ProtobufEncoder());

Linux系统参数

  • 修改linux对单一进程打开的文件句柄数量限制
    • ulimit -n [xxx]尽量不要做临时的设置。

有TCP的keepalive,为什么还需要应用层的keepalive

TCP中的keepalive默认是关闭,因此探测包在传递中可能丢失,默认的超时时间太长,默认是7200+9*75秒,也就是2个多小时。TCP是一个传输层的协议,传输层的数据畅通并不一定操作系统进程所对应的服务畅通。