Browse Source

"Forced socket disconnection" support added

master
Nikita 14 years ago
parent
commit
edd8f790c2
  1. 12
      src/main/java/com/corundumstudio/socketio/SocketIORouter.java
  2. 11
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  3. 17
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

12
src/main/java/com/corundumstudio/socketio/SocketIORouter.java

@ -39,8 +39,6 @@ public class SocketIORouter {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private int destroyBufferSize;
private int closeTimeoutSecs = 25; private int closeTimeoutSecs = 25;
private int heartbeatThreadPoolSize; private int heartbeatThreadPoolSize;
private int heartbeatTimeout; private int heartbeatTimeout;
@ -70,18 +68,8 @@ public class SocketIORouter {
heartbeatHandler = new HeartbeatHandler(heartbeatThreadPoolSize, heartbeatTimeout, heartbeatInterval); heartbeatHandler = new HeartbeatHandler(heartbeatThreadPoolSize, heartbeatTimeout, heartbeatInterval);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler); PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler);
xhrPollingTransport = new XHRPollingTransport(protocol, decoder, encoder, this, packetListener); xhrPollingTransport = new XHRPollingTransport(protocol, decoder, encoder, this, packetListener);
xhrPollingTransport.setDestroyBufferSize(destroyBufferSize);
} }
/**
* POST request content size limit
*
* @param destroyBufferSize - limit in bytes
*/
public void setDestroyBufferSize(int destroyBufferSize) {
this.destroyBufferSize = destroyBufferSize;
}
/** /**
* Heartbeat interval * Heartbeat interval
* *

11
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -28,7 +28,6 @@ public class SocketIOServer {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private int destroyBufferSize = 10000000;
private int heartbeatThreadPoolSize = 4; private int heartbeatThreadPoolSize = 4;
private int heartbeatTimeout = 15; private int heartbeatTimeout = 15;
private int heartbeatInterval = 20; private int heartbeatInterval = 20;
@ -53,7 +52,6 @@ public class SocketIOServer {
bootstrap = new ServerBootstrap(factory); bootstrap = new ServerBootstrap(factory);
socketIORouter = new SocketIORouter(listener, objectMapper); socketIORouter = new SocketIORouter(listener, objectMapper);
socketIORouter.setDestroyBufferSize(destroyBufferSize);
socketIORouter.setHeartbeatInterval(heartbeatInterval); socketIORouter.setHeartbeatInterval(heartbeatInterval);
socketIORouter.setHeartbeatTimeout(heartbeatTimeout); socketIORouter.setHeartbeatTimeout(heartbeatTimeout);
socketIORouter.setHeartbeatThreadPoolSize(heartbeatThreadPoolSize); socketIORouter.setHeartbeatThreadPoolSize(heartbeatThreadPoolSize);
@ -76,15 +74,6 @@ public class SocketIOServer {
this.bossThreadPoolSize = bossThreadPoolSize; this.bossThreadPoolSize = bossThreadPoolSize;
} }
/**
* POST request content size limit
*
* @param destroyBufferSize - limit in bytes
*/
public void setDestroyBufferSize(int destroyBufferSize) {
this.destroyBufferSize = destroyBufferSize;
}
/** /**
* Heartbeat interval * Heartbeat interval
* *

17
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -49,7 +49,6 @@ public class XHRPollingTransport implements SocketIOTransport {
private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>(); private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>();
private int destroyBufferSize;
private final SocketIORouter socketIORouter; private final SocketIORouter socketIORouter;
private final PacketListener packetListener; private final PacketListener packetListener;
private final Decoder decoder; private final Decoder decoder;
@ -65,10 +64,6 @@ public class XHRPollingTransport implements SocketIOTransport {
this.packetListener = packetListener; this.packetListener = packetListener;
} }
public void setDestroyBufferSize(int destroyBufferSize) {
this.destroyBufferSize = destroyBufferSize;
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object msg = e.getMessage(); Object msg = e.getMessage();
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
@ -85,13 +80,6 @@ public class XHRPollingTransport implements SocketIOTransport {
} }
private void onPost(QueryStringDecoder queryDecoder, Channel channel, HttpRequest msg) { private void onPost(QueryStringDecoder queryDecoder, Channel channel, HttpRequest msg) {
if (msg.getContent().readableBytes() >= destroyBufferSize) {
log.warn("Too big POST request: {} bytes, from ip: {}. Channel closed!",
new Object[] {msg.getContent().readableBytes(), channel.getRemoteAddress()});
channel.close();
return;
}
String path = queryDecoder.getPath(); String path = queryDecoder.getPath();
if (!path.startsWith(pollingPath)) { if (!path.startsWith(pollingPath)) {
log.warn("Wrong POST request path: {}, from ip: {}. Channel closed!", log.warn("Wrong POST request path: {}, from ip: {}. Channel closed!",
@ -151,6 +139,9 @@ public class XHRPollingTransport implements SocketIOTransport {
client = createClient(sessionId); client = createClient(sessionId);
} }
client.doReconnect(channel, msg); client.doReconnect(channel, msg);
if (queryDecoder.getParameters().containsKey("disconnect")) {
disconnect(sessionId);
}
} else { } else {
sendError(channel, msg, sessionId); sendError(channel, msg, sessionId);
} }
@ -181,7 +172,6 @@ public class XHRPollingTransport implements SocketIOTransport {
} }
private void sendHttpResponse(Channel channel, HttpRequest req, HttpResponse res) { private void sendHttpResponse(Channel channel, HttpRequest req, HttpResponse res) {
// Generate an error page if response status code is not OK (200).
if (res.getStatus().getCode() != 200) { if (res.getStatus().getCode() != 200) {
res.setContent( res.setContent(
ChannelBuffers.copiedBuffer( ChannelBuffers.copiedBuffer(
@ -189,7 +179,6 @@ public class XHRPollingTransport implements SocketIOTransport {
HttpHeaders.setContentLength(res, res.getContent().readableBytes()); HttpHeaders.setContentLength(res, res.getContent().readableBytes());
} }
// Send the response and close the connection if necessary.
ChannelFuture f = channel.write(res); ChannelFuture f = channel.write(res);
if (!HttpHeaders.isKeepAlive(req) || res.getStatus().getCode() != 200) { if (!HttpHeaders.isKeepAlive(req) || res.getStatus().getCode() != 200) {
f.addListener(ChannelFutureListener.CLOSE); f.addListener(ChannelFutureListener.CLOSE);

Loading…
Cancel
Save