Browse Source

'TODO refactor to enum' in PubSubStore interface

master
whg 9 years ago
parent
commit
1f74462484
  1. 4
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  2. 6
      src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java
  3. 4
      src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java
  4. 9
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  5. 20
      src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java
  6. 16
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java
  7. 16
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

4
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -30,7 +30,7 @@ import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.protocol.PacketType;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DispatchMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
/**
* Fully thread-safe.
@ -62,7 +62,7 @@ public class BroadcastOperations implements ClientOperations {
private void dispatch(Packet packet) {
for (Entry<String, List<String>> entry : namespaceRooms.entrySet()) {
for (String room : entry.getValue()) {
storeFactory.pubSubStore().publish(PubSubStore.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey()));
}
}
}

6
src/main/java/com/corundumstudio/socketio/SocketIOChannelInitializer.java

@ -42,7 +42,7 @@ import com.corundumstudio.socketio.scheduler.CancelableScheduler;
import com.corundumstudio.socketio.scheduler.HashedWheelTimeoutScheduler;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.DisconnectMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.corundumstudio.socketio.transport.PollingTransport;
import com.corundumstudio.socketio.transport.WebSocketTransport;
@ -54,9 +54,7 @@ import io.netty.handler.codec.http.HttpContentCompressor;
import io.netty.handler.codec.http.HttpMessage;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpRequestDecoder;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseEncoder;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler;
import io.netty.handler.ssl.SslHandler;
@ -224,7 +222,7 @@ public class SocketIOChannelInitializer extends ChannelInitializer<Channel> impl
authorizeHandler.onDisconnect(client);
configuration.getStoreFactory().onDisconnect(client);
configuration.getStoreFactory().pubSubStore().publish(PubSubStore.DISCONNECT, new DisconnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.DISCONNECT, new DisconnectMessage(client.getSessionId()));
log.debug("Client with sessionId: {} disconnected", client.getSessionId());
}

4
src/main/java/com/corundumstudio/socketio/handler/AuthorizeHandler.java

@ -45,7 +45,7 @@ import com.corundumstudio.socketio.scheduler.SchedulerKey;
import com.corundumstudio.socketio.scheduler.SchedulerKey.Type;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.ConnectMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
@ -223,7 +223,7 @@ public class AuthorizeHandler extends ChannelInboundHandlerAdapter implements Di
packet.setSubType(PacketType.CONNECT);
client.send(packet);
configuration.getStoreFactory().pubSubStore().publish(PubSubStore.CONNECT, new ConnectMessage(client.getSessionId()));
configuration.getStoreFactory().pubSubStore().publish(PubSubType.CONNECT, new ConnectMessage(client.getSessionId()));
SocketIOClient nsClient = client.addNamespaceClient(ns);
ns.onConnect(nsClient);

9
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -46,6 +46,7 @@ import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.store.StoreFactory;
import com.corundumstudio.socketio.store.pubsub.JoinLeaveMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.corundumstudio.socketio.transport.NamespaceClient;
/**
@ -172,7 +173,7 @@ public class Namespace implements SocketIONamespace {
allClients.remove(client.getSessionId());
leave(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
try {
for (DisconnectListener listener : disconnectListeners) {
@ -190,7 +191,7 @@ public class Namespace implements SocketIONamespace {
public void onConnect(SocketIOClient client) {
join(getName(), client.getSessionId());
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(client.getSessionId(), getName(), getName()));
try {
for (ConnectListener listener : connectListeners) {
@ -248,7 +249,7 @@ public class Namespace implements SocketIONamespace {
public void joinRoom(String room, UUID sessionId) {
join(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.JOIN, new JoinLeaveMessage(sessionId, room, getName()));
}
public void dispatch(String room, Packet packet) {
@ -283,7 +284,7 @@ public class Namespace implements SocketIONamespace {
public void leaveRoom(String room, UUID sessionId) {
leave(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
}
private <K, V> void leave(ConcurrentMap<K, Set<V>> map, K room, V sessionId) {

20
src/main/java/com/corundumstudio/socketio/store/pubsub/BaseStoreFactory.java

@ -36,48 +36,48 @@ public abstract class BaseStoreFactory implements StoreFactory {
@Override
public void init(final NamespacesHub namespacesHub, final AuthorizeHandler authorizeHandler, JsonSupport jsonSupport) {
pubSubStore().subscribe(PubSubStore.DISCONNECT, new PubSubListener<DisconnectMessage>() {
pubSubStore().subscribe(PubSubType.DISCONNECT, new PubSubListener<DisconnectMessage>() {
@Override
public void onMessage(DisconnectMessage msg) {
log.debug("{} sessionId: {}", PubSubStore.DISCONNECT, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.DISCONNECT, msg.getSessionId());
}
}, DisconnectMessage.class);
pubSubStore().subscribe(PubSubStore.CONNECT, new PubSubListener<ConnectMessage>() {
pubSubStore().subscribe(PubSubType.CONNECT, new PubSubListener<ConnectMessage>() {
@Override
public void onMessage(ConnectMessage msg) {
authorizeHandler.connect(msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.CONNECT, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.CONNECT, msg.getSessionId());
}
}, ConnectMessage.class);
pubSubStore().subscribe(PubSubStore.DISPATCH, new PubSubListener<DispatchMessage>() {
pubSubStore().subscribe(PubSubType.DISPATCH, new PubSubListener<DispatchMessage>() {
@Override
public void onMessage(DispatchMessage msg) {
String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).dispatch(name, msg.getPacket());
log.debug("{} packet: {}", PubSubStore.DISPATCH, msg.getPacket());
log.debug("{} packet: {}", PubSubType.DISPATCH, msg.getPacket());
}
}, DispatchMessage.class);
pubSubStore().subscribe(PubSubStore.JOIN, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.JOIN, new PubSubListener<JoinLeaveMessage>() {
@Override
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).join(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.JOIN, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.JOIN, msg.getSessionId());
}
}, JoinLeaveMessage.class);
pubSubStore().subscribe(PubSubStore.LEAVE, new PubSubListener<JoinLeaveMessage>() {
pubSubStore().subscribe(PubSubType.LEAVE, new PubSubListener<JoinLeaveMessage>() {
@Override
public void onMessage(JoinLeaveMessage msg) {
String name = msg.getRoom();
namespacesHub.get(msg.getNamespace()).leave(name, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubStore.LEAVE, msg.getSessionId());
log.debug("{} sessionId: {}", PubSubType.LEAVE, msg.getSessionId());
}
}, JoinLeaveMessage.class);
}

16
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -18,21 +18,9 @@ package com.corundumstudio.socketio.store.pubsub;
public interface PubSubStore {
// TODO refactor to enum
String CONNECT = "connect";
void publish(PubSubType type, PubSubMessage msg);
String DISCONNECT = "disconnect";
String JOIN = "join";
String LEAVE = "leave";
String DISPATCH = "dispatch";
void publish(String name, PubSubMessage msg);
<T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz);
<T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz);
void unsubscribe(String name);

16
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

@ -0,0 +1,16 @@
package com.corundumstudio.socketio.store.pubsub;
public enum PubSubType {
CONNECT,
DISCONNECT,
JOIN,
LEAVE,
DISPATCH;
@Override
public String toString() {
return name().toLowerCase();
}
}
Loading…
Cancel
Save