From 3baf706a41dc4734b95f7a48dc6e1bcdd736f52a Mon Sep 17 00:00:00 2001 From: Nikita Date: Wed, 23 May 2012 17:53:00 +0400 Subject: [PATCH] AckCallback with timeout introduced --- .../corundumstudio/socketio/AckCallback.java | 48 ++++++++++++++++++ .../corundumstudio/socketio/AckManager.java | 49 ++++++++++++++++--- .../socketio/SocketIOClient.java | 8 +-- .../socketio/SocketIOPipelineFactory.java | 3 +- .../socketio/scheduler/SchedulerKey.java | 2 +- .../socketio/transport/BaseClient.java | 9 ++-- .../transport/XHRPollingTransport.java | 4 +- 7 files changed, 105 insertions(+), 18 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/AckCallback.java diff --git a/src/main/java/com/corundumstudio/socketio/AckCallback.java b/src/main/java/com/corundumstudio/socketio/AckCallback.java new file mode 100644 index 0000000..df9fb63 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/AckCallback.java @@ -0,0 +1,48 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +public abstract class AckCallback { + + private int timeout = -1; + + public AckCallback() { + } + + /** + * Creates AckCallback + * + * @param timeout - callback timeout in seconds + */ + public AckCallback(int timeout) { + this.timeout = timeout; + } + + public int getTimeout() { + return timeout; + } + + public abstract void onSuccess(); + + /** + * Invoked only once then timeout defined + * + */ + public void onTimeout() { + + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/AckManager.java b/src/main/java/com/corundumstudio/socketio/AckManager.java index e9516db..696ee52 100644 --- a/src/main/java/com/corundumstudio/socketio/AckManager.java +++ b/src/main/java/com/corundumstudio/socketio/AckManager.java @@ -21,28 +21,48 @@ import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.corundumstudio.socketio.parser.Packet; +import com.corundumstudio.socketio.scheduler.CancelableScheduler; +import com.corundumstudio.socketio.scheduler.SchedulerKey; +import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; public class AckManager implements Disconnectable { private final AtomicLong ackIndex = new AtomicLong(); - private final Map ackCallbacks = new ConcurrentHashMap(); + private final Map ackCallbacks = new ConcurrentHashMap(); private final ConcurrentMap> clientCallbackIds = new ConcurrentHashMap>(); + private final CancelableScheduler scheduler; + + public AckManager(CancelableScheduler scheduler) { + super(); + this.scheduler = scheduler; + } + public void onAck(SocketIOClient client, Packet packet) { - Runnable callback = ackCallbacks.remove(packet.getAckId()); + SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, client.getSessionId()); + scheduler.cancel(key); + AckCallback callback = removeCallback(client.getSessionId(), packet.getAckId()); + if (callback != null) { + callback.onSuccess(); + } + } + + private AckCallback removeCallback(UUID sessionId, long index) { + AckCallback callback = ackCallbacks.remove(index); if (callback != null) { - Set callbackIds = clientCallbackIds.get(client.getSessionId()); + Set callbackIds = clientCallbackIds.get(sessionId); if (callbackIds != null) { - callbackIds.remove(packet.getAckId()); + callbackIds.remove(index); } - callback.run(); } + return callback; } - public long registerAck(UUID sessionId, Runnable callback) { + public long registerAck(UUID sessionId, final AckCallback callback) { Set callbackIds = clientCallbackIds.get(sessionId); if (callbackIds == null) { callbackIds = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -54,9 +74,26 @@ public class AckManager implements Disconnectable { long index = ackIndex.incrementAndGet(); callbackIds.add(index); ackCallbacks.put(index, callback); + + scheduleTimeout(index, sessionId, callback); + return index; } + private void scheduleTimeout(final long index, final UUID sessionId, final AckCallback callback) { + if (callback.getTimeout() == -1) { + return; + } + SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, sessionId); + scheduler.schedule(key, new Runnable() { + @Override + public void run() { + removeCallback(sessionId, index); + callback.onTimeout(); + } + }, callback.getTimeout(), TimeUnit.SECONDS); + } + @Override public void onDisconnect(SocketIOClient client) { Set callbackIds = clientCallbackIds.remove(client.getSessionId()); diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java index c23af5d..f4259db 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java @@ -42,7 +42,7 @@ public interface SocketIOClient { * @param message - message to send * @param ackCallback - ack callback */ - void sendMessage(String message, Runnable ackCallback); + void sendMessage(String message, AckCallback ackCallback); /** * Send object. Object will be encoded to json-format. @@ -57,7 +57,7 @@ public interface SocketIOClient { * @param object - object to send * @param ackCallback - ack callback */ - void sendJsonObject(Object object, Runnable ackCallback); + void sendJsonObject(Object object, AckCallback ackCallback); /** * Send packet @@ -72,7 +72,7 @@ public interface SocketIOClient { * @param packet - packet to send * @param ackCallback - ack callback */ - void send(Packet packet, Runnable ackCallback); + void send(Packet packet, AckCallback ackCallback); /** * Disconnect client @@ -95,7 +95,7 @@ public interface SocketIOClient { * @param data - event data * @param ackCallback - ack callback */ - void sendEvent(String name, Object data, Runnable ackCallback); + void sendEvent(String name, Object data, AckCallback ackCallback); SocketAddress getRemoteAddress(); diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java index a7ab2ba..f7ec03c 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java @@ -47,7 +47,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne private final int protocol = 1; - private AckManager ackManager = new AckManager(); + private AckManager ackManager; private AuthorizeHandler authorizeHandler; private XHRPollingTransport xhrPollingTransport; @@ -68,6 +68,7 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne Encoder encoder = new Encoder(objectMapper); Decoder decoder = new Decoder(objectMapper); + ackManager = new AckManager(scheduler); heartbeatHandler = new HeartbeatHandler(configuration, scheduler); PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler, ackManager); diff --git a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java index bd07582..ed4a4ba 100644 --- a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java +++ b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java @@ -19,7 +19,7 @@ import java.util.UUID; public class SchedulerKey { - public enum Type {NOOP, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE}; + public enum Type {POLLING, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE, ACK_TIMEOUT}; private Type type; private UUID sessionId; diff --git a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java index b61cbc7..faccfb2 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java @@ -21,6 +21,7 @@ import java.util.UUID; import org.jboss.netty.channel.Channel; +import com.corundumstudio.socketio.AckCallback; import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.parser.Packet; @@ -51,7 +52,7 @@ abstract class BaseClient implements SocketIOClient { } @Override - public void sendEvent(String name, Object data, Runnable ackCallback) { + public void sendEvent(String name, Object data, AckCallback ackCallback) { Packet packet = new Packet(PacketType.EVENT); packet.setName(name); packet.setArgs(Collections.singletonList(data)); @@ -59,7 +60,7 @@ abstract class BaseClient implements SocketIOClient { } @Override - public void sendMessage(String message, Runnable ackCallback) { + public void sendMessage(String message, AckCallback ackCallback) { Packet packet = new Packet(PacketType.MESSAGE); packet.setData(message); send(packet, ackCallback); @@ -85,14 +86,14 @@ abstract class BaseClient implements SocketIOClient { } @Override - public void send(Packet packet, Runnable ackCallback) { + public void send(Packet packet, AckCallback ackCallback) { long index = ackManager.registerAck(sessionId, ackCallback); packet.setId(index); send(packet); } @Override - public void sendJsonObject(Object object, Runnable ackCallback) { + public void sendJsonObject(Object object, AckCallback ackCallback) { Packet packet = new Packet(PacketType.JSON); packet.setData(object); send(packet, ackCallback); diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java index 7d92c23..28d22d1 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java @@ -109,7 +109,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements } private void scheduleNoop(Channel channel, final UUID sessionId) { - SchedulerKey key = new SchedulerKey(Type.NOOP, sessionId); + SchedulerKey key = new SchedulerKey(Type.POLLING, sessionId); scheduler.cancel(key); scheduler.schedule(key, new Runnable() { @Override @@ -199,7 +199,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements UUID sessionId = xhrClient.getSessionId(); sessionId2Client.remove(sessionId); - SchedulerKey noopKey = new SchedulerKey(Type.NOOP, sessionId); + SchedulerKey noopKey = new SchedulerKey(Type.POLLING, sessionId); scheduler.cancel(noopKey); SchedulerKey closeTimeoutKey = new SchedulerKey(Type.CLOSE_TIMEOUT, sessionId); scheduler.cancel(closeTimeoutKey);