Browse Source

DecoderBufferSize added, some warns logging added

master
Nikita 14 years ago
parent
commit
57a1f924a9
  1. 1
      README.md
  2. 12
      src/main/java/com/corundumstudio/socketio/SocketIORouter.java
  3. 11
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  4. 32
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

1
README.md

@ -30,6 +30,5 @@ Currently it supports only xhr-polling transport.
SocketIOServer server = new SocketIOServer(); SocketIOServer server = new SocketIOServer();
server.setHostname("localhost"); server.setHostname("localhost");
server.setPort(81); server.setPort(81);
server.setObjectMapper(createObjectMapper());
server.setListener(handler); server.setListener(handler);
server.start(); server.start();

12
src/main/java/com/corundumstudio/socketio/SocketIORouter.java

@ -39,6 +39,8 @@ public class SocketIORouter {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private int destroyBufferSize;
private int closeTimeoutSecs = 25; private int closeTimeoutSecs = 25;
private int heartbeatThreadPoolSize; private int heartbeatThreadPoolSize;
private int heartbeatTimeout; private int heartbeatTimeout;
@ -68,6 +70,16 @@ public class SocketIORouter {
heartbeatHandler = new HeartbeatHandler(heartbeatThreadPoolSize, heartbeatTimeout, heartbeatInterval); heartbeatHandler = new HeartbeatHandler(heartbeatThreadPoolSize, heartbeatTimeout, heartbeatInterval);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler); PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler);
xhrPollingTransport = new XHRPollingTransport(protocol, decoder, encoder, this, packetListener); xhrPollingTransport = new XHRPollingTransport(protocol, decoder, encoder, this, packetListener);
xhrPollingTransport.setDestroyBufferSize(destroyBufferSize);
}
/**
* POST request content size limit
*
* @param destroyBufferSize - limit in bytes
*/
public void setDestroyBufferSize(int destroyBufferSize) {
this.destroyBufferSize = destroyBufferSize;
} }
/** /**

11
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -28,6 +28,7 @@ public class SocketIOServer {
private final Logger log = LoggerFactory.getLogger(getClass()); private final Logger log = LoggerFactory.getLogger(getClass());
private int destroyBufferSize = 10000000;
private int heartbeatThreadPoolSize = 4; private int heartbeatThreadPoolSize = 4;
private int heartbeatTimeout = 15; private int heartbeatTimeout = 15;
private int heartbeatInterval = 20; private int heartbeatInterval = 20;
@ -52,6 +53,7 @@ public class SocketIOServer {
bootstrap = new ServerBootstrap(factory); bootstrap = new ServerBootstrap(factory);
socketIORouter = new SocketIORouter(listener, objectMapper); socketIORouter = new SocketIORouter(listener, objectMapper);
socketIORouter.setDestroyBufferSize(destroyBufferSize);
socketIORouter.setHeartbeatInterval(heartbeatInterval); socketIORouter.setHeartbeatInterval(heartbeatInterval);
socketIORouter.setHeartbeatTimeout(heartbeatTimeout); socketIORouter.setHeartbeatTimeout(heartbeatTimeout);
socketIORouter.setHeartbeatThreadPoolSize(heartbeatThreadPoolSize); socketIORouter.setHeartbeatThreadPoolSize(heartbeatThreadPoolSize);
@ -73,6 +75,15 @@ public class SocketIOServer {
public void setBossThreadPoolSize(int bossThreadPoolSize) { public void setBossThreadPoolSize(int bossThreadPoolSize) {
this.bossThreadPoolSize = bossThreadPoolSize; this.bossThreadPoolSize = bossThreadPoolSize;
} }
/**
* POST request content size limit
*
* @param destroyBufferSize - limit in bytes
*/
public void setDestroyBufferSize(int destroyBufferSize) {
this.destroyBufferSize = destroyBufferSize;
}
/** /**
* Heartbeat interval * Heartbeat interval

32
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -49,6 +49,7 @@ public class XHRPollingTransport implements SocketIOTransport {
private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>(); private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>();
private int destroyBufferSize;
private final SocketIORouter socketIORouter; private final SocketIORouter socketIORouter;
private final PacketListener packetListener; private final PacketListener packetListener;
private final Decoder decoder; private final Decoder decoder;
@ -64,6 +65,10 @@ public class XHRPollingTransport implements SocketIOTransport {
this.packetListener = packetListener; this.packetListener = packetListener;
} }
public void setDestroyBufferSize(int destroyBufferSize) {
this.destroyBufferSize = destroyBufferSize;
}
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
Object msg = e.getMessage(); Object msg = e.getMessage();
if (msg instanceof HttpRequest) { if (msg instanceof HttpRequest) {
@ -80,18 +85,31 @@ public class XHRPollingTransport implements SocketIOTransport {
} }
private void onPost(QueryStringDecoder queryDecoder, Channel channel, HttpRequest msg) throws IOException { private void onPost(QueryStringDecoder queryDecoder, Channel channel, HttpRequest msg) throws IOException {
if (msg.getContent().readableBytes() >= destroyBufferSize) {
log.warn("Too big POST request: {} bytes, from ip: {}. Channel closed!",
new Object[] {msg.getContent().readableBytes(), channel.getRemoteAddress()});
channel.close();
return;
}
String path = queryDecoder.getPath(); String path = queryDecoder.getPath();
if (!path.startsWith(pollingPath)) { if (!path.startsWith(pollingPath)) {
log.warn("Wrong POST request path: {}, from ip: {}. Channel closed!",
new Object[] {path, channel.getRemoteAddress()});
channel.close();
return; return;
} }
String[] parts = path.split("/"); String[] parts = path.split("/");
if (parts.length > 3) { if (parts.length > 3) {
UUID sessionId = UUID.fromString(parts[4]); UUID sessionId = UUID.fromString(parts[4]);
XHRPollingClient client = sessionId2Client.get(sessionId); XHRPollingClient client = sessionId2Client.get(sessionId);
if (client == null) { if (client == null) {
// client was disconnected
log.debug("Client with sessionId: {} was already disconnected. Channel closed!", sessionId);
channel.close();
return; return;
} }
String content = msg.getContent().toString(CharsetUtil.UTF_8); String content = msg.getContent().toString(CharsetUtil.UTF_8);
log.trace("Request content: {}", content); log.trace("Request content: {}", content);
List<Packet> packets = decoder.decodePayload(content); List<Packet> packets = decoder.decodePayload(content);
@ -104,14 +122,22 @@ public class XHRPollingTransport implements SocketIOTransport {
HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK); HttpResponse resp = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
resp.addHeader("Access-Control-Allow-Origin", "*"); resp.addHeader("Access-Control-Allow-Origin", "*");
sendHttpResponse(channel, msg, resp); sendHttpResponse(channel, msg, resp);
} else {
log.warn("Wrong POST request path: {}, from ip: {}. Channel closed!",
new Object[] {path, channel.getRemoteAddress()});
channel.close();
} }
} }
private void onGet(QueryStringDecoder queryDecoder, Channel channel, HttpRequest msg) throws IOException { private void onGet(QueryStringDecoder queryDecoder, Channel channel, HttpRequest msg) throws IOException {
String path = queryDecoder.getPath(); String path = queryDecoder.getPath();
if (!path.startsWith(pollingPath)) { if (!path.startsWith(pollingPath)) {
log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!",
new Object[] {path, channel.getRemoteAddress()});
channel.close();
return; return;
} }
String[] parts = path.split("/"); String[] parts = path.split("/");
if (parts.length > 3) { if (parts.length > 3) {
UUID sessionId = UUID.fromString(parts[4]); UUID sessionId = UUID.fromString(parts[4]);
@ -124,6 +150,10 @@ public class XHRPollingTransport implements SocketIOTransport {
} else { } else {
sendError(channel, msg, sessionId); sendError(channel, msg, sessionId);
} }
} else {
log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!",
new Object[] {path, channel.getRemoteAddress()});
channel.close();
} }
} }

Loading…
Cancel
Save