Browse Source

improved websocket write promise handling - issue #402

master
Robert Elek 8 years ago
parent
commit
36867d7f0c
  1. 78
      src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java

78
src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java

@ -19,11 +19,16 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Queue;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import io.netty.channel.*;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,12 +45,7 @@ import com.corundumstudio.socketio.protocol.PacketEncoder;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.DefaultHttpContent;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
@ -223,13 +223,13 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
ChannelFutureList writeFutureList = new ChannelFutureList();
while (true) {
Queue<Packet> queue = msg.getClientHead().getPacketsQueue(msg.getTransport());
Packet packet = queue.poll();
if (packet == null) {
if (!promise.isDone()) {
promise.trySuccess();
}
writeFutureList.setChannnelPromise(promise);
break;
}
@ -242,13 +242,8 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
if (out.isReadable()) {
if (!promise.isDone()) {
ctx.channel().writeAndFlush(res, promise);
} else {
ctx.channel().writeAndFlush(res);
}
writeFutureList.add(ctx.channel().writeAndFlush(res));
} else {
promise.trySuccess();
out.release();
}
@ -259,7 +254,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
if (log.isTraceEnabled()) {
log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId());
}
ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf));
writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf)));
}
}
}
@ -291,4 +286,57 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
}
/**
* Helper class for the handleWebsocket method, handles a list of ChannelFutures and
* sets the status of a promise when
* - any of the operations fail
* - all of the operations succeed
* The setChannelPromise method should be called after all the futures are added
*/
private class ChannelFutureList implements GenericFutureListener<Future<Void>> {
private List<ChannelFuture> futureList = new ArrayList<ChannelFuture>();
private ChannelPromise promise = null;
private void cleanup() {
promise = null;
for (ChannelFuture f : futureList) f.removeListener(this);
}
private void validate() {
boolean allSuccess = true;
for (ChannelFuture f : futureList) {
if (f.isDone()) {
if (!f.isSuccess()) {
promise.tryFailure(f.cause());
cleanup();
return;
}
}
else {
allSuccess = false;
}
}
if (allSuccess) {
promise.trySuccess();
cleanup();
}
}
public void add(ChannelFuture f) {
futureList.add(f);
f.addListener(this);
}
public void setChannnelPromise(ChannelPromise p) {
promise = p;
validate();
}
@Override
public void operationComplete(Future<Void> voidFuture) throws Exception {
if (promise != null) validate();
}
}
}
Loading…
Cancel
Save