Browse Source

Room handling optimization. Race condition during map entry removing fixed.

master
Nikita 11 years ago
parent
commit
119faa3753
  1. 3
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  2. 4
      src/main/java/com/corundumstudio/socketio/SocketIOClient.java
  3. 63
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  4. 4
      src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java

3
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -22,6 +22,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import com.corundumstudio.socketio.misc.IterableCollection;
import com.corundumstudio.socketio.namespace.Namespace;
@ -46,7 +47,7 @@ public class BroadcastOperations implements ClientOperations {
this.clients = clients;
for (SocketIOClient socketIOClient : clients) {
Namespace namespace = (Namespace)socketIOClient.getNamespace();
List<String> rooms = namespace.getRooms(socketIOClient);
Set<String> rooms = namespace.getRooms(socketIOClient);
List<String> roomsList = namespaceRooms.get(namespace.getName());
if (roomsList == null) {

4
src/main/java/com/corundumstudio/socketio/SocketIOClient.java

@ -16,7 +16,7 @@
package com.corundumstudio.socketio;
import java.net.SocketAddress;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import com.corundumstudio.socketio.parser.Packet;
@ -123,6 +123,6 @@ public interface SocketIOClient extends ClientOperations, Store {
*
* @return
*/
List<String> getAllRooms();
Set<String> getAllRooms();
}

63
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -20,8 +20,8 @@ import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
@ -66,8 +66,8 @@ public class Namespace implements SocketIONamespace {
private final Queue<ConnectListener> connectListeners = new ConcurrentLinkedQueue<ConnectListener>();
private final Queue<DisconnectListener> disconnectListeners = new ConcurrentLinkedQueue<DisconnectListener>();
// TODO user Set<UUID>
private final ConcurrentMap<String, Queue<UUID>> roomClients = new ConcurrentHashMap<String, Queue<UUID>>();
private final ConcurrentMap<String, Set<UUID>> roomClients = new ConcurrentHashMap<String, Set<UUID>>();
private final ConcurrentMap<UUID, Set<String>> clientRooms = new ConcurrentHashMap<UUID, Set<String>>();
private final String name;
private final boolean autoAck;
@ -305,64 +305,69 @@ public class Namespace implements SocketIONamespace {
}
}
public void join(String room, UUID sessionId) {
Queue<UUID> clients = roomClients.get(room);
private <K, V> void join(ConcurrentMap<K, Set<V>> map, K key, V value) {
Set<V> clients = map.get(key);
if (clients == null) {
clients = new ConcurrentLinkedQueue<UUID>();
Queue<UUID> oldClients = roomClients.putIfAbsent(room, clients);
clients = Collections.newSetFromMap(new ConcurrentHashMap<V, Boolean>());
Set<V> oldClients = map.putIfAbsent(key, clients);
if (oldClients != null) {
clients = oldClients;
}
}
clients.add(sessionId);
clients.add(value);
// object may be changed due to other concurrent call
if (clients != roomClients.get(room)) {
if (clients != map.get(key)) {
// re-join if queue has been replaced
joinRoom(room, sessionId);
join(map, key, value);
}
}
public void join(String room, UUID sessionId) {
join(roomClients, room, sessionId);
join(clientRooms, sessionId, room);
}
public void leaveRoom(String room, UUID sessionId) {
leave(room, sessionId);
storeFactory.pubSubStore().publish(PubSubStore.LEAVE, new JoinLeaveMessage(sessionId, room, getName()));
}
public void leave(String room, UUID sessionId) {
Queue<UUID> clients = roomClients.get(room);
private <K, V> void leave(ConcurrentMap<K, Set<V>> map, K room, V sessionId) {
Set<V> clients = map.get(room);
if (clients == null) {
return;
}
clients.remove(sessionId);
if (clients.isEmpty()) {
clients = roomClients.remove(room);
// join which was added after queue deletion
for (UUID clientId : clients) {
joinRoom(room, clientId);
}
map.remove(room, Collections.emptySet());
}
}
// TODO optimize
public List<String> getRooms(SocketIOClient client) {
List<String> result = new ArrayList<String>();
for (Entry<String, Queue<UUID>> entry : roomClients.entrySet()) {
if (entry.getValue().contains(client.getSessionId())) {
result.add(entry.getKey());
}
public void leave(String room, UUID sessionId) {
leave(roomClients, room, sessionId);
leave(clientRooms, sessionId, room);
}
public Set<String> getRooms(SocketIOClient client) {
Set<String> res = clientRooms.get(client.getSessionId());
if (res == null) {
return Collections.emptySet();
}
return result;
return Collections.unmodifiableSet(res);
}
public Iterable<SocketIOClient> getRoomClients(String room) {
Queue<UUID> sessionIds = roomClients.get(room);
Set<UUID> sessionIds = roomClients.get(room);
if (sessionIds == null) {
return Collections.emptyList();
}
List<SocketIOClient> result = new ArrayList<SocketIOClient>();
for (SocketIOClient client : allClients.values()) {
if (sessionIds.contains(client.getSessionId())) {
for (UUID sessionId : sessionIds) {
SocketIOClient client = allClients.get(sessionId);
if(client != null) {
result.add(client);
}
}
@ -370,7 +375,7 @@ public class Namespace implements SocketIONamespace {
}
public Collection<SocketIOClient> getAllClients() {
return allClients.values();
return Collections.unmodifiableCollection(allClients.values());
}
public SocketIOClient getClient(UUID uuid) {

4
src/main/java/com/corundumstudio/socketio/transport/NamespaceClient.java

@ -17,7 +17,7 @@ package com.corundumstudio.socketio.transport;
import java.net.SocketAddress;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@ -217,7 +217,7 @@ public class NamespaceClient implements SocketIOClient {
}
@Override
public List<String> getAllRooms() {
public Set<String> getAllRooms() {
return namespace.getRooms(this);
}

Loading…
Cancel
Save