Browse Source

Merge pull request #472 from robymus/master

WebSocket encoder, improved promise handling
master
Nikita Koksharov 8 years ago
committed by GitHub
parent
commit
78fbe63dd2
  1. 73
      src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java

73
src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java

@ -19,11 +19,15 @@ import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.Queue;
import java.util.jar.Attributes;
import java.util.jar.Manifest;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -41,6 +45,7 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
@ -223,13 +228,13 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
ChannelFutureList writeFutureList = new ChannelFutureList();
while (true) {
Queue<Packet> queue = msg.getClientHead().getPacketsQueue(msg.getTransport());
Packet packet = queue.poll();
if (packet == null) {
if (!promise.isDone()) {
promise.trySuccess();
}
writeFutureList.setChannelPromise(promise);
break;
}
@ -242,13 +247,8 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
if (out.isReadable()) {
if (!promise.isDone()) {
ctx.channel().writeAndFlush(res, promise);
} else {
ctx.channel().writeAndFlush(res);
}
writeFutureList.add(ctx.channel().writeAndFlush(res));
} else {
promise.trySuccess();
out.release();
}
@ -259,7 +259,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
if (log.isTraceEnabled()) {
log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId());
}
ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf));
writeFutureList.add(ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf)));
}
}
}
@ -291,4 +291,57 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
}
/**
* Helper class for the handleWebsocket method, handles a list of ChannelFutures and
* sets the status of a promise when
* - any of the operations fail
* - all of the operations succeed
* The setChannelPromise method should be called after all the futures are added
*/
private class ChannelFutureList implements GenericFutureListener<Future<Void>> {
private List<ChannelFuture> futureList = new ArrayList<ChannelFuture>();
private ChannelPromise promise = null;
private void cleanup() {
promise = null;
for (ChannelFuture f : futureList) f.removeListener(this);
}
private void validate() {
boolean allSuccess = true;
for (ChannelFuture f : futureList) {
if (f.isDone()) {
if (!f.isSuccess()) {
promise.tryFailure(f.cause());
cleanup();
return;
}
}
else {
allSuccess = false;
}
}
if (allSuccess) {
promise.trySuccess();
cleanup();
}
}
public void add(ChannelFuture f) {
futureList.add(f);
f.addListener(this);
}
public void setChannelPromise(ChannelPromise p) {
promise = p;
validate();
}
@Override
public void operationComplete(Future<Void> voidFuture) throws Exception {
if (promise != null) validate();
}
}
}
Loading…
Cancel
Save