Browse Source

Room support. Issue #47

master
Nikita 12 years ago
parent
commit
5d3bb2d2ce
  1. 7
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  2. 4
      src/main/java/com/corundumstudio/socketio/SocketIOClient.java
  3. 8
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  4. 8
      src/main/java/com/corundumstudio/socketio/misc/IterableCollection.java
  5. 50
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  6. 13
      src/main/java/com/corundumstudio/socketio/namespace/NamespacesHub.java
  7. 10
      src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java

7
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -15,6 +15,9 @@
*/
package com.corundumstudio.socketio;
import java.util.Collection;
import com.corundumstudio.socketio.misc.IterableCollection;
import com.corundumstudio.socketio.parser.Packet;
public class BroadcastOperations implements ClientOperations {
@ -26,6 +29,10 @@ public class BroadcastOperations implements ClientOperations {
this.clients = clients;
}
public Collection<SocketIOClient> getClients() {
return new IterableCollection<SocketIOClient>(clients);
}
@Override
public void sendMessage(String message) {
for (SocketIOClient client : clients) {

4
src/main/java/com/corundumstudio/socketio/SocketIOClient.java

@ -91,4 +91,8 @@ public interface SocketIOClient extends ClientOperations {
*/
boolean isChannelOpen();
<T> void joinRoom(T roomKey);
<T> void leaveRoom(T roomKey);
}

8
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -16,6 +16,7 @@
package com.corundumstudio.socketio;
import java.net.InetSocketAddress;
import java.util.Collection;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.Channel;
@ -61,7 +62,7 @@ public class SocketIOServer implements ClientListeners {
*
* @return Iterable object with clients
*/
public Iterable<SocketIOClient> getAllClients() {
public Collection<SocketIOClient> getAllClients() {
return pipelineFactory.getAllClients();
}
@ -69,6 +70,11 @@ public class SocketIOServer implements ClientListeners {
return getBroadcastOperations(pipelineFactory.getAllClients());
}
public <T> BroadcastOperations getRoomOperations(T roomKey) {
Iterable<SocketIOClient> clients = namespacesHub.getRoomClients(roomKey);
return new BroadcastOperations(clients);
}
public BroadcastOperations getBroadcastOperations(Iterable<SocketIOClient> clients) {
return new BroadcastOperations(clients);
}

8
src/main/java/com/corundumstudio/socketio/misc/IterableCollection.java

@ -22,18 +22,22 @@ public class IterableCollection<T> extends AbstractCollection<T> {
private final CompositeIterable<T> iterable;
public IterableCollection(Iterable<T> iterable) {
this(new CompositeIterable(iterable));
}
public IterableCollection(CompositeIterable<T> iterable) {
this.iterable = iterable;
}
@Override
public Iterator<T> iterator() {
return new CompositeIterable(iterable).iterator();
return new CompositeIterable<T>(iterable).iterator();
}
@Override
public int size() {
Iterator<T> iterator = new CompositeIterable(iterable).iterator();
Iterator<T> iterator = new CompositeIterable<T>(iterable).iterator();
int count = 0;
while (iterator.hasNext()) {
iterator.next();

50
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -15,6 +15,7 @@
*/
package com.corundumstudio.socketio.namespace;
import java.util.Collections;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@ -43,7 +44,7 @@ public class Namespace implements SocketIONamespace {
public static final String DEFAULT_NAME = "";
private final Set<SocketIOClient> clients = new ConcurrentHashSet<SocketIOClient>();
private final Set<SocketIOClient> allClients = new ConcurrentHashSet<SocketIOClient>();
private final ConcurrentMap<String, EventEntry<?>> eventListeners =
new ConcurrentHashMap<String, EventEntry<?>>();
private final ConcurrentMap<Class<?>, Queue<DataListener<?>>> jsonObjectListeners =
@ -52,6 +53,8 @@ public class Namespace implements SocketIONamespace {
private final Queue<ConnectListener> connectListeners = new ConcurrentLinkedQueue<ConnectListener>();
private final Queue<DisconnectListener> disconnectListeners = new ConcurrentLinkedQueue<DisconnectListener>();
private final ConcurrentMap<Object, Queue<SocketIOClient>> roomClients = new ConcurrentHashMap<Object, Queue<SocketIOClient>>();
private final String name;
private final JsonSupport jsonSupport;
@ -62,7 +65,7 @@ public class Namespace implements SocketIONamespace {
}
public void addClient(SocketIOClient client) {
clients.add(client);
allClients.add(client);
}
public String getName() {
@ -136,7 +139,7 @@ public class Namespace implements SocketIONamespace {
for (DisconnectListener listener : disconnectListeners) {
listener.onDisconnect(client);
}
clients.remove(client);
allClients.remove(client);
}
@Override
@ -161,7 +164,7 @@ public class Namespace implements SocketIONamespace {
@Override
public BroadcastOperations getBroadcastOperations() {
return new BroadcastOperations(clients);
return new BroadcastOperations(allClients);
}
@ -202,4 +205,43 @@ public class Namespace implements SocketIONamespace {
engine.scan(this, listeners, listenersClass);
}
public void joinRoom(Object roomKey, SocketIOClient namespaceClient) {
Queue<SocketIOClient> clients = roomClients.get(roomKey);
if (clients == null) {
clients = new ConcurrentLinkedQueue<SocketIOClient>();
Queue<SocketIOClient> oldClients = roomClients.putIfAbsent(roomKey, clients);
if (oldClients != null) {
clients = oldClients;
}
}
clients.add(namespaceClient);
if (clients != roomClients.get(roomKey)) {
// re-join if queue has been replaced
joinRoom(roomKey, namespaceClient);
}
}
public void leaveRoom(Object roomKey, SocketIOClient namespaceClient) {
Queue<SocketIOClient> clients = roomClients.get(roomKey);
if (clients == null) {
return;
}
clients.remove(namespaceClient);
if (clients.isEmpty()) {
roomClients.remove(roomKey);
// join which was added after queue deletion
for (SocketIOClient socketIOClient : clients) {
joinRoom(roomKey, socketIOClient);
}
}
}
public Iterable<SocketIOClient> getRoomClients(Object roomKey) {
Queue<SocketIOClient> clients = roomClients.get(roomKey);
if (clients == null) {
return Collections.emptyList();
}
return clients;
}
}

13
src/main/java/com/corundumstudio/socketio/namespace/NamespacesHub.java

@ -15,9 +15,13 @@
*/
package com.corundumstudio.socketio.namespace;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.misc.CompositeIterable;
import com.corundumstudio.socketio.parser.JsonSupport;
public class NamespacesHub {
@ -41,6 +45,15 @@ public class NamespacesHub {
return namespace;
}
public <T> Iterable<SocketIOClient> getRoomClients(T roomKey) {
List<Iterable<SocketIOClient>> allClients = new ArrayList<Iterable<SocketIOClient>>();
for (Namespace namespace : namespaces.values()) {
Iterable<SocketIOClient> clients = namespace.getRoomClients(roomKey);
allClients.add(clients);
}
return new CompositeIterable<SocketIOClient>(allClients);
}
public Namespace get(String name) {
return namespaces.get(name);
}

10
src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java

@ -169,4 +169,14 @@ public class NamespaceClient implements SocketIOClient {
return true;
}
@Override
public <T> void joinRoom(T roomKey) {
namespace.joinRoom(roomKey, this);
}
@Override
public <T> void leaveRoom(T roomKey) {
namespace.leaveRoom(roomKey, this);
}
}
Loading…
Cancel
Save