netty analyst(2)-possible OOM for write

把netty引入项目后,检索了各种坑或者常见可能错误用法以避免犯错,其中不少文章提到:netty在响应请求端缓慢时,有可能OOM,例如文章http://www.wtoutiao.com/a/2331320.html。
鉴于之前没有深入研究netty的代码,所以不敢冒然使用各种文章提到的解决方案:(1)没有实际测试,所以不是100%肯定会出现;(2)各种文章提及的方案各有利弊,同时涉及到一些参数的设置,不言而喻,一旦有设置的需要,就有设置合理或者不合理的情况。总之,用的好锦上添花,用的不好适得其反。
首先翻阅源码查看到底会不会产生:

一 问题产生的原因:

首先netty的写入过程:可以划分为两个基本步骤:(1)write(2)flush.
(1) write的过程是将“数据请求”添加到ChannelOutboundBuffer,这个buffer是和每个socket具体绑定的。“数据请求”采用链的方式一一相接,在添加时候并无容量控制。

public void addMessage(Object msg, int size, ChannelPromise promise) {
Entry entry = Entry.newInstance(msg, size, total(msg), promise);
if (tailEntry == null) {
flushedEntry = null;
tailEntry = entry;
} else {
Entry tail = tailEntry;
tail.next = entry;
tailEntry = entry;
}
if (unflushedEntry == null) {
unflushedEntry = entry;
}

(2) flush:flush是将ChannelOutboundBuffer中的一批数据请求拿出来消费,即拷入socket的sendbuffer。能写入多少数据,则移除多少“数据请求”, 如果没有写完,并不会移除ChannelOutboundBuffer种的数据。

io.netty.channel.socket.nio.NioSocketChannel.doWrite(ChannelOutboundBuffer)

io.netty.channel.ChannelOutboundBuffer.removeBytes(long)

public void removeBytes(long writtenBytes) {
for (;;) {
Object msg = current();
if (!(msg instanceof ByteBuf)) {
assert writtenBytes == 0;
break;
}

final ByteBuf buf = (ByteBuf) msg;
final int readerIndex = buf.readerIndex();
final int readableBytes = buf.writerIndex() - readerIndex;

if (readableBytes <= writtenBytes) { if (writtenBytes != 0) { progress(readableBytes); writtenBytes -= readableBytes; } remove(); //remove the flush one. } else { // readableBytes > writtenBytes
if (writtenBytes != 0) {
buf.readerIndex(readerIndex + (int) writtenBytes);
progress(writtenBytes);
}
break;
}
}
clearNioBuffers();
}

当对端处理repsonse缓慢时,对端的received buffer会不断减小,此时TCP滑动窗口可发送size会慢慢变小,最终当size变为0时,send buffer会不断增大至full。此时如果继续flush,会出现2种情况:
(1) 如果采用同步方式,则线程阻塞挂起;
(2) 如果采用异步方式,则线程继续前行,写的函数返回0表示没有写入任何数据。
不言而喻,Netty使用的是异步方式。所以IO线程不会阻塞,而线程不阻塞则让write可以继续执行,最终数据积累越来越多以致OOM。

二 问题的重现

编写只写不读客户端,持续发送数据。观察heap变化

Socket socket = new Socket("10.224.2.116", 7676);
while(true){
socket.getOutputStream().write("FORMAT DATA".getBytes());
}

Java VisualVM

Java VisualVM2

Java VisualVM3

三 问题的解决

解决方式的基础:

netty提供了写的高低水位线,实现很简单,当写的请求的size超过高水位线时,设置unwritable,当水位线降低至低水位线时,设置writable.这里有2个地方要注意:
(1)请求的size不是请求的数目,而是请求的内容的size:这点必须如此,因为使用请求的数目,则忽略了请求的内容大小,在请求数目小时,占用的内容随着请求内容的变化而变化。
(2)当水位线降低至高水位线时,不是可写状态,因为很明显,如果仅仅用一个水位线来控制是否可读,则可能导致可写状态不断变化,并没有真正起到控制的作用。

private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
if (size == 0) {
return;
}

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
if (newWriteBufferSize >= channel.config().getWriteBufferHighWaterMark()) {
setUnwritable(invokeLater);
}
}

private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
if (size == 0) {
return;
}

long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
if (notifyWritability && (newWriteBufferSize == 0
|| newWriteBufferSize <= channel.config().getWriteBufferLowWaterMark())) {
setWritable(invokeLater);
}
}
serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 4*1024*1024);
serverBootstrap.childOption(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, 2*1024*1024);

解决方案的选择:

利用读写水位线。可以做两件事情:

(1) 第一是监控水位线的变化

@Override
public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception{
LOGGER.info(format(ctx, "channelWritabilityChanged() to:"+ctx.channel().isWritable()));
super.channelWritabilityChanged(ctx);
}

(2) 第二是控制读写。

控制读写是避免OOM的根本措施,可以采用如下几种方式:

a 直接丢弃,这种方式对于延时要求比较高的服务器比较合适,因为当积累到一定程度,响应时间必然延长以致对方timeout或者已经失去意义;
if(!item.getChannelHandlerContext().channel().isWritable()){
LOGGER.warn(“[tcp][handle][omit]response:”+item.getResponse().toString());
return;
}

b 关闭自动读,这种方式将读关闭,以让请求发送者减缓速度最终至停发。如果双方有自己的keepalive时,必然让双方连接断。这种方式本质是将难题交给了请求发送方;

channel().config().setAutoRead(false)

c 重试直至成功,这种方式等于把线程占用,如果每个连接都出现这种情况,则最终无线程可用。如果是工作线程,则后续发生取决于server自己具体的实现(基本划分2类,丢弃,让主线程即IO线程执行行最终占用IO线程);如果是IO线程,则将影响其他正常连接。
参考http://normanmaurer.me/presentations/2014-twitter-meetup-netty/slides.html#09.0

while(needsToWrite && channel.isWritable()) { channel.writeAndFlush(createMessage()); }

问题的影响

这种情况一般不多见,因为存在三种情况:

(1) 客户端接受的处理速度不会远低于发送请求的速度;
(2) 双方一般都有keepalive, 几秒没有数据写到客户端,双方就会断掉连接;
(3) 服务器不面对百万连接时,对于少数连接很难积累起来以致OOM。

所以考察服务器处理的并发连接,如果仅面临几个连接,可以将读写水位线调大点,但是如果面对N多连接,则相应调整小点。同时也要考虑读写水位线非常大也无必要,因为即使网络恢复或者对端恢复速度,最终处理到的响应是否早已失去意义。

发布者

傅, 健

程序员,Java、C++、开源爱好者. About me

发表评论