diff --git a/src/main/java/com/corundumstudio/socketio/AckCallback.java b/src/main/java/com/corundumstudio/socketio/AckCallback.java index 12677cc..472491e 100644 --- a/src/main/java/com/corundumstudio/socketio/AckCallback.java +++ b/src/main/java/com/corundumstudio/socketio/AckCallback.java @@ -25,19 +25,26 @@ package com.corundumstudio.socketio; */ public abstract class AckCallback { - protected Class resultClass; - protected int timeout = -1; + protected final Class resultClass; + protected final int timeout; + /** + * Create AckCallback + * + * @param resultClass - result class + */ public AckCallback(Class resultClass) { - this.resultClass = resultClass; + this(resultClass, -1); } /** - * Creates AckCallback + * Creates AckCallback with timeout * + * @param resultClass - result class * @param timeout - callback timeout in seconds */ - public AckCallback(int timeout) { + public AckCallback(Class resultClass, int timeout) { + this.resultClass = resultClass; this.timeout = timeout; } diff --git a/src/main/java/com/corundumstudio/socketio/PacketListener.java b/src/main/java/com/corundumstudio/socketio/PacketListener.java index 071eb29..04fb785 100644 --- a/src/main/java/com/corundumstudio/socketio/PacketListener.java +++ b/src/main/java/com/corundumstudio/socketio/PacketListener.java @@ -15,6 +15,7 @@ */ package com.corundumstudio.socketio; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.namespace.Namespace; import com.corundumstudio.socketio.namespace.NamespacesHub; import com.corundumstudio.socketio.parser.Packet; diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java index 85be842..05e0331 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java @@ -34,6 +34,7 @@ import org.jboss.netty.handler.ssl.SslHandler; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.handler.AuthorizeHandler; import com.corundumstudio.socketio.handler.PacketHandler; import com.corundumstudio.socketio.handler.ResourceHandler; diff --git a/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java b/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java index 3730816..c245273 100644 --- a/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java +++ b/src/main/java/com/corundumstudio/socketio/VoidAckCallback.java @@ -25,6 +25,10 @@ public abstract class VoidAckCallback extends AckCallback { super(Void.class); } + public VoidAckCallback(int timeout) { + super(Void.class, timeout); + } + @Override public final void onSuccess(Void result) { onSuccess(); diff --git a/src/main/java/com/corundumstudio/socketio/AckManager.java b/src/main/java/com/corundumstudio/socketio/ack/AckManager.java similarity index 85% rename from src/main/java/com/corundumstudio/socketio/AckManager.java rename to src/main/java/com/corundumstudio/socketio/ack/AckManager.java index 9c3a4e7..114c8d6 100644 --- a/src/main/java/com/corundumstudio/socketio/AckManager.java +++ b/src/main/java/com/corundumstudio/socketio/ack/AckManager.java @@ -13,9 +13,10 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.corundumstudio.socketio; +package com.corundumstudio.socketio.ack; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; @@ -24,6 +25,10 @@ import java.util.concurrent.atomic.AtomicLong; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.corundumstudio.socketio.AckCallback; +import com.corundumstudio.socketio.Disconnectable; +import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.ack.AckManager.AckEntry; import com.corundumstudio.socketio.parser.Packet; import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.scheduler.SchedulerKey; @@ -43,6 +48,10 @@ public class AckManager implements Disconnectable { return index; } + public Set getAckIndexes() { + return ackCallbacks.keySet(); + } + public AckCallback getAckCallback(long index) { return ackCallbacks.get(index); } @@ -127,7 +136,7 @@ public class AckManager implements Disconnectable { if (callback.getTimeout() == -1) { return; } - SchedulerKey key = new SchedulerKey(Type.ACK_TIMEOUT, sessionId); + SchedulerKey key = new AckSchedulerKey(Type.ACK_TIMEOUT, sessionId, index); scheduler.schedule(key, new Runnable() { @Override public void run() { @@ -139,7 +148,13 @@ public class AckManager implements Disconnectable { @Override public void onDisconnect(BaseClient client) { - ackEntries.remove(client.getSessionId()); + AckEntry entry = ackEntries.remove(client.getSessionId()); + if (entry != null) { + for (Long index : entry.getAckIndexes()) { + SchedulerKey key = new AckSchedulerKey(Type.ACK_TIMEOUT, client.getSessionId(), index); + scheduler.cancel(key); + } + } } } diff --git a/src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java b/src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java new file mode 100644 index 0000000..9a11fc9 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/ack/AckSchedulerKey.java @@ -0,0 +1,57 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.ack; + +import java.util.UUID; + +import com.corundumstudio.socketio.scheduler.SchedulerKey; + +public class AckSchedulerKey extends SchedulerKey { + + private final long index; + + public AckSchedulerKey(Type type, UUID sessionId, long index) { + super(type, sessionId); + this.index = index; + } + + public long getIndex() { + return index; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (index ^ (index >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (!super.equals(obj)) + return false; + if (getClass() != obj.getClass()) + return false; + AckSchedulerKey other = (AckSchedulerKey) obj; + if (index != other.index) + return false; + return true; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/parser/Decoder.java b/src/main/java/com/corundumstudio/socketio/parser/Decoder.java index b73f781..13a828b 100644 --- a/src/main/java/com/corundumstudio/socketio/parser/Decoder.java +++ b/src/main/java/com/corundumstudio/socketio/parser/Decoder.java @@ -25,7 +25,7 @@ import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.util.CharsetUtil; import com.corundumstudio.socketio.AckCallback; -import com.corundumstudio.socketio.AckManager; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.namespace.Namespace; public class Decoder { diff --git a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java index 72b6f70..d47fd2d 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/BaseClient.java @@ -25,9 +25,9 @@ import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; -import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.namespace.Namespace; import com.corundumstudio.socketio.parser.Packet; import com.corundumstudio.socketio.parser.PacketType; diff --git a/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java index c11f0af..f49f3ee 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/FlashSocketTransport.java @@ -18,10 +18,10 @@ package com.corundumstudio.socketio.transport; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelHandler.Sharable; -import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HeartbeatHandler; import com.corundumstudio.socketio.SocketIOPipelineFactory; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.handler.AuthorizeHandler; @Sharable diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java index fc569d1..da38a84 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketClient.java @@ -20,8 +20,8 @@ import java.util.UUID; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; -import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.DisconnectableHub; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.messages.WebSocketPacketMessage; import com.corundumstudio.socketio.parser.Packet; diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index ac6cfc7..a55416b 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -42,13 +42,13 @@ import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFa import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.CompositeIterable; import com.corundumstudio.socketio.Disconnectable; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.HeartbeatHandler; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOPipelineFactory; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.handler.AuthorizeHandler; import com.corundumstudio.socketio.messages.PacketsMessage; diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java index a37cc1a..599ac9a 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java @@ -20,8 +20,8 @@ import java.util.UUID; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; -import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.DisconnectableHub; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.messages.XHRNewChannelMessage; import com.corundumstudio.socketio.messages.XHRPacketMessage; import com.corundumstudio.socketio.parser.Packet; diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java index 6d31d98..a7fef7a 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java @@ -40,13 +40,13 @@ import org.jboss.netty.handler.codec.http.QueryStringDecoder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.CompositeIterable; import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.Disconnectable; import com.corundumstudio.socketio.DisconnectableHub; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIOPipelineFactory; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.handler.AuthorizeHandler; import com.corundumstudio.socketio.messages.PacketsMessage; import com.corundumstudio.socketio.messages.XHRErrorMessage; diff --git a/src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java b/src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java index ad8a7b8..eb8577a 100644 --- a/src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java +++ b/src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java @@ -32,6 +32,7 @@ import org.jboss.netty.channel.UpstreamMessageEvent; import org.junit.Before; import org.junit.Test; +import com.corundumstudio.socketio.ack.AckManager; import com.corundumstudio.socketio.handler.PacketHandler; import com.corundumstudio.socketio.messages.PacketsMessage; import com.corundumstudio.socketio.namespace.Namespace; diff --git a/src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java b/src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java index a5de8a0..332dfea 100644 --- a/src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java +++ b/src/test/java/com/corundumstudio/socketio/parser/DecoderBaseTest.java @@ -19,8 +19,8 @@ import org.junit.Before; import mockit.Mocked; -import com.corundumstudio.socketio.AckManager; import com.corundumstudio.socketio.Configuration; +import com.corundumstudio.socketio.ack.AckManager; public class DecoderBaseTest {