From 9f18780398901495554a98289a5f341eba0b6e7e Mon Sep 17 00:00:00 2001 From: Nikita Date: Thu, 17 May 2012 17:12:25 +0400 Subject: [PATCH] Issue #7 fixed --- .../socketio/AuthorizeHandler.java | 13 ++-- .../socketio/Configuration.java | 10 +++ .../socketio/HeartbeatHandler.java | 24 ++++---- .../socketio/SocketIOPipelineFactory.java | 11 ++-- .../{ => scheduler}/CancelableScheduler.java | 10 +-- .../socketio/scheduler/SchedulerKey.java | 61 +++++++++++++++++++ .../transport/WebSocketTransport.java | 16 ++--- .../transport/XHRPollingTransport.java | 47 ++++++++------ 8 files changed, 141 insertions(+), 51 deletions(-) rename src/main/java/com/corundumstudio/socketio/{ => scheduler}/CancelableScheduler.java (83%) create mode 100644 src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java diff --git a/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java index fbd9f25..5fd2434 100644 --- a/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/AuthorizeHandler.java @@ -46,13 +46,16 @@ import org.slf4j.LoggerFactory; import com.corundumstudio.socketio.messages.AuthorizeMessage; import com.corundumstudio.socketio.parser.Packet; import com.corundumstudio.socketio.parser.PacketType; +import com.corundumstudio.socketio.scheduler.CancelableScheduler; +import com.corundumstudio.socketio.scheduler.SchedulerKey; +import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; @Sharable public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Disconnectable { private final Logger log = LoggerFactory.getLogger(getClass()); - private final CancelableScheduler disconnectScheduler; + private final CancelableScheduler disconnectScheduler; private final Set authorizedSessionIds = Collections.newSetFromMap(new ConcurrentHashMap()); @@ -61,7 +64,7 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di private final Configuration configuration; private final SocketIOListener socketIOListener; - public AuthorizeHandler(String connectPath, SocketIOListener socketIOListener, CancelableScheduler scheduler, Configuration configuration) { + public AuthorizeHandler(String connectPath, SocketIOListener socketIOListener, CancelableScheduler scheduler, Configuration configuration) { super(); this.connectPath = connectPath; this.socketIOListener = socketIOListener; @@ -122,7 +125,8 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - disconnectScheduler.schedule(sessionId, new Runnable() { + SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, sessionId); + disconnectScheduler.schedule(key, new Runnable() { @Override public void run() { authorizedSessionIds.remove(sessionId); @@ -138,7 +142,8 @@ public class AuthorizeHandler extends SimpleChannelUpstreamHandler implements Di } public void connect(SocketIOClient client) { - disconnectScheduler.cancel(client.getSessionId()); + SchedulerKey key = new SchedulerKey(Type.AUTHORIZE, client.getSessionId()); + disconnectScheduler.cancel(key); client.send(new Packet(PacketType.CONNECT)); socketIOListener.onConnect(client); } diff --git a/src/main/java/com/corundumstudio/socketio/Configuration.java b/src/main/java/com/corundumstudio/socketio/Configuration.java index 958ff65..c843fd8 100644 --- a/src/main/java/com/corundumstudio/socketio/Configuration.java +++ b/src/main/java/com/corundumstudio/socketio/Configuration.java @@ -28,6 +28,9 @@ public class Configuration { private Executor workerExecutor = Executors.newCachedThreadPool(); private boolean allowCustomRequests = false; + + private int pollingDuration = 20; + private int heartbeatThreadPoolSize = Runtime.getRuntime().availableProcessors() * 2; private int heartbeatTimeout = 60; private int heartbeatInterval = 25; @@ -175,4 +178,11 @@ public class Configuration { this.allowCustomRequests = allowCustomRequests; } + public int getPollingDuration() { + return pollingDuration; + } + public void setPollingDuration(int pollingDuration) { + this.pollingDuration = pollingDuration; + } + } diff --git a/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java b/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java index 39740b7..7f123db 100644 --- a/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java +++ b/src/main/java/com/corundumstudio/socketio/HeartbeatHandler.java @@ -15,7 +15,6 @@ */ package com.corundumstudio.socketio; -import java.util.UUID; import java.util.concurrent.TimeUnit; import org.slf4j.Logger; @@ -23,36 +22,39 @@ import org.slf4j.LoggerFactory; import com.corundumstudio.socketio.parser.Packet; import com.corundumstudio.socketio.parser.PacketType; +import com.corundumstudio.socketio.scheduler.CancelableScheduler; +import com.corundumstudio.socketio.scheduler.SchedulerKey; +import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; public class HeartbeatHandler { private final Logger log = LoggerFactory.getLogger(getClass()); - private final CancelableScheduler scheduler; + private final CancelableScheduler scheduler; private final Configuration configuration; - public HeartbeatHandler(Configuration configuration, CancelableScheduler scheduler) { + public HeartbeatHandler(Configuration configuration, CancelableScheduler scheduler) { this.configuration = configuration; this.scheduler = scheduler; } public void onHeartbeat(final SocketIOClient client) { - scheduler.cancel(client.getSessionId()); + if (!configuration.isHeartbeatsEnabled()) { + return; + } + scheduler.cancel(new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId())); scheduler.schedule(new Runnable() { public void run() { - sendHeartbeat(client); + client.send(new Packet(PacketType.HEARTBEAT)); + scheduleClientHeartbeatCheck(client); } }, configuration.getHeartbeatInterval(), TimeUnit.SECONDS); } - public void sendHeartbeat(SocketIOClient client) { - client.send(new Packet(PacketType.HEARTBEAT)); - scheduleClientHeartbeatCheck(client); - } - private void scheduleClientHeartbeatCheck(final SocketIOClient client) { - scheduler.schedule(new Runnable() { + SchedulerKey key = new SchedulerKey(Type.HEARBEAT_TIMEOUT, client.getSessionId()); + scheduler.schedule(key, new Runnable() { public void run() { client.disconnect(); log.debug("Client with sessionId: {} disconnected due to heartbeat timeout", client.getSessionId()); diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java index 468aa47..9f39777 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java @@ -17,8 +17,6 @@ package com.corundumstudio.socketio; import static org.jboss.netty.channel.Channels.pipeline; -import java.util.UUID; - import org.codehaus.jackson.map.ObjectMapper; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -30,6 +28,7 @@ import org.slf4j.LoggerFactory; import com.corundumstudio.socketio.parser.Decoder; import com.corundumstudio.socketio.parser.Encoder; +import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.transport.WebSocketTransport; import com.corundumstudio.socketio.transport.XHRPollingTransport; @@ -54,13 +53,13 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne private SocketIOEncoder socketIOEncoder; private SocketIOListener socketIOHandler; - private CancelableScheduler scheduler; + private CancelableScheduler scheduler; private PacketHandler packetHandler; public void start(Configuration configuration) { this.socketIOHandler = configuration.getListener(); - scheduler = new CancelableScheduler(configuration.getHeartbeatThreadPoolSize()); + scheduler = new CancelableScheduler(configuration.getHeartbeatThreadPoolSize()); ObjectMapper objectMapper = configuration.getObjectMapper(); Encoder encoder = new Encoder(objectMapper); @@ -73,8 +72,8 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne packetHandler = new PacketHandler(packetListener, decoder); authorizeHandler = new AuthorizeHandler(connectPath, socketIOHandler, scheduler, configuration); - xhrPollingTransport = new XHRPollingTransport(connectPath, this, scheduler, heartbeatHandler, authorizeHandler, configuration); - webSocketTransport = new WebSocketTransport(connectPath, this, authorizeHandler); + xhrPollingTransport = new XHRPollingTransport(connectPath, this, scheduler, authorizeHandler, configuration); + webSocketTransport = new WebSocketTransport(connectPath, this, authorizeHandler, heartbeatHandler); socketIOEncoder = new SocketIOEncoder(objectMapper, encoder); } diff --git a/src/main/java/com/corundumstudio/socketio/CancelableScheduler.java b/src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java similarity index 83% rename from src/main/java/com/corundumstudio/socketio/CancelableScheduler.java rename to src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java index 2561bed..dbad299 100644 --- a/src/main/java/com/corundumstudio/socketio/CancelableScheduler.java +++ b/src/main/java/com/corundumstudio/socketio/scheduler/CancelableScheduler.java @@ -13,7 +13,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package com.corundumstudio.socketio; +package com.corundumstudio.socketio.scheduler; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -22,16 +22,16 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -public class CancelableScheduler { +public class CancelableScheduler { - private final Map> scheduledFutures = new ConcurrentHashMap>(); + private final Map> scheduledFutures = new ConcurrentHashMap>(); private final ScheduledExecutorService executorService; public CancelableScheduler(int threadPoolSize) { executorService = Executors.newScheduledThreadPool(threadPoolSize); } - public void cancel(T key) { + public void cancel(SchedulerKey key) { Future future = scheduledFutures.remove(key); if (future != null) { future.cancel(false); @@ -42,7 +42,7 @@ public class CancelableScheduler { executorService.schedule(runnable, delay, unit); } - public void schedule(final T key, final Runnable runnable, long delay, TimeUnit unit) { + public void schedule(final SchedulerKey key, final Runnable runnable, long delay, TimeUnit unit) { Future future = executorService.schedule(new Runnable() { @Override public void run() { diff --git a/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java new file mode 100644 index 0000000..39da6fe --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/scheduler/SchedulerKey.java @@ -0,0 +1,61 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio.scheduler; + +import java.util.UUID; + +public class SchedulerKey { + + public enum Type {NOOP, HEARBEAT_TIMEOUT, CLOSE_TIMEOUT, AUTHORIZE}; + + private Type type; + private UUID sessionId; + + public SchedulerKey(Type type, UUID sessionId) { + this.type = type; + this.sessionId = sessionId; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + + ((sessionId == null) ? 0 : sessionId.hashCode()); + result = prime * result + ((type == null) ? 0 : type.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + SchedulerKey other = (SchedulerKey) obj; + if (sessionId == null) { + if (other.sessionId != null) + return false; + } else if (!sessionId.equals(other.sessionId)) + return false; + if (type != other.type) + return false; + return true; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index 99a394d..93f81c7 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -22,11 +22,11 @@ import java.util.concurrent.ConcurrentHashMap; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.QueryStringDecoder; @@ -34,12 +34,12 @@ import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; -import org.jboss.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.corundumstudio.socketio.AuthorizeHandler; import com.corundumstudio.socketio.Disconnectable; +import com.corundumstudio.socketio.HeartbeatHandler; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.messages.PacketsMessage; @@ -51,15 +51,18 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements private final Map sessionId2Client = new ConcurrentHashMap(); private final Map channelId2Client = new ConcurrentHashMap(); + private final HeartbeatHandler heartbeatHandler; private final AuthorizeHandler authorizeHandler; private final Disconnectable disconnectable; private final String path; - public WebSocketTransport(String connectPath, Disconnectable disconnectable, AuthorizeHandler authorizeHandler) { + public WebSocketTransport(String connectPath, Disconnectable disconnectable, + AuthorizeHandler authorizeHandler, HeartbeatHandler heartbeatHandler) { this.path = connectPath + "websocket"; this.authorizeHandler = authorizeHandler; this.disconnectable = disconnectable; + this.heartbeatHandler = heartbeatHandler; } @Override @@ -109,11 +112,6 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements private void receivePackets(ChannelHandlerContext ctx, ChannelBuffer channelBuffer) throws IOException { WebSocketClient client = channelId2Client.get(ctx.getChannel().getId()); - if (log.isTraceEnabled()) { - String content = channelBuffer.toString(CharsetUtil.UTF_8); - log.trace("In message: {} sessionId: {}", new Object[] {content, client.getSessionId()}); - } - Channels.fireMessageReceived(ctx.getChannel(), new PacketsMessage(client, channelBuffer)); } @@ -129,6 +127,8 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements channelId2Client.put(channel.getId(), client); sessionId2Client.put(sessionId, client); authorizeHandler.connect(client); + + heartbeatHandler.onHeartbeat(client); } private String getWebSocketLocation(HttpRequest req) { diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java index 5989553..54306cc 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java @@ -24,11 +24,11 @@ import java.util.concurrent.TimeUnit; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; -import org.jboss.netty.channel.ChannelHandler.Sharable; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; @@ -37,10 +37,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.corundumstudio.socketio.AuthorizeHandler; -import com.corundumstudio.socketio.CancelableScheduler; import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.Disconnectable; -import com.corundumstudio.socketio.HeartbeatHandler; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.messages.PacketsMessage; import com.corundumstudio.socketio.messages.XHRErrorMessage; @@ -49,6 +47,9 @@ import com.corundumstudio.socketio.parser.ErrorAdvice; import com.corundumstudio.socketio.parser.ErrorReason; import com.corundumstudio.socketio.parser.Packet; import com.corundumstudio.socketio.parser.PacketType; +import com.corundumstudio.socketio.scheduler.CancelableScheduler; +import com.corundumstudio.socketio.scheduler.SchedulerKey; +import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; @Sharable public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements Disconnectable { @@ -56,22 +57,20 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements private final Logger log = LoggerFactory.getLogger(getClass()); private final Map sessionId2Client = new ConcurrentHashMap(); - private final CancelableScheduler disconnectScheduler; + private final CancelableScheduler scheduler; private final AuthorizeHandler authorizeHandler; - private final HeartbeatHandler heartbeatHandler; private final Disconnectable disconnectable; private final Configuration configuration; private final String path; - public XHRPollingTransport(String connectPath, Disconnectable disconnectable, CancelableScheduler scheduler, - HeartbeatHandler heartbeatHandler, AuthorizeHandler authorizeHandler, Configuration configuration) { + public XHRPollingTransport(String connectPath, Disconnectable disconnectable, CancelableScheduler scheduler, + AuthorizeHandler authorizeHandler, Configuration configuration) { this.path = connectPath + "xhr-polling/"; this.authorizeHandler = authorizeHandler; this.configuration = configuration; - this.heartbeatHandler = heartbeatHandler; this.disconnectable = disconnectable; - this.disconnectScheduler = scheduler; + this.scheduler = scheduler; } public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { @@ -86,8 +85,6 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements if (parts.length > 3) { UUID sessionId = UUID.fromString(parts[4]); - scheduleDisconnect(channel, sessionId); - if (HttpMethod.POST.equals(req.getMethod())) { onPost(sessionId, channel, req); } else if (HttpMethod.GET.equals(req.getMethod())) { @@ -108,13 +105,28 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements ctx.sendUpstream(e); } + private void scheduleNoop(Channel channel, final UUID sessionId) { + SchedulerKey key = new SchedulerKey(Type.NOOP, sessionId); + scheduler.cancel(key); + scheduler.schedule(key, new Runnable() { + @Override + public void run() { + XHRPollingClient client = sessionId2Client.get(sessionId); + if (client != null) { + client.send(new Packet(PacketType.NOOP)); + } + } + }, configuration.getPollingDuration(), TimeUnit.SECONDS); + } + private void scheduleDisconnect(Channel channel, final UUID sessionId) { - disconnectScheduler.cancel(sessionId); + final SchedulerKey key = new SchedulerKey(Type.CLOSE_TIMEOUT, sessionId); + scheduler.cancel(key); ChannelFuture future = channel.getCloseFuture(); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { - disconnectScheduler.schedule(sessionId, new Runnable() { + scheduler.schedule(key, new Runnable() { @Override public void run() { XHRPollingClient client = sessionId2Client.get(sessionId); @@ -146,6 +158,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements sendError(channel, req, sessionId); return; } + String origin = req.getHeader(HttpHeaders.Names.ORIGIN); XHRPollingClient client = sessionId2Client.get(sessionId); if (client == null) { @@ -153,17 +166,17 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements } client.update(channel, origin); + + scheduleDisconnect(channel, sessionId); + scheduleNoop(channel, sessionId); } - private XHRPollingClient createClient(String origin, Channel channel, UUID sessionId) { + private XHRPollingClient createClient(String origin, Channel channel, UUID sessionId) { XHRPollingClient client = new XHRPollingClient(authorizeHandler, sessionId); sessionId2Client.put(sessionId, client); client.update(channel, origin); authorizeHandler.connect(client); - if (configuration.isHeartbeatsEnabled()) { - heartbeatHandler.sendHeartbeat(client); - } log.debug("Client for sessionId: {} was created", sessionId); return client; }