From 36867d7f0c624ea07c4d6de65336e14b67891c96 Mon Sep 17 00:00:00 2001 From: Robert Elek Date: Fri, 11 Aug 2017 11:21:59 +0700 Subject: [PATCH 1/2] improved websocket write promise handling - issue #402 --- .../socketio/handler/EncoderHandler.java | 78 +++++++++++++++---- 1 file changed, 63 insertions(+), 15 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java index 3f124cd..9d028ff 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java @@ -19,11 +19,16 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; import java.io.IOException; import java.net.URL; +import java.util.ArrayList; import java.util.Enumeration; +import java.util.List; import java.util.Queue; import java.util.jar.Attributes; import java.util.jar.Manifest; +import io.netty.channel.*; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,12 +45,7 @@ import com.corundumstudio.socketio.protocol.PacketEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.ByteBufUtil; -import io.netty.channel.Channel; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; -import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.ChannelOutboundHandlerAdapter; -import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; @@ -223,13 +223,13 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter { } private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException { + ChannelFutureList writeFutureList = new ChannelFutureList(); + while (true) { Queue queue = msg.getClientHead().getPacketsQueue(msg.getTransport()); Packet packet = queue.poll(); if (packet == null) { - if (!promise.isDone()) { - promise.trySuccess(); - } + writeFutureList.setChannnelPromise(promise); break; } @@ -242,13 +242,8 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter { } if (out.isReadable()) { - if (!promise.isDone()) { - ctx.channel().writeAndFlush(res, promise); - } else { - ctx.channel().writeAndFlush(res); - } + writeFutureList.add(ctx.channel().writeAndFlush(res)); } else { - promise.trySuccess(); out.release(); } @@ -259,7 +254,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter { if (log.isTraceEnabled()) { log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId()); } - ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf)); + writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf))); } } } @@ -291,4 +286,57 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter { } } + /** + * Helper class for the handleWebsocket method, handles a list of ChannelFutures and + * sets the status of a promise when + * - any of the operations fail + * - all of the operations succeed + * The setChannelPromise method should be called after all the futures are added + */ + private class ChannelFutureList implements GenericFutureListener> { + + private List futureList = new ArrayList(); + private ChannelPromise promise = null; + + private void cleanup() { + promise = null; + for (ChannelFuture f : futureList) f.removeListener(this); + } + + private void validate() { + boolean allSuccess = true; + for (ChannelFuture f : futureList) { + if (f.isDone()) { + if (!f.isSuccess()) { + promise.tryFailure(f.cause()); + cleanup(); + return; + } + } + else { + allSuccess = false; + } + } + if (allSuccess) { + promise.trySuccess(); + cleanup(); + } + } + + public void add(ChannelFuture f) { + futureList.add(f); + f.addListener(this); + } + + public void setChannnelPromise(ChannelPromise p) { + promise = p; + validate(); + } + + @Override + public void operationComplete(Future voidFuture) throws Exception { + if (promise != null) validate(); + } + } + } From 66d769c4627a2fae098169b3fb8aaa2113be705b Mon Sep 17 00:00:00 2001 From: Robert Elek Date: Fri, 11 Aug 2017 11:31:38 +0700 Subject: [PATCH 2/2] stylistic fixes, method typo, import grouping --- .../socketio/handler/EncoderHandler.java | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java index 9d028ff..ec4d8d7 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java @@ -26,7 +26,6 @@ import java.util.Queue; import java.util.jar.Attributes; import java.util.jar.Manifest; -import io.netty.channel.*; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.GenericFutureListener; import org.slf4j.Logger; @@ -45,7 +44,13 @@ import com.corundumstudio.socketio.protocol.PacketEncoder; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.buffer.ByteBufUtil; +import io.netty.channel.Channel; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler.Sharable; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; import io.netty.handler.codec.http.DefaultHttpContent; import io.netty.handler.codec.http.DefaultHttpResponse; import io.netty.handler.codec.http.HttpHeaderNames; @@ -229,7 +234,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter { Queue queue = msg.getClientHead().getPacketsQueue(msg.getTransport()); Packet packet = queue.poll(); if (packet == null) { - writeFutureList.setChannnelPromise(promise); + writeFutureList.setChannelPromise(promise); break; } @@ -328,7 +333,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter { f.addListener(this); } - public void setChannnelPromise(ChannelPromise p) { + public void setChannelPromise(ChannelPromise p) { promise = p; validate(); }