Browse Source

Merge pull request #342 from whg333/master

fix implementation classes of the PubSubStore interface with PubSubType
master
Nikita Koksharov 9 years ago
parent
commit
891c467a84
  1. 11
      src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java
  2. 7
      src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java
  3. 11
      src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java
  4. 2
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java
  5. 6
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

11
src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore; import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic; import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message; import com.hazelcast.core.Message;
@ -45,13 +46,14 @@ public class HazelcastPubSubStore implements PubSubStore {
} }
@Override @Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
msg.setNodeId(nodeId); msg.setNodeId(nodeId);
hazelcastPub.getTopic(name).publish(msg);
hazelcastPub.getTopic(type.toString()).publish(msg);
} }
@Override @Override
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
ITopic<T> topic = hazelcastSub.getTopic(name); ITopic<T> topic = hazelcastSub.getTopic(name);
String regId = topic.addMessageListener(new MessageListener<T>() { String regId = topic.addMessageListener(new MessageListener<T>() {
@Override @Override
@ -75,7 +77,8 @@ public class HazelcastPubSubStore implements PubSubStore {
} }
@Override @Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
String name = type.toString();
Queue<String> regIds = map.remove(name); Queue<String> regIds = map.remove(name);
ITopic<Object> topic = hazelcastSub.getTopic(name); ITopic<Object> topic = hazelcastSub.getTopic(name);
for (String id : regIds) { for (String id : regIds) {

7
src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java

@ -18,19 +18,20 @@ package com.corundumstudio.socketio.store;
import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore; import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
public class MemoryPubSubStore implements PubSubStore { public class MemoryPubSubStore implements PubSubStore {
@Override @Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
} }
@Override @Override
public <T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz) {
} }
@Override @Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
} }
@Override @Override

11
src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java

@ -26,6 +26,7 @@ import org.redisson.core.RTopic;
import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore; import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
@ -44,13 +45,14 @@ public class RedissonPubSubStore implements PubSubStore {
} }
@Override @Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
msg.setNodeId(nodeId); msg.setNodeId(nodeId);
redissonPub.getTopic(name).publish(msg);
redissonPub.getTopic(type.toString()).publish(msg);
} }
@Override @Override
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
RTopic<T> topic = redissonSub.getTopic(name); RTopic<T> topic = redissonSub.getTopic(name);
int regId = topic.addListener(new MessageListener<T>() { int regId = topic.addListener(new MessageListener<T>() {
@Override @Override
@ -73,7 +75,8 @@ public class RedissonPubSubStore implements PubSubStore {
} }
@Override @Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
String name = type.toString();
Queue<Integer> regIds = map.remove(name); Queue<Integer> regIds = map.remove(name);
RTopic<Object> topic = redissonSub.getTopic(name); RTopic<Object> topic = redissonSub.getTopic(name);
for (Integer id : regIds) { for (Integer id : regIds) {

2
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -22,7 +22,7 @@ public interface PubSubStore {
<T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz); <T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz);
void unsubscribe(String name);
void unsubscribe(PubSubType type);
void shutdown(); void shutdown();

6
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubType.java

@ -2,11 +2,7 @@ package com.corundumstudio.socketio.store.pubsub;
public enum PubSubType { public enum PubSubType {
CONNECT,
DISCONNECT,
JOIN,
LEAVE,
DISPATCH;
CONNECT, DISCONNECT, JOIN, LEAVE, DISPATCH;
@Override @Override
public String toString() { public String toString() {

Loading…
Cancel
Save