Browse Source

Packet channelPromise handling fixed. #263

master
Nikita 10 years ago
parent
commit
ef66c35288
  1. 17
      src/main/java/com/corundumstudio/socketio/handler/ClientHead.java
  2. 38
      src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
  3. 4
      src/main/java/com/corundumstudio/socketio/messages/OutPacketMessage.java

17
src/main/java/com/corundumstudio/socketio/handler/ClientHead.java

@ -15,13 +15,6 @@
*/
package com.corundumstudio.socketio.handler;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent;
import java.net.SocketAddress;
import java.util.HashMap;
import java.util.Map;
@ -29,8 +22,6 @@ import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@ -53,6 +44,13 @@ import com.corundumstudio.socketio.store.Store;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.transport.NamespaceClient;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.util.AttributeKey;
import io.netty.util.internal.PlatformDependent;
public class ClientHead {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -147,7 +145,6 @@ public class ClientHead {
}
private ChannelFuture sendPackets(Transport transport, Channel channel) {
// TODO promise handling
return channel.writeAndFlush(new OutPacketMessage(this, transport));
}

38
src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java

@ -109,7 +109,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
}
private void write(XHROptionsMessage msg, ChannelHandlerContext ctx) {
private void write(XHROptionsMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
HttpHeaders.addHeader(res, "Set-Cookie", "io=" + msg.getSessionId());
@ -118,16 +118,16 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
addOriginHeaders(ctx.channel(), res);
ByteBuf out = encoder.allocateBuffer(ctx.alloc());
sendMessage(msg, ctx.channel(), out, res);
sendMessage(msg, ctx.channel(), out, res, promise);
}
private void write(XHRPostMessage msg, ChannelHandlerContext ctx) {
private void write(XHRPostMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) {
ByteBuf out = encoder.allocateBuffer(ctx.alloc());
out.writeBytes(OK);
sendMessage(msg, ctx.channel(), out, "text/html");
sendMessage(msg, ctx.channel(), out, "text/html", promise);
}
private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, String type) {
private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, String type, ChannelPromise promise) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
res.headers().add(CONTENT_TYPE, type).add("Set-Cookie", "io=" + msg.getSessionId())
@ -143,10 +143,10 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
res.headers().add("X-XSS-Protection", "0");
}
sendMessage(msg, channel, out, res);
sendMessage(msg, channel, out, res, promise);
}
private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, HttpResponse res) {
private void sendMessage(HttpMessage msg, Channel channel, ByteBuf out, HttpResponse res, ChannelPromise promise) {
channel.write(res);
if (log.isTraceEnabled()) {
@ -159,7 +159,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
out.release();
}
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT).addListener(ChannelFutureListener.CLOSE);
channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT, promise).addListener(ChannelFutureListener.CLOSE);
}
private void addOriginHeaders(Channel channel, HttpResponse res) {
@ -191,23 +191,24 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
if (msg instanceof OutPacketMessage) {
OutPacketMessage m = (OutPacketMessage) msg;
if (m.getTransport() == Transport.WEBSOCKET) {
handleWebsocket((OutPacketMessage) msg, ctx);
handleWebsocket((OutPacketMessage) msg, ctx, promise);
}
if (m.getTransport() == Transport.POLLING) {
handleHTTP((OutPacketMessage) msg, ctx);
handleHTTP((OutPacketMessage) msg, ctx, promise);
}
} else if (msg instanceof XHROptionsMessage) {
write((XHROptionsMessage) msg, ctx);
write((XHROptionsMessage) msg, ctx, promise);
} else if (msg instanceof XHRPostMessage) {
write((XHRPostMessage) msg, ctx);
write((XHRPostMessage) msg, ctx, promise);
}
}
private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx) throws IOException {
private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
while (true) {
Queue<Packet> queue = msg.getClientHead().getPacketsQueue(msg.getTransport());
Packet packet = queue.poll();
if (packet == null) {
promise.setSuccess();
break;
}
@ -218,9 +219,11 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
if (log.isTraceEnabled()) {
log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId());
}
if (out.isReadable()) {
ctx.channel().writeAndFlush(res);
ctx.channel().writeAndFlush(res, promise);
} else {
promise.setSuccess();
out.release();
}
@ -236,13 +239,14 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
}
private void handleHTTP(OutPacketMessage msg, ChannelHandlerContext ctx) throws IOException {
private void handleHTTP(OutPacketMessage msg, ChannelHandlerContext ctx, ChannelPromise promise) throws IOException {
Channel channel = ctx.channel();
Attribute<Boolean> attr = channel.attr(WRITE_ONCE);
Queue<Packet> queue = msg.getClientHead().getPacketsQueue(msg.getTransport());
if (!channel.isActive() || queue.isEmpty() || !attr.compareAndSet(null, true)) {
promise.setSuccess();
return;
}
@ -255,10 +259,10 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
if (jsonpIndex == null) {
type = "text/plain";
}
sendMessage(msg, channel, out, type);
sendMessage(msg, channel, out, type, promise);
} else {
encoder.encodePackets(queue, out, ctx.alloc(), 50);
sendMessage(msg, channel, out, "application/octet-stream");
sendMessage(msg, channel, out, "application/octet-stream", promise);
}
}

4
src/main/java/com/corundumstudio/socketio/messages/OutPacketMessage.java

@ -20,8 +20,8 @@ import com.corundumstudio.socketio.handler.ClientHead;
public class OutPacketMessage extends HttpMessage {
ClientHead clientHead;
Transport transport;
private final ClientHead clientHead;
private final Transport transport;
public OutPacketMessage(ClientHead clientHead, Transport transport) {
super(clientHead.getOrigin(), clientHead.getSessionId());

Loading…
Cancel
Save