diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java index 795757a..24f360c 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java @@ -15,6 +15,7 @@ */ package com.corundumstudio.socketio; +import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInitializer; @@ -82,7 +83,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl private EncoderHandler encoderHandler; private WrongUrlHandler wrongUrlHandler; - private CancelableScheduler scheduler = new HashedWheelScheduler(); + private CancelableScheduler scheduler = new HashedWheelTimeoutScheduler(); private InPacketHandler packetHandler; private SSLContext sslContext; diff --git a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java index 99c7f85..0f9c6db 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java +++ b/src/main/java/com/corundumstudio/socketio/handler/ClientHead.java @@ -120,7 +120,6 @@ public class ClientHead { } public void schedulePingTimeout() { - cancelPingTimeout(); SchedulerKey key = new SchedulerKey(Type.PING_TIMEOUT, sessionId); disconnectScheduler.schedule(key, new Runnable() { @Override diff --git a/src/main/java/com/corundumstudio/socketio/scheduler/HashedWheelTimeoutScheduler.java b/src/main/java/com/corundumstudio/socketio/scheduler/HashedWheelTimeoutScheduler.java new file mode 100644 index 0000000..aff253f --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/scheduler/HashedWheelTimeoutScheduler.java @@ -0,0 +1,120 @@ +/** + * Modified version of HashedWheelScheduler specially for timeouts handling. + * Difference: + * - handling old timeout with same key after adding new one + * fixes multithreaded problem that appears in highly concurrent non-atomic sequence cancel() -> schedule() + * + * (c) Alim Akbashev, 2015-02-11 + */ + +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.corundumstudio.socketio.scheduler; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.util.HashedWheelTimer; +import io.netty.util.Timeout; +import io.netty.util.TimerTask; +import io.netty.util.internal.PlatformDependent; + +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + +public class HashedWheelTimeoutScheduler implements CancelableScheduler { + + private final ConcurrentMap scheduledFutures = PlatformDependent.newConcurrentHashMap(); + private final HashedWheelTimer executorService = new HashedWheelTimer(); + + private volatile ChannelHandlerContext ctx; + + @Override + public void update(ChannelHandlerContext ctx) { + this.ctx = ctx; + } + + public void cancel(SchedulerKey key) { + Timeout timeout = scheduledFutures.remove(key); + if (timeout != null) { + timeout.cancel(); + } + } + + public void schedule(final Runnable runnable, long delay, TimeUnit unit) { + executorService.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + runnable.run(); + } + }, delay, unit); + } + + public void scheduleCallback(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) { + Timeout timeout = executorService.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + ctx.executor().execute(new Runnable() { + @Override + public void run() { + try { + runnable.run(); + } finally { + scheduledFutures.remove(key); + } + } + }); + } + }, delay, unit); + + replaceScheduledFuture(key, timeout); + } + + public void schedule(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) { + Timeout timeout = executorService.newTimeout(new TimerTask() { + @Override + public void run(Timeout timeout) throws Exception { + try { + runnable.run(); + } finally { + scheduledFutures.remove(key); + } + } + }, delay, unit); + + replaceScheduledFuture(key, timeout); + } + + public void shutdown() { + executorService.stop(); + } + + private void replaceScheduledFuture(final SchedulerKey key, final Timeout newTimeout) { + final Timeout oldTimeout; + + if (newTimeout.isExpired()) { + // no need to put already expired timeout to scheduledFutures map. + // simply remove old timeout + oldTimeout = scheduledFutures.remove(key); + } else { + oldTimeout = scheduledFutures.put(key, newTimeout); + } + + // if there was old timeout, cancel it + if (oldTimeout != null) { + oldTimeout.cancel(); + } + } +}