From 9a37a6ad9fbbecdcab23666a641ce7e5e53c76f7 Mon Sep 17 00:00:00 2001 From: Nikita Koksharov Date: Fri, 11 Mar 2016 14:47:16 +0300 Subject: [PATCH 1/7] Update README.md --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index e947cef..93f97b2 100644 --- a/README.md +++ b/README.md @@ -17,8 +17,8 @@ Features * Supports namespaces and rooms * Supports ack (acknowledgment of received data) * Supports SSL -* Supports client store (Memory, [Redisson](https://github.com/mrniko/redisson), [Hazelcast](http://www.hazelcast.com/)) -* Supports distributed broadcast across netty-socketio nodes ([Redisson](https://github.com/mrniko/redisson), [Hazelcast](http://www.hazelcast.com/)) +* Supports client store (Memory, [Redisson](http://redisson.org), [Hazelcast](http://www.hazelcast.com/)) +* Supports distributed broadcast across netty-socketio nodes ([Redisson](http://redisson.org), [Hazelcast](http://www.hazelcast.com/)) * Supports OSGi * Supports Spring * Lock-free and thread-safe implementation From 9334b509859573031f0a924c817931dff13b89dc Mon Sep 17 00:00:00 2001 From: Leonid Khachaturov Date: Tue, 15 Mar 2016 15:04:27 +0100 Subject: [PATCH 2/7] Fix compatibility with standard Markdown --- README.md | 52 ++++++++++++++++++++++++++-------------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/README.md b/README.md index 93f97b2..3b39039 100644 --- a/README.md +++ b/README.md @@ -48,13 +48,13 @@ Zipwhip: [zipwhip.com](https://zipwhip.com/) Recent Releases ================================ -####Please Note: trunk is current development branch. +#### Please Note: trunk is current development branch. -####4-Mar-2015 - version 1.7.10 released +#### 4-Mar-2015 - version 1.7.10 released Fixed - netty updated to 4.1.0.CR3 version Fixed - binary packet parsing (thanks to Winston Li) -####6-Feb-2015 - version 1.7.9 released +#### 6-Feb-2015 - version 1.7.9 released Feature - Compression support Fixed - DotNET client request handling Fixed - Packet length format parsing @@ -63,46 +63,46 @@ Fixed - Polling clients sporatically get prematurely disconnected (thanks to lpa Fixed - connections stay open forever if server sent `close` packet Fixed - compatibility with Redisson latest version -####30-Nov-2015 - version 1.7.8 released +#### 30-Nov-2015 - version 1.7.8 released Improvement - `WebSocketServerHandshaker.allowExtensions` is `true` now Improvement - SessionID cookie implementation (thanks to @ryandietrich) Fixed - clientRooms leak (thanks to @andreaspalm) Fixed - ExceptionListener not used for errors in JSON parsing Fixed - "silent channel" attack -####26-Mar-2015 - version 1.6.7 released +#### 26-Mar-2015 - version 1.6.7 released Improvement - `useStrictOrdering` param added for websocket packets strict ordering Improvement - `FAIL_ON_EMPTY_BEANS = false` option setted in json decoder -####18-Feb-2015 - version 1.7.7 released +#### 18-Feb-2015 - version 1.7.7 released Improvement - no need to add jackson lib if you use own JsonSupport impl Fixed - SocketIO client 1.3.x support Fixed - Charset encoding handling (thanks to alim-akbashev) -####17-Jan-2015 - version 1.7.6 released +#### 17-Jan-2015 - version 1.7.6 released Improvement - `SocketIONamespace.getName()` added Fixed - WebSocket frames aggregation Fixed - WebSocket buffer release Fixed - `Unexpected end-of-input in VALUE_STRING` error Fixed - Access-Control-Allow-Credentials is TRUE for requests with origin header -####05-Dec-2014 - version 1.7.5 released +#### 05-Dec-2014 - version 1.7.5 released Feature - `Configuration.sslProtocol` param added Fixed - BinaryEvent ack handling Fixed - BinaryEvent non b64 encoding/decoding Fixed - buffer leak during packet encoding -####15-Nov-2014 - version 1.7.4 released +#### 15-Nov-2014 - version 1.7.4 released Fixed - packet encoding Fixed - BinaryEvent encoding/decoding Fixed - unchallenged connections handling -####29-Sep-2014 - version 1.6.6 released +#### 29-Sep-2014 - version 1.6.6 released Feature - `origin` setting added Feature - `crossDomainPolicy` setting added Feature - `SocketIOServer.startAsync` method added -####24-Sep-2014 - version 1.7.3 released +#### 24-Sep-2014 - version 1.7.3 released Feature - Epoll support Improvement - BinaryEvent support Fixed - SocketIOClient disconnect handling @@ -111,23 +111,23 @@ Fixed - NPE then no transport defined during auth Fixed - ping timeout for polling transport Fixed - buffer leak in PacketEncoder -####22-Aug-2014 - version 1.7.2 released +#### 22-Aug-2014 - version 1.7.2 released Fixed - wrong outgoing message encoding using websocket transport Fixed - NPE in websocket transport Fixed - multiple packet decoding in polling transport Fixed - buffer leak -####07-Jul-2014 - version 1.7.1 released +#### 07-Jul-2014 - version 1.7.1 released Feature - ability to set custom `Access-Control-Allow-Origin` via Configuration.origin Fixed - connection via CLI socket.io-client -####28-Jun-2014 - version 1.7.0 released +#### 28-Jun-2014 - version 1.7.0 released Feature - Socket.IO 1.0 protocol support. Thanks to the new protocol decoding/encoding has speedup __Dropped__ - `SocketIOClient.sendMessage`, `SocketIOClient.sendJsonObject` methods and corresponding listeners __Dropped__ - Flashsocket transport support __Dropped__ - protocol version 0.7 ... 0.9.16 -####13-May-2014 - version 1.6.5 released +#### 13-May-2014 - version 1.6.5 released Improvement - single packet encoding optimized, used mostly in WebSocket transport. Encoding time reduced up to 40% (thanks to Viktor Endersz) Improvement - rooms handling optimized Improvement - ExceptionListener.exceptionCaught method added @@ -137,7 +137,7 @@ Feature - maxFramePayloadLength setting added Feature - getAllClients and getClient methods added to SocketIONamespace Fixed - SocketIOServer.getAllClients returns wrong clients amount -####25-Mar-2014 - version 1.6.4 released +#### 25-Mar-2014 - version 1.6.4 released Fixed - message release problem Fixed - problem with exception listener configuration redefinition __Breaking api change__ - DataListener.onData now throws Exception @@ -145,7 +145,7 @@ Improvement - data parameter added to exception listener Improvement - ability to setup socket configuration Improvement - Configuration.autoAck parameter added -####06-Mar-2014 - version 1.6.3 released +#### 06-Mar-2014 - version 1.6.3 released Fixed - AckCallback handling during client disconnect Fixed - unauthorized handshake HTTP code changed to 401 __Breaking api change__ - Configuration.heartbeatThreadPoolSize setting removed @@ -153,7 +153,7 @@ Feature - annotated Spring beans support via _SpringAnnotationScanner_ Feature - common exception listener Improvement - _ScheduledExecutorService_ replaced with _HashedWheelTimer_ -####08-Feb-2014 - version 1.6.2 released +#### 08-Feb-2014 - version 1.6.2 released Fixed - wrong namespace client disconnect handling Fixed - exception in onConnect/onDisconnect/isAuthorized methods leads to server hang __Breaking api change__ - SocketIOClient.sendEvent methods signature changed @@ -162,11 +162,11 @@ Improvement - multi type events ack support via _MultiTypeAckCallback_ Improvement - SocketIOClient.getHandshakeData method added Improvement - Jedis replaced with [Redisson](https://github.com/mrniko/redisson) -####14-Jan-2014 - version 1.6.1 released +#### 14-Jan-2014 - version 1.6.1 released Fixed - JDK 1.6+ compatibility Feature - authorization support -####19-Dec-2013 - version 1.6.0 released +#### 19-Dec-2013 - version 1.6.0 released Fixed - XHR-pooling transport regression Fixed - Websocket transport regression Fixed - namespace NPE in PacketHandler @@ -177,33 +177,33 @@ Feature - OSGi support (thanks to rdevera) Improvement - XHR-pooling optimization Improvement - SocketIOClient.getAllRooms method added -####07-Dec-2013 - version 1.5.4 released +#### 07-Dec-2013 - version 1.5.4 released Fixed - flash policy "request leak" after page reload (thanks to ntrp) Fixed - websocket swf loading (thanks to ntrp) Fixed - wrong urls causes a potential DDoS Fixed - Event.class package visibility changed to avoid direct usage Improvement - Simplified Jackson modules registration -####24-Oct-2013 - version 1.5.2 released +#### 24-Oct-2013 - version 1.5.2 released Fixed - NPE during shutdown Improvement - isEmpty method added to Namespace -####13-Oct-2013 - version 1.5.1 released +#### 13-Oct-2013 - version 1.5.1 released Fixed - wrong ack timeout callback invocation Fixed - bigdecimal serialization for JSON Fixed - infinity loop during packet handling exception Fixed - 'client not found' handling -####27-Aug-2013 - version 1.5.0 released +#### 27-Aug-2013 - version 1.5.0 released Improvement - encoding buffers allocation optimization. Improvement - encoding buffers now pooled in memory to reduce GC pressure (netty 4.x feature). -####03-Aug-2013 - version 1.0.1 released +#### 03-Aug-2013 - version 1.0.1 released Fixed - error on unknown property during deserialization. Fixed - memory leak in long polling transport. Improvement - logging error info with inbound data. -####07-Jun-2013 - version 1.0.0 released +#### 07-Jun-2013 - version 1.0.0 released First stable release. From 1f7446248456838cc1dc4f4a3b2140d38e1e921e Mon Sep 17 00:00:00 2001 From: whg Date: Fri, 15 Apr 2016 21:36:20 +0800 Subject: [PATCH 3/7] 'TODO refactor to enum' in PubSubStore interface --- .../socketio/BroadcastOperations.java | 4 ++-- .../socketio/SocketIOChannelInitializer.java | 6 ++---- .../socketio/handler/AuthorizeHandler.java | 4 ++-- .../socketio/namespace/Namespace.java | 9 +++++---- .../store/pubsub/BaseStoreFactory.java | 20 +++++++++---------- .../socketio/store/pubsub/PubSubStore.java | 16 ++------------- .../socketio/store/pubsub/PubSubType.java | 16 +++++++++++++++ 7 files changed, 39 insertions(+), 36 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index 2529355..d1ded5d 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -30,7 +30,7 @@ import com.corundumstudio.socketio.protocol.Packet; import com.corundumstudio.socketio.protocol.PacketType; import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.pubsub.DispatchMessage; -import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; /** * Fully thread-safe. @@ -62,7 +62,7 @@ public class BroadcastOperations implements ClientOperations { private void dispatch(Packet packet) { for (Entry> entry : namespaceRooms.entrySet()) { for (String room : entry.getValue()) { - storeFactory.pubSubStore().publish(PubSubStore.DISPATCH, new DispatchMessage(room, packet, entry.getKey())); + storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey())); } } } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java index 4512848..28417b3 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java @@ -42,7 +42,7 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler; import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler; import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.pubsub.DisconnectMessage; -import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import com.corundumstudio.socketio.transport.PollingTransport; import com.corundumstudio.socketio.transport.WebSocketTransport; @@ -54,9 +54,7 @@ import io.netty.handler.codec.http.HttpContentCompressor; import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; -import io.netty.handler.codec.http.HttpResponse; import io.netty.handler.codec.http.HttpResponseEncoder; -import io.netty.handler.codec.http.HttpUtil; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.ssl.SslHandler; @@ -224,7 +222,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer impl authorizeHandler.onDisconnect(client); configuration.getStoreFactory().onDisconnect(client); - configuration.getStoreFactory().pubSubStore().publish(PubSubStore.DISCONNECT, new DisconnectMessage(client.getSessionId())); + configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT, new DisconnectMessage(client.getSessionId())); log.debug("Client with sessionId: {} disconnected", client.getSessionId()); } diff --git a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java index 8d6aa97..c8660fa 100644 --- a/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java +++ b/src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java @@ -45,7 +45,7 @@ import com.corundumstudio.socketio.scheduler.SchedulerKey; import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.pubsub.ConnectMessage; -import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import io.netty.channel.Channel; import io.netty.channel.ChannelFutureListener; @@ -223,7 +223,7 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di packet.setSubType(PacketType.CONNECT); client.send(packet); - configuration.getStoreFactory().pubSubStore().publish(PubSubStore.CONNECT, new ConnectMessage(client.getSessionId())); + configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT, new ConnectMessage(client.getSessionId())); SocketIOClient nsClient = client.addNamespaceClient(ns); ns.onConnect(nsClient); diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index c5de8dd..5b6d72c 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -46,6 +46,7 @@ import com.corundumstudio.socketio.protocol.Packet; import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.pubsub.JoinLeaveMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import com.corundumstudio.socketio.transport.NamespaceClient; /** @@ -172,7 +173,7 @@ public class Namespace implements SocketIONamespace { allClients.remove(client.getSessionId()); leave(getName(), client.getSessionId()); - storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); + storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); try { for (DisconnectListener listener : disconnectListeners) { @@ -190,7 +191,7 @@ public class Namespace implements SocketIONamespace { public void onConnect(SocketIOClient client) { join(getName(), client.getSessionId()); - storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); + storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); try { for (ConnectListener listener : connectListeners) { @@ -248,7 +249,7 @@ public class Namespace implements SocketIONamespace { public void joinRoom(String room, UUID sessionId) { join(room, sessionId); - storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(sessionId, room, getName())); + storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(sessionId, room, getName())); } public void dispatch(String room, Packet packet) { @@ -283,7 +284,7 @@ public class Namespace implements SocketIONamespace { public void leaveRoom(String room, UUID sessionId) { leave(room, sessionId); - storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(sessionId, room, getName())); + storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(sessionId, room, getName())); } private void leave(ConcurrentMap> map, K room, V sessionId) { diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java index 871611f..78c6007 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java @@ -36,48 +36,48 @@ public abstract class BaseStoreFactory implements StoreFactory { @Override public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) { - pubSubStore().subscribe(PubSubStore.DISCONNECT, new PubSubListener() { + pubSubStore().subscribe(PubSubType.DISCONNECT, new PubSubListener() { @Override public void onMessage(DisconnectMessage msg) { - log.debug("{} sessionId: {}", PubSubStore.DISCONNECT, msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubType.DISCONNECT, msg.getSessionId()); } }, DisconnectMessage.class); - pubSubStore().subscribe(PubSubStore.CONNECT, new PubSubListener() { + pubSubStore().subscribe(PubSubType.CONNECT, new PubSubListener() { @Override public void onMessage(ConnectMessage msg) { authorizeHandler.connect(msg.getSessionId()); - log.debug("{} sessionId: {}", PubSubStore.CONNECT, msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubType.CONNECT, msg.getSessionId()); } }, ConnectMessage.class); - pubSubStore().subscribe(PubSubStore.DISPATCH, new PubSubListener() { + pubSubStore().subscribe(PubSubType.DISPATCH, new PubSubListener() { @Override public void onMessage(DispatchMessage msg) { String name = msg.getRoom(); namespacesHub.get(msg.getNamespace()).dispatch(name, msg.getPacket()); - log.debug("{} packet: {}", PubSubStore.DISPATCH, msg.getPacket()); + log.debug("{} packet: {}", PubSubType.DISPATCH, msg.getPacket()); } }, DispatchMessage.class); - pubSubStore().subscribe(PubSubStore.JOIN, new PubSubListener() { + pubSubStore().subscribe(PubSubType.JOIN, new PubSubListener() { @Override public void onMessage(JoinLeaveMessage msg) { String name = msg.getRoom(); namespacesHub.get(msg.getNamespace()).join(name, msg.getSessionId()); - log.debug("{} sessionId: {}", PubSubStore.JOIN, msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubType.JOIN, msg.getSessionId()); } }, JoinLeaveMessage.class); - pubSubStore().subscribe(PubSubStore.LEAVE, new PubSubListener() { + pubSubStore().subscribe(PubSubType.LEAVE, new PubSubListener() { @Override public void onMessage(JoinLeaveMessage msg) { String name = msg.getRoom(); namespacesHub.get(msg.getNamespace()).leave(name, msg.getSessionId()); - log.debug("{} sessionId: {}", PubSubStore.LEAVE, msg.getSessionId()); + log.debug("{} sessionId: {}", PubSubType.LEAVE, msg.getSessionId()); } }, JoinLeaveMessage.class); } diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java index c845db1..998931e 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java @@ -18,21 +18,9 @@ package com.corundumstudio.socketio.store.pubsub; public interface PubSubStore { - // TODO refactor to enum - String CONNECT = "connect"; + void publish(PubSubType type, PubSubMessage msg); - String DISCONNECT = "disconnect"; - - String JOIN = "join"; - - String LEAVE = "leave"; - - String DISPATCH = "dispatch"; - - - void publish(String name, PubSubMessage msg); - - void subscribe(String name, PubSubListener listener, Class clazz); + void subscribe(PubSubType type, PubSubListener listener, Class clazz); void unsubscribe(String name); diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java new file mode 100644 index 0000000..4321b47 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java @@ -0,0 +1,16 @@ +package com.corundumstudio.socketio.store.pubsub; + +public enum PubSubType { + + CONNECT, + DISCONNECT, + JOIN, + LEAVE, + DISPATCH; + + @Override + public String toString() { + return name().toLowerCase(); + } + +} From 5f771b703b2d3a0191f64d65942dfacdb45b26a2 Mon Sep 17 00:00:00 2001 From: whg333 Date: Sat, 16 Apr 2016 00:23:42 +0800 Subject: [PATCH 4/7] fix implementation classes of the PubSubStore interface with PubSubType --- .../socketio/store/HazelcastPubSubStore.java | 13 ++++++++----- .../socketio/store/MemoryPubSubStore.java | 7 ++++--- .../socketio/store/RedissonPubSubStore.java | 12 +++++++----- .../socketio/store/pubsub/PubSubStore.java | 2 +- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java index 987e88b..a8d0509 100644 --- a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; import com.hazelcast.core.Message; @@ -45,14 +46,15 @@ public class HazelcastPubSubStore implements PubSubStore { } @Override - public void publish(String name, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { msg.setNodeId(nodeId); - hazelcastPub.getTopic(name).publish(msg); + hazelcastPub.getTopic(type.toString()).publish(msg); } @Override - public void subscribe(String name, final PubSubListener listener, Class clazz) { - ITopic topic = hazelcastSub.getTopic(name); + public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { + String name = type.toString(); + ITopic topic = hazelcastSub.getTopic(name); String regId = topic.addMessageListener(new MessageListener() { @Override public void onMessage(Message message) { @@ -75,7 +77,8 @@ public class HazelcastPubSubStore implements PubSubStore { } @Override - public void unsubscribe(String name) { + public void unsubscribe(PubSubType type) { + String name = type.toString(); Queue regIds = map.remove(name); ITopic topic = hazelcastSub.getTopic(name); for (String id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java index c912eb5..b472682 100644 --- a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java @@ -18,19 +18,20 @@ package com.corundumstudio.socketio.store; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; public class MemoryPubSubStore implements PubSubStore { @Override - public void publish(String name, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { } @Override - public void subscribe(String name, PubSubListener listener, Class clazz) { + public void subscribe(PubSubType type, PubSubListener listener, Class clazz) { } @Override - public void unsubscribe(String name) { + public void unsubscribe(PubSubType type) { } @Override diff --git a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java index 7e5c864..e57216f 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java @@ -44,14 +44,15 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void publish(String name, PubSubMessage msg) { + public void publish(com.corundumstudio.socketio.store.pubsub.PubSubType type, PubSubMessage msg) { msg.setNodeId(nodeId); - redissonPub.getTopic(name).publish(msg); + redissonPub.getTopic(type.toString()).publish(msg); } @Override - public void subscribe(String name, final PubSubListener listener, Class clazz) { - RTopic topic = redissonSub.getTopic(name); + public void subscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type, final PubSubListener listener, Class clazz) { + String name = type.toString(); + RTopic topic = redissonSub.getTopic(name); int regId = topic.addListener(new MessageListener() { @Override public void onMessage(String channel, T msg) { @@ -73,7 +74,8 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void unsubscribe(String name) { + public void unsubscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type) { + String name = type.toString(); Queue regIds = map.remove(name); RTopic topic = redissonSub.getTopic(name); for (Integer id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java index 998931e..4bdcf83 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java @@ -22,7 +22,7 @@ public interface PubSubStore { void subscribe(PubSubType type, PubSubListener listener, Class clazz); - void unsubscribe(String name); + void unsubscribe(PubSubType type); void shutdown(); From 805dfbd6d98b1d48b4494db7aef48f8137ab58d9 Mon Sep 17 00:00:00 2001 From: whg333 Date: Sat, 16 Apr 2016 00:34:49 +0800 Subject: [PATCH 5/7] 4 spaces for tabs --- .../socketio/store/HazelcastPubSubStore.java | 4 ++-- .../socketio/store/RedissonPubSubStore.java | 4 ++-- .../socketio/store/pubsub/PubSubType.java | 20 ++++++++----------- 3 files changed, 12 insertions(+), 16 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java index a8d0509..815b9c8 100644 --- a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java @@ -54,7 +54,7 @@ public class HazelcastPubSubStore implements PubSubStore { @Override public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { String name = type.toString(); - ITopic topic = hazelcastSub.getTopic(name); + ITopic topic = hazelcastSub.getTopic(name); String regId = topic.addMessageListener(new MessageListener() { @Override public void onMessage(Message message) { @@ -78,7 +78,7 @@ public class HazelcastPubSubStore implements PubSubStore { @Override public void unsubscribe(PubSubType type) { - String name = type.toString(); + String name = type.toString(); Queue regIds = map.remove(name); ITopic topic = hazelcastSub.getTopic(name); for (String id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java index e57216f..fee1d67 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java @@ -52,7 +52,7 @@ public class RedissonPubSubStore implements PubSubStore { @Override public void subscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type, final PubSubListener listener, Class clazz) { String name = type.toString(); - RTopic topic = redissonSub.getTopic(name); + RTopic topic = redissonSub.getTopic(name); int regId = topic.addListener(new MessageListener() { @Override public void onMessage(String channel, T msg) { @@ -75,7 +75,7 @@ public class RedissonPubSubStore implements PubSubStore { @Override public void unsubscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type) { - String name = type.toString(); + String name = type.toString(); Queue regIds = map.remove(name); RTopic topic = redissonSub.getTopic(name); for (Integer id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java index 4321b47..eef309b 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java @@ -1,16 +1,12 @@ package com.corundumstudio.socketio.store.pubsub; public enum PubSubType { - - CONNECT, - DISCONNECT, - JOIN, - LEAVE, - DISPATCH; - - @Override - public String toString() { - return name().toLowerCase(); - } - + + CONNECT, DISCONNECT, JOIN, LEAVE, DISPATCH; + + @Override + public String toString() { + return name().toLowerCase(); + } + } From e00cd159c9fcb6504a9d978945d493e2a18551c6 Mon Sep 17 00:00:00 2001 From: whg333 Date: Sat, 16 Apr 2016 00:50:23 +0800 Subject: [PATCH 6/7] import the socketio PubSubType not the redisson PubSubType --- .../corundumstudio/socketio/store/RedissonPubSubStore.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java index fee1d67..2facb2d 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java @@ -26,6 +26,7 @@ import org.redisson.core.RTopic; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import io.netty.util.internal.PlatformDependent; @@ -44,13 +45,13 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void publish(com.corundumstudio.socketio.store.pubsub.PubSubType type, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { msg.setNodeId(nodeId); redissonPub.getTopic(type.toString()).publish(msg); } @Override - public void subscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type, final PubSubListener listener, Class clazz) { + public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { String name = type.toString(); RTopic topic = redissonSub.getTopic(name); int regId = topic.addListener(new MessageListener() { @@ -74,7 +75,7 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void unsubscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type) { + public void unsubscribe(PubSubType type) { String name = type.toString(); Queue regIds = map.remove(name); RTopic topic = redissonSub.getTopic(name); From 3eccd8f5aff361be6d32042b525a543f54364882 Mon Sep 17 00:00:00 2001 From: Johnny Marnell Date: Sat, 21 May 2016 16:03:56 -0400 Subject: [PATCH 7/7] Change Jackson support members from private to protected - Allows easy overrides of Jackson internals (notably, ObjectMapper) to easily extend JacksonJsonSupport and override with custom config --- .../socketio/protocol/JacksonJsonSupport.java | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java b/src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java index d4a552e..441846f 100644 --- a/src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java +++ b/src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java @@ -291,14 +291,14 @@ public class JacksonJsonSupport implements JsonSupport { } - private final ExBeanSerializerModifier modifier = new ExBeanSerializerModifier(); - private final ThreadLocal namespaceClass = new ThreadLocal(); - private final ThreadLocal> currentAckClass = new ThreadLocal>(); - private final ObjectMapper objectMapper = new ObjectMapper(); - private final EventDeserializer eventDeserializer = new EventDeserializer(); - private final AckArgsDeserializer ackArgsDeserializer = new AckArgsDeserializer(); - - private static final Logger log = LoggerFactory.getLogger(JacksonJsonSupport.class); + protected final ExBeanSerializerModifier modifier = new ExBeanSerializerModifier(); + protected final ThreadLocal namespaceClass = new ThreadLocal(); + protected final ThreadLocal> currentAckClass = new ThreadLocal>(); + protected final ObjectMapper objectMapper = new ObjectMapper(); + protected final EventDeserializer eventDeserializer = new EventDeserializer(); + protected final AckArgsDeserializer ackArgsDeserializer = new AckArgsDeserializer(); + + protected static final Logger log = LoggerFactory.getLogger(JacksonJsonSupport.class); public JacksonJsonSupport() { this(new Module[] {});