Browse Source

Merge branch 'master' of github.com:mrniko/netty-socketio

master
Nikita 9 years ago
parent
commit
2d59a91ffe
  1. 56
      README.md
  2. 4
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  3. 6
      src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
  4. 4
      src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
  5. 9
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  6. 16
      src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java
  7. 11
      src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java
  8. 7
      src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java
  9. 11
      src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java
  10. 20
      src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java
  11. 18
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java
  12. 12
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

56
README.md

@ -17,8 +17,8 @@ Features
* Supports namespaces and rooms
* Supports ack (acknowledgment of received data)
* Supports SSL
* Supports client store (Memory, [Redisson](https://github.com/mrniko/redisson), [Hazelcast](http://www.hazelcast.com/))
* Supports distributed broadcast across netty-socketio nodes ([Redisson](https://github.com/mrniko/redisson), [Hazelcast](http://www.hazelcast.com/))
* Supports client store (Memory, [Redisson](http://redisson.org), [Hazelcast](http://www.hazelcast.com/))
* Supports distributed broadcast across netty-socketio nodes ([Redisson](http://redisson.org), [Hazelcast](http://www.hazelcast.com/))
* Supports OSGi
* Supports Spring
* Lock-free and thread-safe implementation
@ -48,13 +48,13 @@ Zipwhip: [zipwhip.com](https://zipwhip.com/)
Recent Releases
================================
####Please Note: trunk is current development branch.
#### Please Note: trunk is current development branch.
####4-Mar-2015 - version 1.7.10 released
#### 4-Mar-2015 - version 1.7.10 released
Fixed - netty updated to 4.1.0.CR3 version
Fixed - binary packet parsing (thanks to Winston Li)
####6-Feb-2015 - version 1.7.9 released
#### 6-Feb-2015 - version 1.7.9 released
Feature - Compression support
Fixed - DotNET client request handling
Fixed - Packet length format parsing
@ -63,46 +63,46 @@ Fixed - Polling clients sporatically get prematurely disconnected (thanks to lpa
Fixed - connections stay open forever if server sent `close` packet
Fixed - compatibility with Redisson latest version
####30-Nov-2015 - version 1.7.8 released
#### 30-Nov-2015 - version 1.7.8 released
Improvement - `WebSocketServerHandshaker.allowExtensions` is `true` now
Improvement - SessionID cookie implementation (thanks to @ryandietrich)
Fixed - clientRooms leak (thanks to @andreaspalm)
Fixed - ExceptionListener not used for errors in JSON parsing
Fixed - "silent channel" attack
####26-Mar-2015 - version 1.6.7 released
#### 26-Mar-2015 - version 1.6.7 released
Improvement - `useStrictOrdering` param added for websocket packets strict ordering
Improvement - `FAIL_ON_EMPTY_BEANS = false` option setted in json decoder
####18-Feb-2015 - version 1.7.7 released
#### 18-Feb-2015 - version 1.7.7 released
Improvement - no need to add jackson lib if you use own JsonSupport impl
Fixed - SocketIO client 1.3.x support
Fixed - Charset encoding handling (thanks to alim-akbashev)
####17-Jan-2015 - version 1.7.6 released
#### 17-Jan-2015 - version 1.7.6 released
Improvement - `SocketIONamespace.getName()` added
Fixed - WebSocket frames aggregation
Fixed - WebSocket buffer release
Fixed - `Unexpected end-of-input in VALUE_STRING` error
Fixed - Access-Control-Allow-Credentials is TRUE for requests with origin header
####05-Dec-2014 - version 1.7.5 released
#### 05-Dec-2014 - version 1.7.5 released
Feature - `Configuration.sslProtocol` param added
Fixed - BinaryEvent ack handling
Fixed - BinaryEvent non b64 encoding/decoding
Fixed - buffer leak during packet encoding
####15-Nov-2014 - version 1.7.4 released
#### 15-Nov-2014 - version 1.7.4 released
Fixed - packet encoding
Fixed - BinaryEvent encoding/decoding
Fixed - unchallenged connections handling
####29-Sep-2014 - version 1.6.6 released
#### 29-Sep-2014 - version 1.6.6 released
Feature - `origin` setting added
Feature - `crossDomainPolicy` setting added
Feature - `SocketIOServer.startAsync` method added
####24-Sep-2014 - version 1.7.3 released
#### 24-Sep-2014 - version 1.7.3 released
Feature - Epoll support
Improvement - BinaryEvent support
Fixed - SocketIOClient disconnect handling
@ -111,23 +111,23 @@ Fixed - NPE then no transport defined during auth
Fixed - ping timeout for polling transport
Fixed - buffer leak in PacketEncoder
####22-Aug-2014 - version 1.7.2 released
#### 22-Aug-2014 - version 1.7.2 released
Fixed - wrong outgoing message encoding using websocket transport
Fixed - NPE in websocket transport
Fixed - multiple packet decoding in polling transport
Fixed - buffer leak
####07-Jul-2014 - version 1.7.1 released
#### 07-Jul-2014 - version 1.7.1 released
Feature - ability to set custom `Access-Control-Allow-Origin` via Configuration.origin
Fixed - connection via CLI socket.io-client
####28-Jun-2014 - version 1.7.0 released
#### 28-Jun-2014 - version 1.7.0 released
Feature - Socket.IO 1.0 protocol support. Thanks to the new protocol decoding/encoding has speedup
__Dropped__ - `SocketIOClient.sendMessage`, `SocketIOClient.sendJsonObject` methods and corresponding listeners
__Dropped__ - Flashsocket transport support
__Dropped__ - protocol version 0.7 ... 0.9.16
####13-May-2014 - version 1.6.5 released
#### 13-May-2014 - version 1.6.5 released
Improvement - single packet encoding optimized, used mostly in WebSocket transport. Encoding time reduced up to 40% (thanks to Viktor Endersz)
Improvement - rooms handling optimized
Improvement - ExceptionListener.exceptionCaught method added
@ -137,7 +137,7 @@ Feature - maxFramePayloadLength setting added
Feature - getAllClients and getClient methods added to SocketIONamespace
Fixed - SocketIOServer.getAllClients returns wrong clients amount
####25-Mar-2014 - version 1.6.4 released
#### 25-Mar-2014 - version 1.6.4 released
Fixed - message release problem
Fixed - problem with exception listener configuration redefinition
__Breaking api change__ - DataListener.onData now throws Exception
@ -145,7 +145,7 @@ Improvement - data parameter added to exception listener
Improvement - ability to setup socket configuration
Improvement - Configuration.autoAck parameter added
####06-Mar-2014 - version 1.6.3 released
#### 06-Mar-2014 - version 1.6.3 released
Fixed - AckCallback handling during client disconnect
Fixed - unauthorized handshake HTTP code changed to 401
__Breaking api change__ - Configuration.heartbeatThreadPoolSize setting removed
@ -153,7 +153,7 @@ Feature - annotated Spring beans support via _SpringAnnotationScanner_
Feature - common exception listener
Improvement - _ScheduledExecutorService_ replaced with _HashedWheelTimer_
####08-Feb-2014 - version 1.6.2 released
#### 08-Feb-2014 - version 1.6.2 released
Fixed - wrong namespace client disconnect handling
Fixed - exception in onConnect/onDisconnect/isAuthorized methods leads to server hang
__Breaking api change__ - SocketIOClient.sendEvent methods signature changed
@ -162,11 +162,11 @@ Improvement - multi type events ack support via _MultiTypeAckCallback_
Improvement - SocketIOClient.getHandshakeData method added
Improvement - Jedis replaced with [Redisson](https://github.com/mrniko/redisson)
####14-Jan-2014 - version 1.6.1 released
#### 14-Jan-2014 - version 1.6.1 released
Fixed - JDK 1.6+ compatibility
Feature - authorization support
####19-Dec-2013 - version 1.6.0 released
#### 19-Dec-2013 - version 1.6.0 released
Fixed - XHR-pooling transport regression
Fixed - Websocket transport regression
Fixed - namespace NPE in PacketHandler
@ -177,33 +177,33 @@ Feature - OSGi support (thanks to rdevera)
Improvement - XHR-pooling optimization
Improvement - SocketIOClient.getAllRooms method added
####07-Dec-2013 - version 1.5.4 released
#### 07-Dec-2013 - version 1.5.4 released
Fixed - flash policy "request leak" after page reload (thanks to ntrp)
Fixed - websocket swf loading (thanks to ntrp)
Fixed - wrong urls causes a potential DDoS
Fixed - Event.class package visibility changed to avoid direct usage
Improvement - Simplified Jackson modules registration
####24-Oct-2013 - version 1.5.2 released
#### 24-Oct-2013 - version 1.5.2 released
Fixed - NPE during shutdown
Improvement - isEmpty method added to Namespace
####13-Oct-2013 - version 1.5.1 released
#### 13-Oct-2013 - version 1.5.1 released
Fixed - wrong ack timeout callback invocation
Fixed - bigdecimal serialization for JSON
Fixed - infinity loop during packet handling exception
Fixed - 'client not found' handling
####27-Aug-2013 - version 1.5.0 released
#### 27-Aug-2013 - version 1.5.0 released
Improvement - encoding buffers allocation optimization.
Improvement - encoding buffers now pooled in memory to reduce GC pressure (netty 4.x feature).
####03-Aug-2013 - version 1.0.1 released
#### 03-Aug-2013 - version 1.0.1 released
Fixed - error on unknown property during deserialization.
Fixed - memory leak in long polling transport.
Improvement - logging error info with inbound data.
####07-Jun-2013 - version 1.0.0 released
#### 07-Jun-2013 - version 1.0.0 released
First stable release.

4
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -30,7 +30,7 @@ import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.protocol.PacketType;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
/**
* Fully thread-safe.
@ -62,7 +62,7 @@ public class BroadcastOperations implements ClientOperations {
private void dispatch(Packet packet) {
for (Entry<String, List<String>> entry : namespaceRooms.entrySet()) {
for (String room : entry.getValue()) {
storeFactory.pubSubStore().publish(PubSubStore.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
}
}
}

6
src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java

@ -42,7 +42,7 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DisconnectMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.corundumstudio.socketio.transport.PollingTransport;
import com.corundumstudio.socketio.transport.WebSocketTransport;
@ -54,9 +54,7 @@ import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslHandler;
@ -224,7 +222,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
authorizeHandler.onDisconnect(client);
configuration.getStoreFactory().onDisconnect(client);
configuration.getStoreFactory().pubSubStore().publish(PubSubStore.DISCONNECT, new DisconnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT, new DisconnectMessage(client.getSessionId()));
log.debug("Client with sessionId: {} disconnected", client.getSessionId());
}

4
src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java

@ -45,7 +45,7 @@ import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.ConnectMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
@ -223,7 +223,7 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
packet.setSubType(PacketType.CONNECT);
client.send(packet);
configuration.getStoreFactory().pubSubStore().publish(PubSubStore.CONNECT, new ConnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT, new ConnectMessage(client.getSessionId()));
SocketIOClient nsClient = client.addNamespaceClient(ns);
ns.onConnect(nsClient);

9
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -46,6 +46,7 @@ import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.JoinLeaveMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.corundumstudio.socketio.transport.NamespaceClient;
/**
@ -172,7 +173,7 @@ public class Namespace implements SocketIONamespace {
allClients.remove(client.getSessionId());
leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
try {
for (DisconnectListener listener : disconnectListeners) {
@ -190,7 +191,7 @@ public class Namespace implements SocketIONamespace {
public void onConnect(SocketIOClient client) {
join(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
try {
for (ConnectListener listener : connectListeners) {
@ -248,7 +249,7 @@ public class Namespace implements SocketIONamespace {
public void joinRoom(String room, UUID sessionId) {
join(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
}
public void dispatch(String room, Packet packet) {
@ -283,7 +284,7 @@ public class Namespace implements SocketIONamespace {
public void leaveRoom(String room, UUID sessionId) {
leave(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
}
private <K, V> void leave(ConcurrentMap<K, Set<V>> map, K room, V sessionId) {

16
src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java

@ -291,14 +291,14 @@ public class JacksonJsonSupport implements JsonSupport {
}
private final ExBeanSerializerModifier modifier = new ExBeanSerializerModifier();
private final ThreadLocal<String> namespaceClass = new ThreadLocal<String>();
private final ThreadLocal<AckCallback<?>> currentAckClass = new ThreadLocal<AckCallback<?>>();
private final ObjectMapper objectMapper = new ObjectMapper();
private final EventDeserializer eventDeserializer = new EventDeserializer();
private final AckArgsDeserializer ackArgsDeserializer = new AckArgsDeserializer();
private static final Logger log = LoggerFactory.getLogger(JacksonJsonSupport.class);
protected final ExBeanSerializerModifier modifier = new ExBeanSerializerModifier();
protected final ThreadLocal<String> namespaceClass = new ThreadLocal<String>();
protected final ThreadLocal<AckCallback<?>> currentAckClass = new ThreadLocal<AckCallback<?>>();
protected final ObjectMapper objectMapper = new ObjectMapper();
protected final EventDeserializer eventDeserializer = new EventDeserializer();
protected final AckArgsDeserializer ackArgsDeserializer = new AckArgsDeserializer();
protected static final Logger log = LoggerFactory.getLogger(JacksonJsonSupport.class);
public JacksonJsonSupport() {
this(new Module[] {});

11
src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
@ -45,13 +46,14 @@ public class HazelcastPubSubStore implements PubSubStore {
}
@Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
msg.setNodeId(nodeId);
hazelcastPub.getTopic(name).publish(msg);
hazelcastPub.getTopic(type.toString()).publish(msg);
}
@Override
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
ITopic<T> topic = hazelcastSub.getTopic(name);
String regId = topic.addMessageListener(new MessageListener<T>() {
@Override
@ -75,7 +77,8 @@ public class HazelcastPubSubStore implements PubSubStore {
}
@Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
String name = type.toString();
Queue<String> regIds = map.remove(name);
ITopic<Object> topic = hazelcastSub.getTopic(name);
for (String id : regIds) {

7
src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java

@ -18,19 +18,20 @@ package com.corundumstudio.socketio.store;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
public class MemoryPubSubStore implements PubSubStore {
@Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
}
@Override
public <T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz) {
}
@Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
}
@Override

11
src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java

@ -26,6 +26,7 @@ import org.redisson.core.RTopic;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import io.netty.util.internal.PlatformDependent;
@ -44,13 +45,14 @@ public class RedissonPubSubStore implements PubSubStore {
}
@Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
msg.setNodeId(nodeId);
redissonPub.getTopic(name).publish(msg);
redissonPub.getTopic(type.toString()).publish(msg);
}
@Override
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
RTopic<T> topic = redissonSub.getTopic(name);
int regId = topic.addListener(new MessageListener<T>() {
@Override
@ -73,7 +75,8 @@ public class RedissonPubSubStore implements PubSubStore {
}
@Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
String name = type.toString();
Queue<Integer> regIds = map.remove(name);
RTopic<Object> topic = redissonSub.getTopic(name);
for (Integer id : regIds) {

20
src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java

@ -36,48 +36,48 @@ public abstract class BaseStoreFactory implements StoreFactory {
@Override
public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) {
pubSubStore().subscribe(PubSubStore.DISCONNECT, new PubSubListener<DisconnectMessage>() {
pubSubStore().subscribe(PubSubType.DISCONNECT, new PubSubListener<DisconnectMessage>() {
@Override
public void onMessage(DisconnectMessage msg) {
log.debug("{} sessionId: {}", PubSubStore.DISCONNECT, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.DISCONNECT, msg.getSessionId());
}
}, DisconnectMessage.class);
pubSubStore().subscribe(PubSubStore.CONNECT, new PubSubListener<ConnectMessage>() {
pubSubStore().subscribe(PubSubType.CONNECT, new PubSubListener<ConnectMessage>() {
@Override
public void onMessage(ConnectMessage msg) {
authorizeHandler.connect(msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.CONNECT, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.CONNECT, msg.getSessionId());
}
}, ConnectMessage.class);
pubSubStore().subscribe(PubSubStore.DISPATCH, new PubSubListener<DispatchMessage>() {
pubSubStore().subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {
@Override
public void onMessage(DispatchMessage msg) {
String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).dispatch(name, msg.getPacket());
log.debug("{} packet: {}", PubSubStore.DISPATCH, msg.getPacket());
log.debug("{} packet: {}", PubSubType.DISPATCH, msg.getPacket());
}
}, DispatchMessage.class);
pubSubStore().subscribe(PubSubStore.JOIN, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.JOIN, new PubSubListener<JoinLeaveMessage>() {
@Override
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).join(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.JOIN, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.JOIN, msg.getSessionId());
}
}, JoinLeaveMessage.class);
pubSubStore().subscribe(PubSubStore.LEAVE, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.LEAVE, new PubSubListener<JoinLeaveMessage>() {
@Override
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).leave(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.LEAVE, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.LEAVE, msg.getSessionId());
}
}, JoinLeaveMessage.class);
}

18
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -18,23 +18,11 @@ package com.corundumstudio.socketio.store.pubsub;
public interface PubSubStore {
// TODO refactor to enum
String CONNECT = "connect";
void publish(PubSubType type, PubSubMessage msg);
String DISCONNECT = "disconnect";
<T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz);
String JOIN = "join";
String LEAVE = "leave";
String DISPATCH = "dispatch";
void publish(String name, PubSubMessage msg);
<T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz);
void unsubscribe(String name);
void unsubscribe(PubSubType type);
void shutdown();

12
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

@ -0,0 +1,12 @@
package com.corundumstudio.socketio.store.pubsub;
public enum PubSubType {
CONNECT, DISCONNECT, JOIN, LEAVE, DISPATCH;
@Override
public String toString() {
return name().toLowerCase();
}
}
Loading…
Cancel
Save