From 5f771b703b2d3a0191f64d65942dfacdb45b26a2 Mon Sep 17 00:00:00 2001 From: whg333 Date: Sat, 16 Apr 2016 00:23:42 +0800 Subject: [PATCH] fix implementation classes of the PubSubStore interface with PubSubType --- .../socketio/store/HazelcastPubSubStore.java | 13 ++++++++----- .../socketio/store/MemoryPubSubStore.java | 7 ++++--- .../socketio/store/RedissonPubSubStore.java | 12 +++++++----- .../socketio/store/pubsub/PubSubStore.java | 2 +- 4 files changed, 20 insertions(+), 14 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java index 987e88b..a8d0509 100644 --- a/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/HazelcastPubSubStore.java @@ -24,6 +24,7 @@ import java.util.concurrent.ConcurrentMap; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; import com.hazelcast.core.HazelcastInstance; import com.hazelcast.core.ITopic; import com.hazelcast.core.Message; @@ -45,14 +46,15 @@ public class HazelcastPubSubStore implements PubSubStore { } @Override - public void publish(String name, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { msg.setNodeId(nodeId); - hazelcastPub.getTopic(name).publish(msg); + hazelcastPub.getTopic(type.toString()).publish(msg); } @Override - public void subscribe(String name, final PubSubListener listener, Class clazz) { - ITopic topic = hazelcastSub.getTopic(name); + public void subscribe(PubSubType type, final PubSubListener listener, Class clazz) { + String name = type.toString(); + ITopic topic = hazelcastSub.getTopic(name); String regId = topic.addMessageListener(new MessageListener() { @Override public void onMessage(Message message) { @@ -75,7 +77,8 @@ public class HazelcastPubSubStore implements PubSubStore { } @Override - public void unsubscribe(String name) { + public void unsubscribe(PubSubType type) { + String name = type.toString(); Queue regIds = map.remove(name); ITopic topic = hazelcastSub.getTopic(name); for (String id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java index c912eb5..b472682 100644 --- a/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/MemoryPubSubStore.java @@ -18,19 +18,20 @@ package com.corundumstudio.socketio.store; import com.corundumstudio.socketio.store.pubsub.PubSubListener; import com.corundumstudio.socketio.store.pubsub.PubSubMessage; import com.corundumstudio.socketio.store.pubsub.PubSubStore; +import com.corundumstudio.socketio.store.pubsub.PubSubType; public class MemoryPubSubStore implements PubSubStore { @Override - public void publish(String name, PubSubMessage msg) { + public void publish(PubSubType type, PubSubMessage msg) { } @Override - public void subscribe(String name, PubSubListener listener, Class clazz) { + public void subscribe(PubSubType type, PubSubListener listener, Class clazz) { } @Override - public void unsubscribe(String name) { + public void unsubscribe(PubSubType type) { } @Override diff --git a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java index 7e5c864..e57216f 100644 --- a/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/RedissonPubSubStore.java @@ -44,14 +44,15 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void publish(String name, PubSubMessage msg) { + public void publish(com.corundumstudio.socketio.store.pubsub.PubSubType type, PubSubMessage msg) { msg.setNodeId(nodeId); - redissonPub.getTopic(name).publish(msg); + redissonPub.getTopic(type.toString()).publish(msg); } @Override - public void subscribe(String name, final PubSubListener listener, Class clazz) { - RTopic topic = redissonSub.getTopic(name); + public void subscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type, final PubSubListener listener, Class clazz) { + String name = type.toString(); + RTopic topic = redissonSub.getTopic(name); int regId = topic.addListener(new MessageListener() { @Override public void onMessage(String channel, T msg) { @@ -73,7 +74,8 @@ public class RedissonPubSubStore implements PubSubStore { } @Override - public void unsubscribe(String name) { + public void unsubscribe(com.corundumstudio.socketio.store.pubsub.PubSubType type) { + String name = type.toString(); Queue regIds = map.remove(name); RTopic topic = redissonSub.getTopic(name); for (Integer id : regIds) { diff --git a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java index 998931e..4bdcf83 100644 --- a/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java +++ b/src/main/java/com/corundumstudio/socketio/store/pubsub/PubSubStore.java @@ -22,7 +22,7 @@ public interface PubSubStore { void subscribe(PubSubType type, PubSubListener listener, Class clazz); - void unsubscribe(String name); + void unsubscribe(PubSubType type); void shutdown();