|
@ -15,14 +15,11 @@ |
|
|
*/ |
|
|
*/ |
|
|
package com.corundumstudio.socketio.store; |
|
|
package com.corundumstudio.socketio.store; |
|
|
|
|
|
|
|
|
import io.netty.util.internal.PlatformDependent; |
|
|
|
|
|
|
|
|
|
|
|
import java.util.Queue; |
|
|
import java.util.Queue; |
|
|
import java.util.concurrent.ConcurrentHashMap; |
|
|
|
|
|
import java.util.concurrent.ConcurrentLinkedQueue; |
|
|
import java.util.concurrent.ConcurrentLinkedQueue; |
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
import java.util.concurrent.ConcurrentMap; |
|
|
|
|
|
|
|
|
import org.redisson.Redisson; |
|
|
|
|
|
|
|
|
import org.redisson.RedissonClient; |
|
|
import org.redisson.core.MessageListener; |
|
|
import org.redisson.core.MessageListener; |
|
|
import org.redisson.core.RTopic; |
|
|
import org.redisson.core.RTopic; |
|
|
|
|
|
|
|
@ -30,15 +27,17 @@ import com.corundumstudio.socketio.store.pubsub.PubSubListener; |
|
|
import com.corundumstudio.socketio.store.pubsub.PubSubMessage; |
|
|
import com.corundumstudio.socketio.store.pubsub.PubSubMessage; |
|
|
import com.corundumstudio.socketio.store.pubsub.PubSubStore; |
|
|
import com.corundumstudio.socketio.store.pubsub.PubSubStore; |
|
|
|
|
|
|
|
|
|
|
|
import io.netty.util.internal.PlatformDependent; |
|
|
|
|
|
|
|
|
public class RedissonPubSubStore implements PubSubStore { |
|
|
public class RedissonPubSubStore implements PubSubStore { |
|
|
|
|
|
|
|
|
private final Redisson redissonPub; |
|
|
|
|
|
private final Redisson redissonSub; |
|
|
|
|
|
|
|
|
private final RedissonClient redissonPub; |
|
|
|
|
|
private final RedissonClient redissonSub; |
|
|
private final Long nodeId; |
|
|
private final Long nodeId; |
|
|
|
|
|
|
|
|
private final ConcurrentMap<String, Queue<Integer>> map = PlatformDependent.newConcurrentHashMap(); |
|
|
private final ConcurrentMap<String, Queue<Integer>> map = PlatformDependent.newConcurrentHashMap(); |
|
|
|
|
|
|
|
|
public RedissonPubSubStore(Redisson redissonPub, Redisson redissonSub, Long nodeId) { |
|
|
|
|
|
|
|
|
public RedissonPubSubStore(RedissonClient redissonPub, RedissonClient redissonSub, Long nodeId) { |
|
|
this.redissonPub = redissonPub; |
|
|
this.redissonPub = redissonPub; |
|
|
this.redissonSub = redissonSub; |
|
|
this.redissonSub = redissonSub; |
|
|
this.nodeId = nodeId; |
|
|
this.nodeId = nodeId; |
|
@ -55,7 +54,7 @@ public class RedissonPubSubStore implements PubSubStore { |
|
|
RTopic<T> topic = redissonSub.getTopic(name); |
|
|
RTopic<T> topic = redissonSub.getTopic(name); |
|
|
int regId = topic.addListener(new MessageListener<T>() { |
|
|
int regId = topic.addListener(new MessageListener<T>() { |
|
|
@Override |
|
|
@Override |
|
|
public void onMessage(T msg) { |
|
|
|
|
|
|
|
|
public void onMessage(String channel, T msg) { |
|
|
if (!nodeId.equals(msg.getNodeId())) { |
|
|
if (!nodeId.equals(msg.getNodeId())) { |
|
|
listener.onMessage(msg); |
|
|
listener.onMessage(msg); |
|
|
} |
|
|
} |
|
|