|
|
@ -16,8 +16,9 @@ |
|
|
|
package com.corundumstudio.socketio.scheduler; |
|
|
|
|
|
|
|
import io.netty.channel.ChannelHandlerContext; |
|
|
|
import io.netty.util.concurrent.EventExecutor; |
|
|
|
import io.netty.util.concurrent.ScheduledFuture; |
|
|
|
import io.netty.util.HashedWheelTimer; |
|
|
|
import io.netty.util.Timeout; |
|
|
|
import io.netty.util.TimerTask; |
|
|
|
|
|
|
|
import java.util.Map; |
|
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
@ -25,52 +26,58 @@ import java.util.concurrent.TimeUnit; |
|
|
|
|
|
|
|
public class HashedWheelScheduler implements CancelableScheduler { |
|
|
|
|
|
|
|
private final Map<SchedulerKey, ScheduledFuture<?>> scheduledFutures = new ConcurrentHashMap<SchedulerKey, ScheduledFuture<?>>(); |
|
|
|
private final Map<SchedulerKey, Timeout> scheduledFutures = new ConcurrentHashMap<SchedulerKey, Timeout>(); |
|
|
|
private final HashedWheelTimer executorService = new HashedWheelTimer(); |
|
|
|
|
|
|
|
private volatile EventExecutor executor; |
|
|
|
private volatile ChannelHandlerContext ctx; |
|
|
|
|
|
|
|
@Override |
|
|
|
public void update(ChannelHandlerContext ctx) { |
|
|
|
this.executor = ctx.executor(); |
|
|
|
this.ctx = ctx; |
|
|
|
} |
|
|
|
|
|
|
|
public void cancel(SchedulerKey key) { |
|
|
|
ScheduledFuture<?> timeout = scheduledFutures.remove(key); |
|
|
|
Timeout timeout = scheduledFutures.remove(key); |
|
|
|
if (timeout != null) { |
|
|
|
timeout.cancel(false); |
|
|
|
timeout.cancel(); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void schedule(final Runnable runnable, long delay, TimeUnit unit) { |
|
|
|
executor.schedule(new Runnable() { |
|
|
|
executorService.newTimeout(new TimerTask() { |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
public void run(Timeout timeout) throws Exception { |
|
|
|
runnable.run(); |
|
|
|
} |
|
|
|
}, delay, unit); |
|
|
|
} |
|
|
|
|
|
|
|
public void scheduleCallback(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) { |
|
|
|
ScheduledFuture<?> timeout = executor.schedule(new Runnable() { |
|
|
|
Timeout timeout = executorService.newTimeout(new TimerTask() { |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
try { |
|
|
|
runnable.run(); |
|
|
|
} finally { |
|
|
|
scheduledFutures.remove(key); |
|
|
|
} |
|
|
|
public void run(Timeout timeout) throws Exception { |
|
|
|
ctx.executor().execute(new Runnable() { |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
try { |
|
|
|
runnable.run(); |
|
|
|
} finally { |
|
|
|
scheduledFutures.remove(key); |
|
|
|
} |
|
|
|
} |
|
|
|
}); |
|
|
|
} |
|
|
|
}, delay, unit); |
|
|
|
|
|
|
|
if (!timeout.isDone()) { |
|
|
|
if (!timeout.isExpired()) { |
|
|
|
scheduledFutures.put(key, timeout); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void schedule(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) { |
|
|
|
ScheduledFuture<?> timeout = executor.schedule(new Runnable() { |
|
|
|
Timeout timeout = executorService.newTimeout(new TimerTask() { |
|
|
|
@Override |
|
|
|
public void run() { |
|
|
|
public void run(Timeout timeout) throws Exception { |
|
|
|
try { |
|
|
|
runnable.run(); |
|
|
|
} finally { |
|
|
@ -79,9 +86,13 @@ public class HashedWheelScheduler implements CancelableScheduler { |
|
|
|
} |
|
|
|
}, delay, unit); |
|
|
|
|
|
|
|
if (!timeout.isDone()) { |
|
|
|
if (!timeout.isExpired()) { |
|
|
|
scheduledFutures.put(key, timeout); |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
public void shutdown() { |
|
|
|
executorService.stop(); |
|
|
|
} |
|
|
|
|
|
|
|
} |