diff --git a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java index 987e88b..815b9c8 100644 --- a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; import com.hazelcast.core.Message; @@ -45,13 +46,14 @@ public class HazelcastPubSubStore implements PubSubStore { } @Override - public void publish(String name, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { msg.setNodeId(nodeId); - hazelcastPub.getTopic(name).publish(msg); + hazelcastPub.getTopic(type.toString()).publish(msg); } @Override - public void subscribe(String name, final PubSubListener listener, Class clazz) { + public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { + String name = type.toString(); ITopic topic = hazelcastSub.getTopic(name); String regId = topic.addMessageListener(new MessageListener() { @Override @@ -75,7 +77,8 @@ public class HazelcastPubSubStore implements PubSubStore { } @Override - public void unsubscribe(String name) { + public void unsubscribe(PubSubType type) { + String name = type.toString(); Queue regIds = map.remove(name); ITopic topic = hazelcastSub.getTopic(name); for (String id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java index c912eb5..b472682 100644 --- a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java @@ -18,19 +18,20 @@ package com.corundumstudio.socketio.store; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; public class MemoryPubSubStore implements PubSubStore { @Override - public void publish(String name, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { } @Override - public void subscribe(String name, PubSubListener listener, Class clazz) { + public void subscribe(PubSubType type, PubSubListener listener, Class clazz) { } @Override - public void unsubscribe(String name) { + public void unsubscribe(PubSubType type) { } @Override diff --git a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java index 7e5c864..2facb2d 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java @@ -26,6 +26,7 @@ import org.redisson.core.RTopic; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import io.netty.util.internal.PlatformDependent; @@ -44,13 +45,14 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void publish(String name, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { msg.setNodeId(nodeId); - redissonPub.getTopic(name).publish(msg); + redissonPub.getTopic(type.toString()).publish(msg); } @Override - public void subscribe(String name, final PubSubListener listener, Class clazz) { + public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { + String name = type.toString(); RTopic topic = redissonSub.getTopic(name); int regId = topic.addListener(new MessageListener() { @Override @@ -73,7 +75,8 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void unsubscribe(String name) { + public void unsubscribe(PubSubType type) { + String name = type.toString(); Queue regIds = map.remove(name); RTopic topic = redissonSub.getTopic(name); for (Integer id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java index 998931e..4bdcf83 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java @@ -22,7 +22,7 @@ public interface PubSubStore { void subscribe(PubSubType type, PubSubListener listener, Class clazz); - void unsubscribe(String name); + void unsubscribe(PubSubType type); void shutdown(); diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java index 4321b47..eef309b 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java @@ -1,16 +1,12 @@ package com.corundumstudio.socketio.store.pubsub; public enum PubSubType { - - CONNECT, - DISCONNECT, - JOIN, - LEAVE, - DISPATCH; - - @Override - public String toString() { - return name().toLowerCase(); - } - + + CONNECT, DISCONNECT, JOIN, LEAVE, DISPATCH; + + @Override + public String toString() { + return name().toLowerCase(); + } + }