Browse Source

fix implementation classes of the PubSubStore interface with PubSubType

master
whg333 9 years ago
parent
commit
5f771b703b
  1. 13
      src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java
  2. 7
      src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java
  3. 12
      src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java
  4. 2
      src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

13
src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java

@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
import com.hazelcast.core.HazelcastInstance;
import com.hazelcast.core.ITopic;
import com.hazelcast.core.Message;
@ -45,14 +46,15 @@ public class HazelcastPubSubStore implements PubSubStore {
}
@Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
msg.setNodeId(nodeId);
hazelcastPub.getTopic(name).publish(msg);
hazelcastPub.getTopic(type.toString()).publish(msg);
}
@Override
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
ITopic<T> topic = hazelcastSub.getTopic(name);
public <T extends PubSubMessage> void subscribe(PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
ITopic<T> topic = hazelcastSub.getTopic(name);
String regId = topic.addMessageListener(new MessageListener<T>() {
@Override
public void onMessage(Message<T> message) {
@ -75,7 +77,8 @@ public class HazelcastPubSubStore implements PubSubStore {
}
@Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
String name = type.toString();
Queue<String> regIds = map.remove(name);
ITopic<Object> topic = hazelcastSub.getTopic(name);
for (String id : regIds) {

7
src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java

@ -18,19 +18,20 @@ package com.corundumstudio.socketio.store;
import com.corundumstudio.socketio.store.pubsub.PubSubListener;
import com.corundumstudio.socketio.store.pubsub.PubSubMessage;
import com.corundumstudio.socketio.store.pubsub.PubSubStore;
import com.corundumstudio.socketio.store.pubsub.PubSubType;
public class MemoryPubSubStore implements PubSubStore {
@Override
public void publish(String name, PubSubMessage msg) {
public void publish(PubSubType type, PubSubMessage msg) {
}
@Override
public <T extends PubSubMessage> void subscribe(String name, PubSubListener<T> listener, Class<T> clazz) {
public <T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz) {
}
@Override
public void unsubscribe(String name) {
public void unsubscribe(PubSubType type) {
}
@Override

12
src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java

@ -44,14 +44,15 @@ public class RedissonPubSubStore implements PubSubStore {
}
@Override
public void publish(String name, PubSubMessage msg) {
public void publish(com.corundumstudio.socketio.store.pubsub.PubSubType type, PubSubMessage msg) {
msg.setNodeId(nodeId);
redissonPub.getTopic(name).publish(msg);
redissonPub.getTopic(type.toString()).publish(msg);
}
@Override
public <T extends PubSubMessage> void subscribe(String name, final PubSubListener<T> listener, Class<T> clazz) {
RTopic<T> topic = redissonSub.getTopic(name);
public <T extends PubSubMessage> void subscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type, final PubSubListener<T> listener, Class<T> clazz) {
String name = type.toString();
RTopic<T> topic = redissonSub.getTopic(name);
int regId = topic.addListener(new MessageListener<T>() {
@Override
public void onMessage(String channel, T msg) {
@ -73,7 +74,8 @@ public class RedissonPubSubStore implements PubSubStore {
}
@Override
public void unsubscribe(String name) {
public void unsubscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type) {
String name = type.toString();
Queue<Integer> regIds = map.remove(name);
RTopic<Object> topic = redissonSub.getTopic(name);
for (Integer id : regIds) {

2
src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java

@ -22,7 +22,7 @@ public interface PubSubStore {
<T extends PubSubMessage> void subscribe(PubSubType type, PubSubListener<T> listener, Class<T> clazz);
void unsubscribe(String name);
void unsubscribe(PubSubType type);
void shutdown();

Loading…
Cancel
Save