Browse Source

Authorize and XHR transport bugs fixed

master
Nikita 12 years ago
parent
commit
212e43365c
  1. 23
      src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java
  2. 1
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  3. 34
      src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
  4. 7
      src/main/java/com/corundumstudio/socketio/handler/PacketHandler.java
  5. 25
      src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java
  6. 13
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

23
src/main/java/com/corundumstudio/socketio/SocketIOEncoder.java

@ -34,6 +34,7 @@ import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.CharsetUtil;
@ -87,7 +88,7 @@ public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Me
*/
public boolean tryToWrite(Channel channel) {
Channel prevVal = lastChannel.get();
return !prevVal.equals(channel)
return !channel.equals(prevVal)
&& lastChannel.compareAndSet(prevVal, channel);
}
@ -127,26 +128,32 @@ public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Me
private void sendMessage(String origin, UUID sessionId, Channel channel,
ByteBuf message) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
addHeaders(origin, res);
HttpHeaders.setContentLength(res, message.readableBytes());
HttpResponse res = createHttpResponse(origin, message);
channel.write(res);
if (log.isTraceEnabled()) {
log.trace("Out message: {} - sessionId: {}",
new Object[] { message.toString(CharsetUtil.UTF_8), sessionId });
new Object[] { message.toString(CharsetUtil.UTF_8), sessionId });
}
ChannelFuture f = channel.write(res);
channel.write(message);
ChannelFuture f = channel.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
f.addListener(ChannelFutureListener.CLOSE);
}
private void addHeaders(String origin, HttpResponse res) {
private HttpResponse createHttpResponse(String origin, ByteBuf message) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.OK);
HttpHeaders.addHeader(res, CONTENT_TYPE, "text/plain; charset=UTF-8");
HttpHeaders.addHeader(res, CONNECTION, KEEP_ALIVE);
if (origin != null) {
HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_ORIGIN, origin);
HttpHeaders.addHeader(res, ACCESS_CONTROL_ALLOW_CREDENTIALS, "true");
}
HttpHeaders.setContentLength(res, message.readableBytes());
return res;
}
@Override
@ -176,7 +183,7 @@ public class SocketIOEncoder extends ChannelOutboundHandlerAdapter implements Me
@Override
public void handle(XHROutMessage xhrPostMessage, Channel channel) {
sendMessage(xhrPostMessage.getOrigin(), null, channel, channel.alloc().buffer(0, 0));
sendMessage(xhrPostMessage.getOrigin(), xhrPostMessage.getSessionId(), channel, channel.alloc().buffer(0, 0));
}
@Override

1
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -96,6 +96,7 @@ public class SocketIOServer implements ClientListeners {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
pipelineFactory.start(configCopy, namespacesHub);
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.option(ChannelOption.TCP_NODELAY, true)

34
src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java

@ -21,6 +21,7 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.FullHttpRequest;
@ -55,7 +56,7 @@ import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
import com.corundumstudio.socketio.transport.BaseClient;
@Sharable
public class AuthorizeHandler extends SimpleChannelInboundHandler<FullHttpRequest> implements Disconnectable {
public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Disconnectable {
private final Logger log = LoggerFactory.getLogger(getClass());
@ -75,20 +76,25 @@ public class AuthorizeHandler extends SimpleChannelInboundHandler<FullHttpReques
}
@Override
protected void channelRead0(ChannelHandlerContext ctx, FullHttpRequest msg) throws Exception {
HttpRequest req = (HttpRequest) msg;
Channel channel = ctx.channel();
QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
if (!configuration.isAllowCustomRequests()
&& !queryDecoder.path().startsWith(connectPath)) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
ChannelFuture f = channel.write(res);
f.addListener(ChannelFutureListener.CLOSE);
}
if (queryDecoder.path().equals(connectPath)) {
String origin = req.headers().get(HttpHeaders.Names.ORIGIN);
authorize(channel, origin, queryDecoder.parameters());
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof HttpRequest) {
HttpRequest req = (HttpRequest) msg;
Channel channel = ctx.channel();
QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri());
if (!configuration.isAllowCustomRequests()
&& !queryDecoder.path().startsWith(connectPath)) {
HttpResponse res = new DefaultHttpResponse(HTTP_1_1, HttpResponseStatus.BAD_REQUEST);
ChannelFuture f = channel.write(res);
f.addListener(ChannelFutureListener.CLOSE);
return;
}
if (queryDecoder.path().equals(connectPath)) {
String origin = req.headers().get(HttpHeaders.Names.ORIGIN);
authorize(channel, origin, queryDecoder.parameters());
return;
}
}
ctx.fireChannelRead(msg);
}
private void authorize(Channel channel, String origin, Map<String, List<String>> params)

