diff --git a/src/main/java/com/corundumstudio/socketio/AckCallback.java b/src/main/java/com/corundumstudio/socketio/AckCallback.java
new file mode 100644
index 0000000..df9fb63
--- /dev/null
+++ b/src/main/java/com/corundumstudio/socketio/AckCallback.java
@@ -0,0 +1,48 @@
+/**
+ * Copyright 2012 Nikita Koksharov
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.corundumstudio.socketio;
+
+public abstract class AckCallback {
+
+ private int timeout = -1;
+
+ public AckCallback() {
+ }
+
+ /**
+ * Creates AckCallback
+ *
+ * @param timeout - callback timeout in seconds
+ */
+ public AckCallback(int timeout) {
+ this.timeout = timeout;
+ }
+
+ public int getTimeout() {
+ return timeout;
+ }
+
+ public abstract void onSuccess();
+
+ /**
+ * Invoked only once then timeout
defined
+ *
+ */
+ public void onTimeout() {
+
+ }
+
+}
diff --git a/src/main/java/com/corundumstudio/socketio/AckManager.java b/src/main/java/com/corundumstudio/socketio/AckManager.java
index e9516db..696ee52 100644
--- a/src/main/java/com/corundumstudio/socketio/AckManager.java
+++ b/src/main/java/com/corundumstudio/socketio/AckManager.java
@@ -21,28 +21,48 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import com.corundumstudio.socketio.parser.Packet;
+import com.corundumstudio.socketio.scheduler.CancelableScheduler;
+import com.corundumstudio.socketio.scheduler.SchedulerKey;
+import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
public class AckManager implements Disconnectable {
private final AtomicLong ackIndex = new AtomicLong();
- private final Map ackCallbacks = new ConcurrentHashMap();
+ private final Map ackCallbacks = new ConcurrentHashMap();
private final ConcurrentMap> clientCallbackIds = new ConcurrentHashMap>();
+ private final CancelableScheduler scheduler;
+
+ public AckManager(CancelableScheduler scheduler) {
+ super();
+ this.scheduler = scheduler;
+ }
+
public void onAck(SocketIOClient client, Packet packet) {
- Runnable callback = ackCallbacks.remove(packet.getAckId());
+ SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, client.getSessionId());
+ scheduler.cancel(key);
+ AckCallback callback = removeCallback(client.getSessionId(), packet.getAckId());
+ if (callback != null) {
+ callback.onSuccess();
+ }
+ }
+
+ private AckCallback removeCallback(UUID sessionId, long index) {
+ AckCallback callback = ackCallbacks.remove(index);
if (callback != null) {
- Set callbackIds = clientCallbackIds.get(client.getSessionId());
+ Set callbackIds = clientCallbackIds.get(sessionId);
if (callbackIds != null) {
- callbackIds.remove(packet.getAckId());
+ callbackIds.remove(index);
}
- callback.run();
}
+ return callback;
}
- public long registerAck(UUID sessionId, Runnable callback) {
+ public long registerAck(UUID sessionId, final AckCallback callback) {
Set callbackIds = clientCallbackIds.get(sessionId);
if (callbackIds == null) {
callbackIds = Collections.newSetFromMap(new ConcurrentHashMap());
@@ -54,9 +74,26 @@ public class AckManager implements Disconnectable {
long index = ackIndex.incrementAndGet();
callbackIds.add(index);
ackCallbacks.put(index, callback);
+
+ scheduleTimeout(index, sessionId, callback);
+
return index;
}
+ private void scheduleTimeout(final long index, final UUID sessionId, final AckCallback callback) {
+ if (callback.getTimeout() == -1) {
+ return;
+ }
+ SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, sessionId);
+ scheduler.schedule(key, new Runnable() {
+ @Override
+ public void run() {
+ removeCallback(sessionId, index);
+ callback.onTimeout();
+ }
+ }, callback.getTimeout(), TimeUnit.SECONDS);
+ }
+
@Override
public void onDisconnect(SocketIOClient client) {
Set callbackIds = clientCallbackIds.remove(client.getSessionId());
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java
index c23af5d..f4259db 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java
@@ -42,7 +42,7 @@ public interface SocketIOClient {
* @param message - message to send
* @param ackCallback - ack callback
*/
- void sendMessage(String message, Runnable ackCallback);
+ void sendMessage(String message, AckCallback ackCallback);
/**
* Send object. Object will be encoded to json-format.
@@ -57,7 +57,7 @@ public interface SocketIOClient {
* @param object - object to send
* @param ackCallback - ack callback
*/
- void sendJsonObject(Object object, Runnable ackCallback);
+ void sendJsonObject(Object object, AckCallback ackCallback);
/**
* Send packet
@@ -72,7 +72,7 @@ public interface SocketIOClient {
* @param packet - packet to send
* @param ackCallback - ack callback
*/
- void send(Packet packet, Runnable ackCallback);
+ void send(Packet packet, AckCallback ackCallback);
/**
* Disconnect client
@@ -95,7 +95,7 @@ public interface SocketIOClient {
* @param data - event data
* @param ackCallback - ack callback
*/
- void sendEvent(String name, Object data, Runnable ackCallback);
+ void sendEvent(String name, Object data, AckCallback ackCallback);
SocketAddress getRemoteAddress();
diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
index a7ab2ba..f7ec03c 100644
--- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
+++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
@@ -47,7 +47,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
private final int protocol = 1;
- private AckManager ackManager = new AckManager();
+ private AckManager ackManager;
private AuthorizeHandler authorizeHandler;
private XHRPollingTransport xhrPollingTransport;
@@ -68,6 +68,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
Encoder encoder = new Encoder(objectMapper);
Decoder decoder = new Decoder(objectMapper);
+ ackManager = new AckManager(scheduler);
heartbeatHandler = new HeartbeatHandler(configuration, scheduler);
PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler, ackManager);
diff --git a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
index bd07582..ed4a4ba 100644
--- a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
+++ b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java
@@ -19,7 +19,7 @@ import java.util.UUID;
public class SchedulerKey {
- public enum Type {NOOP, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE};
+ public enum Type {POLLING, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE, ACK_TIMEOUT};
private Type type;
private UUID sessionId;
diff --git a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java
index b61cbc7..faccfb2 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java
@@ -21,6 +21,7 @@ import java.util.UUID;
import org.jboss.netty.channel.Channel;
+import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.AckManager;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.parser.Packet;
@@ -51,7 +52,7 @@ abstract class BaseClient implements SocketIOClient {
}
@Override
- public void sendEvent(String name, Object data, Runnable ackCallback) {
+ public void sendEvent(String name, Object data, AckCallback ackCallback) {
Packet packet = new Packet(PacketType.EVENT);
packet.setName(name);
packet.setArgs(Collections.singletonList(data));
@@ -59,7 +60,7 @@ abstract class BaseClient implements SocketIOClient {
}
@Override
- public void sendMessage(String message, Runnable ackCallback) {
+ public void sendMessage(String message, AckCallback ackCallback) {
Packet packet = new Packet(PacketType.MESSAGE);
packet.setData(message);
send(packet, ackCallback);
@@ -85,14 +86,14 @@ abstract class BaseClient implements SocketIOClient {
}
@Override
- public void send(Packet packet, Runnable ackCallback) {
+ public void send(Packet packet, AckCallback ackCallback) {
long index = ackManager.registerAck(sessionId, ackCallback);
packet.setId(index);
send(packet);
}
@Override
- public void sendJsonObject(Object object, Runnable ackCallback) {
+ public void sendJsonObject(Object object, AckCallback ackCallback) {
Packet packet = new Packet(PacketType.JSON);
packet.setData(object);
send(packet, ackCallback);
diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java
index 7d92c23..28d22d1 100644
--- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java
+++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java
@@ -109,7 +109,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
}
private void scheduleNoop(Channel channel, final UUID sessionId) {
- SchedulerKey key = new SchedulerKey(Type.NOOP, sessionId);
+ SchedulerKey key = new SchedulerKey(Type.POLLING, sessionId);
scheduler.cancel(key);
scheduler.schedule(key, new Runnable() {
@Override
@@ -199,7 +199,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
UUID sessionId = xhrClient.getSessionId();
sessionId2Client.remove(sessionId);
- SchedulerKey noopKey = new SchedulerKey(Type.NOOP, sessionId);
+ SchedulerKey noopKey = new SchedulerKey(Type.POLLING, sessionId);
scheduler.cancel(noopKey);
SchedulerKey closeTimeoutKey = new SchedulerKey(Type.CLOSE_TIMEOUT, sessionId);
scheduler.cancel(closeTimeoutKey);