From 8441045ed5adceb6b0fdae41ed3d534fa091f070 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E5=98=89=E7=A5=BA?= Date: Tue, 11 Aug 2020 19:59:58 +0800 Subject: [PATCH 1/2] bugfix: multy node with redisson receive multy same message --- .../socketio/BroadcastOperations.java | 117 ++--------------- .../MultiRoomBroadcastOperations.java | 117 +++++++++++++++++ .../SingleRoomBroadcastOperations.java | 123 ++++++++++++++++++ .../socketio/SocketIOServer.java | 25 +++- .../socketio/namespace/Namespace.java | 5 +- 5 files changed, 274 insertions(+), 113 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java create mode 100644 src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java index 63b3456..7866d99 100644 --- a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -15,123 +15,24 @@ */ package com.corundumstudio.socketio; -import java.util.Arrays; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; - -import com.corundumstudio.socketio.misc.IterableCollection; -import com.corundumstudio.socketio.namespace.Namespace; import com.corundumstudio.socketio.protocol.Packet; -import com.corundumstudio.socketio.protocol.PacketType; -import com.corundumstudio.socketio.store.StoreFactory; -import com.corundumstudio.socketio.store.pubsub.DispatchMessage; -import com.corundumstudio.socketio.store.pubsub.PubSubType; + +import java.util.Collection; /** - * Fully thread-safe. + * broadcast interface * */ -public class BroadcastOperations implements ClientOperations { - - private final Iterable clients; - private final StoreFactory storeFactory; - - public BroadcastOperations(Iterable clients, StoreFactory storeFactory) { - super(); - this.clients = clients; - this.storeFactory = storeFactory; - } - - private void dispatch(Packet packet) { - Map> namespaceRooms = new HashMap>(); - for (SocketIOClient socketIOClient : clients) { - Namespace namespace = (Namespace)socketIOClient.getNamespace(); - Set rooms = namespace.getRooms(socketIOClient); - - Set roomsList = namespaceRooms.get(namespace.getName()); - if (roomsList == null) { - roomsList = new HashSet(); - namespaceRooms.put(namespace.getName(), roomsList); - } - roomsList.addAll(rooms); - } - for (Entry> entry : namespaceRooms.entrySet()) { - for (String room : entry.getValue()) { - storeFactory.pubSubStore().publish(PubSubType.DISPATCH, new DispatchMessage(room, packet, entry.getKey())); - } - } - } - - public Collection getClients() { - return new IterableCollection(clients); - } - - @Override - public void send(Packet packet) { - for (SocketIOClient client : clients) { - client.send(packet); - } - dispatch(packet); - } - - public void send(Packet packet, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - client.send(packet, ackCallback.createClientCallback(client)); - } - ackCallback.loopFinished(); - } +public interface BroadcastOperations extends ClientOperations { - @Override - public void disconnect() { - for (SocketIOClient client : clients) { - client.disconnect(); - } - } + Collection getClients(); - public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { - Packet packet = new Packet(PacketType.MESSAGE); - packet.setSubType(PacketType.EVENT); - packet.setName(name); - packet.setData(Arrays.asList(data)); + void send(Packet packet, BroadcastAckCallback ackCallback); - for (SocketIOClient client : clients) { - if (client.getSessionId().equals(excludedClient.getSessionId())) { - continue; - } - client.send(packet); - } - dispatch(packet); - } - - @Override - public void sendEvent(String name, Object... data) { - Packet packet = new Packet(PacketType.MESSAGE); - packet.setSubType(PacketType.EVENT); - packet.setName(name); - packet.setData(Arrays.asList(data)); - send(packet); - } + void sendEvent(String name, SocketIOClient excludedClient, Object... data); - public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - client.sendEvent(name, ackCallback.createClientCallback(client), data); - } - ackCallback.loopFinished(); - } - - public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { - for (SocketIOClient client : clients) { - if (client.getSessionId().equals(excludedClient.getSessionId())) { - continue; - } - client.sendEvent(name, ackCallback.createClientCallback(client), data); - } - ackCallback.loopFinished(); - } + void sendEvent(String name, Object data, BroadcastAckCallback ackCallback); + void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback); } diff --git a/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java new file mode 100644 index 0000000..cac1744 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java @@ -0,0 +1,117 @@ +/** + * Copyright (c) 2012-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import com.corundumstudio.socketio.protocol.Packet; + +import java.util.Collection; +import java.util.HashSet; +import java.util.Set; + +/** + * @Author: liangjiaqi + * @Date: 2020/8/8 6:02 PM + */ +public class MultiRoomBroadcastOperations implements BroadcastOperations { + + private Collection broadcastOperations; + + public MultiRoomBroadcastOperations(Collection broadcastOperations) { + this.broadcastOperations = broadcastOperations; + } + + @Override + public Collection getClients() { + Set clients = new HashSet(); + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return clients; + } + for( BroadcastOperations b : this.broadcastOperations ) { + clients.addAll( b.getClients() ); + } + return clients; + } + + @Override + public void send(Packet packet, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.send( packet, ackCallback ); + } + } + + @Override + public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, excludedClient, data ); + } + } + + @Override + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data, ackCallback ); + } + } + + @Override + public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data, excludedClient, ackCallback ); + } + } + + @Override + public void send(Packet packet) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.send( packet ); + } + } + + @Override + public void disconnect() { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.disconnect(); + } + } + + @Override + public void sendEvent(String name, Object... data) { + if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { + return; + } + for( BroadcastOperations b : this.broadcastOperations ) { + b.sendEvent( name, data ); + } + } +} diff --git a/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java new file mode 100644 index 0000000..4f9ac67 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java @@ -0,0 +1,123 @@ +/** + * Copyright (c) 2012-2019 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import com.corundumstudio.socketio.misc.IterableCollection; +import com.corundumstudio.socketio.protocol.Packet; +import com.corundumstudio.socketio.protocol.PacketType; +import com.corundumstudio.socketio.store.StoreFactory; +import com.corundumstudio.socketio.store.pubsub.DispatchMessage; +import com.corundumstudio.socketio.store.pubsub.PubSubType; + +import java.util.Arrays; +import java.util.Collection; + +/** + * @Author: liangjiaqi + * @Date: 2020/8/8 6:08 PM + */ +public class SingleRoomBroadcastOperations implements BroadcastOperations { + private final String namespace; + private final String room; + private final Iterable clients; + private final StoreFactory storeFactory; + + public SingleRoomBroadcastOperations(String namespace, String room, Iterable clients, StoreFactory storeFactory) { + super(); + this.namespace = namespace; + this.room = room; + this.clients = clients; + this.storeFactory = storeFactory; + } + + private void dispatch(Packet packet) { + this.storeFactory.pubSubStore().publish( + PubSubType.DISPATCH, + new DispatchMessage(this.room, packet, this.namespace)); + } + + @Override + public Collection getClients() { + return new IterableCollection(clients); + } + + @Override + public void send(Packet packet) { + for (SocketIOClient client : clients) { + client.send(packet); + } + dispatch(packet); + } + + @Override + public void send(Packet packet, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.send(packet, ackCallback.createClientCallback(client)); + } + ackCallback.loopFinished(); + } + + @Override + public void disconnect() { + for (SocketIOClient client : clients) { + client.disconnect(); + } + } + + @Override + public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { + Packet packet = new Packet(PacketType.MESSAGE); + packet.setSubType(PacketType.EVENT); + packet.setName(name); + packet.setData(Arrays.asList(data)); + + for (SocketIOClient client : clients) { + if (client.getSessionId().equals(excludedClient.getSessionId())) { + continue; + } + client.send(packet); + } + dispatch(packet); + } + + @Override + public void sendEvent(String name, Object... data) { + Packet packet = new Packet(PacketType.MESSAGE); + packet.setSubType(PacketType.EVENT); + packet.setName(name); + packet.setData(Arrays.asList(data)); + send(packet); + } + + @Override + public void sendEvent(String name, Object data, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.sendEvent(name, ackCallback.createClientCallback(client), data); + } + ackCallback.loopFinished(); + } + + @Override + public void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback ackCallback) { + for (SocketIOClient client : clients) { + if (client.getSessionId().equals(excludedClient.getSessionId())) { + continue; + } + client.sendEvent(name, ackCallback.createClientCallback(client), data); + } + ackCallback.loopFinished(); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index ff06769..83f4b92 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -29,7 +29,9 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import java.net.InetSocketAddress; +import java.util.ArrayList; import java.util.Collection; +import java.util.List; import java.util.UUID; import org.slf4j.Logger; @@ -97,7 +99,16 @@ public class SocketIOServer implements ClientListeners { } public BroadcastOperations getBroadcastOperations() { - return new BroadcastOperations(getAllClients(), configCopy.getStoreFactory()); + Collection namespaces = namespacesHub.getAllNamespaces(); + List list = new ArrayList(); + BroadcastOperations broadcast = null; + if( namespaces != null && namespaces.size() > 0 ) { + for( SocketIONamespace n : namespaces ) { + broadcast = n.getBroadcastOperations(); + list.add( broadcast ); + } + } + return new MultiRoomBroadcastOperations( list ); } /** @@ -108,8 +119,16 @@ public class SocketIOServer implements ClientListeners { * @return broadcast operations */ public BroadcastOperations getRoomOperations(String room) { - Iterable clients = namespacesHub.getRoomClients(room); - return new BroadcastOperations(clients, configCopy.getStoreFactory()); + Collection namespaces = namespacesHub.getAllNamespaces(); + List list = new ArrayList(); + BroadcastOperations broadcast = null; + if( namespaces != null && namespaces.size() > 0 ) { + for( SocketIONamespace n : namespaces ) { + broadcast = n.getRoomOperations( room ); + list.add( broadcast ); + } + } + return new MultiRoomBroadcastOperations( list ); } /** diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index 4eb1df3..8dde9a4 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -31,6 +31,7 @@ import com.corundumstudio.socketio.AckRequest; import com.corundumstudio.socketio.BroadcastOperations; import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.MultiTypeArgs; +import com.corundumstudio.socketio.SingleRoomBroadcastOperations; import com.corundumstudio.socketio.SocketIOClient; import com.corundumstudio.socketio.SocketIONamespace; import com.corundumstudio.socketio.annotation.ScannerEngine; @@ -239,12 +240,12 @@ public class Namespace implements SocketIONamespace { @Override public BroadcastOperations getBroadcastOperations() { - return new BroadcastOperations(allClients.values(), storeFactory); + return new SingleRoomBroadcastOperations(getName(), getName(), allClients.values(), storeFactory); } @Override public BroadcastOperations getRoomOperations(String room) { - return new BroadcastOperations(getRoomClients(room), storeFactory); + return new SingleRoomBroadcastOperations(getName(), room, getRoomClients(room), storeFactory); } @Override From 452870d6aff013133760b4df9f9714c8824099a4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=A2=81=E5=98=89=E7=A5=BA?= Date: Tue, 11 Aug 2020 20:03:00 +0800 Subject: [PATCH 2/2] bugfix: node just publish one leave msg on client which joined multi rooms disconnect --- .../com/corundumstudio/socketio/namespace/Namespace.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java index 8dde9a4..97057dd 100644 --- a/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java +++ b/src/main/java/com/corundumstudio/socketio/namespace/Namespace.java @@ -188,11 +188,10 @@ public class Namespace implements SocketIONamespace { Set joinedRooms = client.getAllRooms(); allClients.remove(client.getSessionId()); - leave(getName(), client.getSessionId()); - storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), getName(), getName())); - + // client must leave all rooms and publish the leave msg one by one on disconnect. for (String joinedRoom : joinedRooms) { leave(roomClients, joinedRoom, client.getSessionId()); + storeFactory.pubSubStore().publish(PubSubType.LEAVE, new JoinLeaveMessage(client.getSessionId(), joinedRoom, getName())); } clientRooms.remove(client.getSessionId());