Browse Source

Several bug fixed

master
Nikita 13 years ago
parent
commit
ace0c1c803
  1. 8
      src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java
  2. 7
      src/main/java/com/corundumstudio/socketio/SocketIORouter.java
  3. 18
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  4. 6
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

8
src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java

@ -33,15 +33,19 @@ public class HeartbeatHandler {
private final Logger log = LoggerFactory.getLogger(getClass());
// 'heartbeatDiff' needed to send heartbeat reply before client timeout occures
// (because client heartbeat == server heartbeat,)
private final int heartbeatIntervalDiffSecs;
private final int heartbeatIntervalSecs;
private final int heartbeatTimeoutSecs;
private final ScheduledExecutorService executorService;
private final Map<UUID, Future<?>> scheduledHeartbeatFutures = new ConcurrentHashMap<UUID, Future<?>>();
public HeartbeatHandler(int threadPoolSize, int heartbeatTimeoutSecs, int heartbeatIntervalSecs) {
public HeartbeatHandler(int threadPoolSize, int heartbeatTimeoutSecs, int heartbeatIntervalSecs, int heartbeatIntervalDiffSecs) {
this.executorService = Executors.newScheduledThreadPool(threadPoolSize);
this.heartbeatIntervalSecs = heartbeatIntervalSecs;
this.heartbeatIntervalDiffSecs = heartbeatIntervalDiffSecs;
this.heartbeatTimeoutSecs = heartbeatTimeoutSecs;
}
@ -52,7 +56,7 @@ public class HeartbeatHandler {
public void run() {
sendHeartbeat(client);
}
}, heartbeatIntervalSecs, TimeUnit.SECONDS);
}, heartbeatIntervalSecs-heartbeatIntervalDiffSecs, TimeUnit.SECONDS);
}
public void cancelHeartbeatCheck(SocketIOClient client) {

7
src/main/java/com/corundumstudio/socketio/SocketIORouter.java

@ -48,6 +48,7 @@ public class SocketIORouter {
private int heartbeatThreadPoolSize;
private int heartbeatTimeout;
private int heartbeatInterval;
private int heartbeatIntervalDiff;
private final int protocol = 1;
private final String connectPath = "/socket.io/" + protocol + "/";
@ -70,11 +71,15 @@ public class SocketIORouter {
}
public void start() {
heartbeatHandler = new HeartbeatHandler(heartbeatThreadPoolSize, heartbeatTimeout, heartbeatInterval);
heartbeatHandler = new HeartbeatHandler(heartbeatThreadPoolSize, heartbeatTimeout, heartbeatInterval, heartbeatIntervalDiff);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler);
xhrPollingTransport = new XHRPollingTransport(protocol, decoder, encoder, this, packetListener);
}
public void setHeartbeatIntervalDiff(int heartbeatIntervalDiff) {
this.heartbeatIntervalDiff = heartbeatIntervalDiff;
}
/**
* Heartbeat interval
*

18
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -19,14 +19,12 @@ import java.net.InetSocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.annotate.JsonSerialize;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.logging.InternalLoggerFactory;
import org.jboss.netty.logging.Slf4JLoggerFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,6 +35,7 @@ public class SocketIOServer {
private int heartbeatThreadPoolSize = 4;
private int heartbeatTimeout = 60;
private int heartbeatInterval = 25;
private int heartbeatIntervalDiff = 5;
private int bossThreadPoolSize = 8;
private int workerThreadPoolSize = 16;
@ -54,8 +53,6 @@ public class SocketIOServer {
}
public void start() {
InternalLoggerFactory.setDefaultFactory(new Slf4JLoggerFactory());
Executor bossExecutor = Executors.newFixedThreadPool(bossThreadPoolSize);
Executor workerExecutor = Executors.newFixedThreadPool(workerThreadPoolSize);
ChannelFactory factory = new NioServerSocketChannelFactory(bossExecutor, workerExecutor);
@ -65,6 +62,7 @@ public class SocketIOServer {
socketIORouter.setHeartbeatInterval(heartbeatInterval);
socketIORouter.setHeartbeatTimeout(heartbeatTimeout);
socketIORouter.setHeartbeatThreadPoolSize(heartbeatThreadPoolSize);
socketIORouter.setHeartbeatIntervalDiff(heartbeatIntervalDiff);
socketIORouter.start();
SocketIOUpstreamHandler upstreamHandler = new SocketIOUpstreamHandler(socketIORouter);
@ -84,6 +82,16 @@ public class SocketIOServer {
this.bossThreadPoolSize = bossThreadPoolSize;
}
/**
* Heartbeat interval difference, because server should send response a little bit earlier
*
* @param value
* - time in seconds
*/
public void setHeartbeatIntervalDiff(int heartbeatIntervalDiff) {
this.heartbeatIntervalDiff = heartbeatIntervalDiff;
}
/**
* Heartbeat interval
*

6
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -195,8 +195,10 @@ public class XHRPollingTransport implements SocketIOTransport {
public void disconnect(UUID sessionId) {
XHRPollingClient client = sessionId2Client.remove(sessionId);
client.send(new Packet(PacketType.DISCONNECT));
socketIORouter.disconnect(client);
if (client != null) {
client.send(new Packet(PacketType.DISCONNECT));
socketIORouter.disconnect(client);
}
}
}
Loading…
Cancel
Save