7
src/main/java/com/corundumstudio/socketio/handler/PacketHandler.java

@ -50,9 +50,8 @@ public class PacketHandler extends SimpleChannelInboundHandler<PacketsMessage> {
}
@Override
protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, PacketsMessage msg)
throws Exception {
PacketsMessage message = (PacketsMessage) msg;
protected void channelRead0(io.netty.channel.ChannelHandlerContext ctx, PacketsMessage message)
throws Exception {
ByteBuf content = message.getContent();
BaseClient client = message.getClient();
@ -75,7 +74,7 @@ public class PacketHandler extends SimpleChannelInboundHandler<PacketsMessage> {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable e) throws Exception {
log.error("Exception occurs", e.getCause());
log.error("Exception occurs", e);
}
}

25
src/main/java/com/corundumstudio/socketio/transport/FlashPolicyHandler.java

@ -19,15 +19,13 @@ import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
import io.netty.channel.ChannelHandler.Sharable;
import com.corundumstudio.socketio.SocketIOPipelineFactory;
@Sharable
public class FlashPolicyHandler extends SimpleChannelInboundHandler<ByteBuf> {
public class FlashPolicyHandler extends ChannelInboundHandlerAdapter {
private final ByteBuf requestBuffer = Unpooled.copiedBuffer("<policy-file-request/>", CharsetUtil.UTF_8);
private final ByteBuf responseBuffer = Unpooled.copiedBuffer(
@ -39,14 +37,17 @@ public class FlashPolicyHandler extends SimpleChannelInboundHandler<ByteBuf> {
+ "</cross-domain-policy>", CharsetUtil.UTF_8);
@Override
protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception {
ByteBuf data = msg.slice(0, requestBuffer.readableBytes());
if (data.equals(requestBuffer)) {
ChannelFuture f = ctx.write(responseBuffer);
f.addListener(ChannelFutureListener.CLOSE);
return;
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
ByteBuf message = (ByteBuf) msg;
ByteBuf data = message.slice(0, requestBuffer.readableBytes());
if (data.equals(requestBuffer)) {
ChannelFuture f = ctx.write(responseBuffer);
f.addListener(ChannelFutureListener.CLOSE);
return;
}
ctx.pipeline().remove(this);
}
ctx.pipeline().remove(SocketIOPipelineFactory.FLASH_POLICY_HANDLER);
ctx.fireChannelRead(msg);
}

13
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -104,7 +104,7 @@ public class XHRPollingTransport extends BaseTransport {
if (queryDecoder.parameters().containsKey("disconnect")) {
BaseClient client = sessionId2Client.get(sessionId);
client.onChannelDisconnect();
ctx.write(new XHROutMessage(origin));
ctx.channel().write(new XHROutMessage(origin, sessionId));
} else if (HttpMethod.POST.equals(req.getMethod())) {
onPost(sessionId, ctx, origin, req.content());
} else if (HttpMethod.GET.equals(req.getMethod())) {
@ -113,7 +113,7 @@ public class XHRPollingTransport extends BaseTransport {
} else {
log.warn("Wrong {} method request path: {}, from ip: {}. Channel closed!",
new Object[] {req.getMethod(), path, ctx.channel().remoteAddress()});
ctx.close();
ctx.channel().close();
}
}
@ -157,12 +157,13 @@ public class XHRPollingTransport extends BaseTransport {
XHRPollingClient client = sessionId2Client.get(sessionId);
if (client == null) {
log.debug("Client with sessionId: {} was already disconnected. Channel closed!", sessionId);
ctx.close();
ctx.channel().close();
return;
}
ctx.write(new XHROutMessage(origin));
ctx.fireChannelRead(new PacketsMessage(client, content));
// release POST response before message processing
ctx.channel().writeAndFlush(new XHROutMessage(origin, sessionId));
ctx.pipeline().fireChannelRead(new PacketsMessage(client, content));
}
private void onGet(UUID sessionId, ChannelHandlerContext ctx, String origin) {
@ -198,7 +199,7 @@ public class XHRPollingTransport extends BaseTransport {
Packet packet = new Packet(PacketType.ERROR);
packet.setReason(ErrorReason.CLIENT_NOT_HANDSHAKEN);
packet.setAdvice(ErrorAdvice.RECONNECT);
ctx.write(new XHRErrorMessage(packet, origin));
ctx.channel().write(new XHRErrorMessage(packet, origin));
}
@Override

Loading…
Cancel
Save