Browse Source

Merge pull request #341 from whg333/master

'TODO refactor to enum' in PubSubStore interface
master
Nikita Koksharov 9 years ago
parent
commit
51423ade7e
  1. 4
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  2. 6
      src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
  3. 4
      src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
  4. 9
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  5. 20
      src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java
  6. 16
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java
  7. 16
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

4
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -30,7 +30,7 @@ import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.protocol.PacketType; import com.corundumstudio.socketio.protocol.PacketType;
import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DispatchMessage; import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
/** /**
* Fully thread-safe. * Fully thread-safe.
@ -62,7 +62,7 @@ public class BroadcastOperations implements ClientOperations {
private void dispatch(Packet packet) { private void dispatch(Packet packet) {
for (Entry<String, List<String>> entry : namespaceRooms.entrySet()) { for (Entry<String, List<String>> entry : namespaceRooms.entrySet()) {
for (String room : entry.getValue()) { for (String room : entry.getValue()) {
storeFactory.pubSubStore().publish(PubSubStore.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
} }
} }
} }

6
src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java

@ -42,7 +42,7 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler; import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler;
import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DisconnectMessage; import com.corundumstudio.socketio.store.pubsub.DisconnectMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.corundumstudio.socketio.transport.PollingTransport; import com.corundumstudio.socketio.transport.PollingTransport;
import com.corundumstudio.socketio.transport.WebSocketTransport; import com.corundumstudio.socketio.transport.WebSocketTransport;
@ -54,9 +54,7 @@ import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpMessage; import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslHandler; import io.netty.handler.ssl.SslHandler;
@ -224,7 +222,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
authorizeHandler.onDisconnect(client); authorizeHandler.onDisconnect(client);
configuration.getStoreFactory().onDisconnect(client); configuration.getStoreFactory().onDisconnect(client);
configuration.getStoreFactory().pubSubStore().publish(PubSubStore.DISCONNECT, new DisconnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT, new DisconnectMessage(client.getSessionId()));
log.debug("Client with sessionId: {} disconnected", client.getSessionId()); log.debug("Client with sessionId: {} disconnected", client.getSessionId());
} }

4
src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java

@ -45,7 +45,7 @@ import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type; import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.ConnectMessage; import com.corundumstudio.socketio.store.pubsub.ConnectMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import io.netty.channel.Channel; import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelFutureListener;
@ -223,7 +223,7 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
packet.setSubType(PacketType.CONNECT); packet.setSubType(PacketType.CONNECT);
client.send(packet); client.send(packet);
configuration.getStoreFactory().pubSubStore().publish(PubSubStore.CONNECT, new ConnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT, new ConnectMessage(client.getSessionId()));
SocketIOClient nsClient = client.addNamespaceClient(ns); SocketIOClient nsClient = client.addNamespaceClient(ns);
ns.onConnect(nsClient); ns.onConnect(nsClient);

9
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -46,6 +46,7 @@ import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.store.StoreFactory; import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.JoinLeaveMessage; import com.corundumstudio.socketio.store.pubsub.JoinLeaveMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore; import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.corundumstudio.socketio.transport.NamespaceClient; import com.corundumstudio.socketio.transport.NamespaceClient;
/** /**
@ -172,7 +173,7 @@ public class Namespace implements SocketIONamespace {
allClients.remove(client.getSessionId()); allClients.remove(client.getSessionId());
leave(getName(), client.getSessionId()); leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
try { try {
for (DisconnectListener listener : disconnectListeners) { for (DisconnectListener listener : disconnectListeners) {
@ -190,7 +191,7 @@ public class Namespace implements SocketIONamespace {
public void onConnect(SocketIOClient client) { public void onConnect(SocketIOClient client) {
join(getName(), client.getSessionId()); join(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
try { try {
for (ConnectListener listener : connectListeners) { for (ConnectListener listener : connectListeners) {
@ -248,7 +249,7 @@ public class Namespace implements SocketIONamespace {
public void joinRoom(String room, UUID sessionId) { public void joinRoom(String room, UUID sessionId) {
join(room, sessionId); join(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
} }
public void dispatch(String room, Packet packet) { public void dispatch(String room, Packet packet) {
@ -283,7 +284,7 @@ public class Namespace implements SocketIONamespace {
public void leaveRoom(String room, UUID sessionId) { public void leaveRoom(String room, UUID sessionId) {
leave(room, sessionId); leave(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
} }
private <K, V> void leave(ConcurrentMap<K, Set<V>> map, K room, V sessionId) { private <K, V> void leave(ConcurrentMap<K, Set<V>> map, K room, V sessionId) {

20
src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java

@ -36,48 +36,48 @@ public abstract class BaseStoreFactory implements StoreFactory {
@Override @Override
public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) { public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) {
pubSubStore().subscribe(PubSubStore.DISCONNECT, new PubSubListener<DisconnectMessage>() {
pubSubStore().subscribe(PubSubType.DISCONNECT, new PubSubListener<DisconnectMessage>() {
@Override @Override
public void onMessage(DisconnectMessage msg) { public void onMessage(DisconnectMessage msg) {
log.debug("{} sessionId: {}", PubSubStore.DISCONNECT, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.DISCONNECT, msg.getSessionId());
} }
}, DisconnectMessage.class); }, DisconnectMessage.class);
pubSubStore().subscribe(PubSubStore.CONNECT, new PubSubListener<ConnectMessage>() {
pubSubStore().subscribe(PubSubType.CONNECT, new PubSubListener<ConnectMessage>() {
@Override @Override
public void onMessage(ConnectMessage msg) { public void onMessage(ConnectMessage msg) {
authorizeHandler.connect(msg.getSessionId()); authorizeHandler.connect(msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.CONNECT, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.CONNECT, msg.getSessionId());
} }
}, ConnectMessage.class); }, ConnectMessage.class);
pubSubStore().subscribe(PubSubStore.DISPATCH, new PubSubListener<DispatchMessage>() {
pubSubStore().subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {
@Override @Override
public void onMessage(DispatchMessage msg) { public void onMessage(DispatchMessage msg) {
String name = msg.getRoom(); String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).dispatch(name, msg.getPacket()); namespacesHub.get(msg.getNamespace()).dispatch(name, msg.getPacket());
log.debug("{} packet: {}", PubSubStore.DISPATCH, msg.getPacket());
log.debug("{} packet: {}", PubSubType.DISPATCH, msg.getPacket());
} }
}, DispatchMessage.class); }, DispatchMessage.class);
pubSubStore().subscribe(PubSubStore.JOIN, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.JOIN, new PubSubListener<JoinLeaveMessage>() {
@Override @Override
public void onMessage(JoinLeaveMessage msg) { public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom(); String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).join(name, msg.getSessionId()); namespacesHub.get(msg.getNamespace()).join(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.JOIN, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.JOIN, msg.getSessionId());
} }
}, JoinLeaveMessage.class); }, JoinLeaveMessage.class);
pubSubStore().subscribe(PubSubStore.LEAVE, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.LEAVE, new PubSubListener<JoinLeaveMessage>() {
@Override @Override
public void onMessage(JoinLeaveMessage msg) { public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom(); String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).leave(name, msg.getSessionId()); namespacesHub.get(msg.getNamespace()).leave(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.LEAVE, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.LEAVE, msg.getSessionId());
} }
}, JoinLeaveMessage.class); }, JoinLeaveMessage.class);
} }

16
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -18,21 +18,9 @@ package com.corundumstudio.socketio.store.pubsub;
public interface PubSubStore { public interface PubSubStore {
// TODO refactor to enum
String CONNECT = "connect";
void publish(PubSubType type, PubSubMessage msg);
String DISCONNECT = "disconnect";
String JOIN = "join";
String LEAVE = "leave";
String DISPATCH = "dispatch";
void publish(String name, PubSubMessage msg);
<T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz);
<T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz);
void unsubscribe(String name); void unsubscribe(String name);

16
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

@ -0,0 +1,16 @@
package com.corundumstudio.socketio.store.pubsub;
public enum PubSubType {
CONNECT,
DISCONNECT,
JOIN,
LEAVE,
DISPATCH;
@Override
public String toString() {
return name().toLowerCase();
}
}
Loading…
Cancel
Save