Browse Source
Merge pull request #771 from GaryLeung922/hotfix
Merge pull request #771 from GaryLeung922/hotfix
Hotfix:multy node with redisson receive multy same messagemaster
committed by
GitHub
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 276 additions and 116 deletions
-
117src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
-
117src/main/java/com/corundumstudio/socketio/MultiRoomBroadcastOperations.java
-
123src/main/java/com/corundumstudio/socketio/SingleRoomBroadcastOperations.java
-
25src/main/java/com/corundumstudio/socketio/SocketIOServer.java
-
10src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
@ -0,0 +1,117 @@ |
|||
/** |
|||
* Copyright (c) 2012-2019 Nikita Koksharov |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0 |
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package com.corundumstudio.socketio; |
|||
|
|||
import com.corundumstudio.socketio.protocol.Packet; |
|||
|
|||
import java.util.Collection; |
|||
import java.util.HashSet; |
|||
import java.util.Set; |
|||
|
|||
/** |
|||
* @Author: liangjiaqi |
|||
* @Date: 2020/8/8 6:02 PM |
|||
*/ |
|||
public class MultiRoomBroadcastOperations implements BroadcastOperations { |
|||
|
|||
private Collection<BroadcastOperations> broadcastOperations; |
|||
|
|||
public MultiRoomBroadcastOperations(Collection<BroadcastOperations> broadcastOperations) { |
|||
this.broadcastOperations = broadcastOperations; |
|||
} |
|||
|
|||
@Override |
|||
public Collection<SocketIOClient> getClients() { |
|||
Set<SocketIOClient> clients = new HashSet<SocketIOClient>(); |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return clients; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
clients.addAll( b.getClients() ); |
|||
} |
|||
return clients; |
|||
} |
|||
|
|||
@Override |
|||
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) { |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
b.send( packet, ackCallback ); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
b.sendEvent( name, excludedClient, data ); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) { |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
b.sendEvent( name, data, ackCallback ); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) { |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
b.sendEvent( name, data, excludedClient, ackCallback ); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void send(Packet packet) { |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
b.send( packet ); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void disconnect() { |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
b.disconnect(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void sendEvent(String name, Object... data) { |
|||
if( this.broadcastOperations == null || this.broadcastOperations.size() == 0 ) { |
|||
return; |
|||
} |
|||
for( BroadcastOperations b : this.broadcastOperations ) { |
|||
b.sendEvent( name, data ); |
|||
} |
|||
} |
|||
} |
@ -0,0 +1,123 @@ |
|||
/** |
|||
* Copyright (c) 2012-2019 Nikita Koksharov |
|||
* |
|||
* Licensed under the Apache License, Version 2.0 (the "License"); |
|||
* you may not use this file except in compliance with the License. |
|||
* You may obtain a copy of the License at |
|||
* |
|||
* http://www.apache.org/licenses/LICENSE-2.0 |
|||
* |
|||
* Unless required by applicable law or agreed to in writing, software |
|||
* distributed under the License is distributed on an "AS IS" BASIS, |
|||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|||
* See the License for the specific language governing permissions and |
|||
* limitations under the License. |
|||
*/ |
|||
package com.corundumstudio.socketio; |
|||
|
|||
import com.corundumstudio.socketio.misc.IterableCollection; |
|||
import com.corundumstudio.socketio.protocol.Packet; |
|||
import com.corundumstudio.socketio.protocol.PacketType; |
|||
import com.corundumstudio.socketio.store.StoreFactory; |
|||
import com.corundumstudio.socketio.store.pubsub.DispatchMessage; |
|||
import com.corundumstudio.socketio.store.pubsub.PubSubType; |
|||
|
|||
import java.util.Arrays; |
|||
import java.util.Collection; |
|||
|
|||
/** |
|||
* @Author: liangjiaqi |
|||
* @Date: 2020/8/8 6:08 PM |
|||
*/ |
|||
public class SingleRoomBroadcastOperations implements BroadcastOperations { |
|||
private final String namespace; |
|||
private final String room; |
|||
private final Iterable<SocketIOClient> clients; |
|||
private final StoreFactory storeFactory; |
|||
|
|||
public SingleRoomBroadcastOperations(String namespace, String room, Iterable<SocketIOClient> clients, StoreFactory storeFactory) { |
|||
super(); |
|||
this.namespace = namespace; |
|||
this.room = room; |
|||
this.clients = clients; |
|||
this.storeFactory = storeFactory; |
|||
} |
|||
|
|||
private void dispatch(Packet packet) { |
|||
this.storeFactory.pubSubStore().publish( |
|||
PubSubType.DISPATCH, |
|||
new DispatchMessage(this.room, packet, this.namespace)); |
|||
} |
|||
|
|||
@Override |
|||
public Collection<SocketIOClient> getClients() { |
|||
return new IterableCollection<SocketIOClient>(clients); |
|||
} |
|||
|
|||
@Override |
|||
public void send(Packet packet) { |
|||
for (SocketIOClient client : clients) { |
|||
client.send(packet); |
|||
} |
|||
dispatch(packet); |
|||
} |
|||
|
|||
@Override |
|||
public <T> void send(Packet packet, BroadcastAckCallback<T> ackCallback) { |
|||
for (SocketIOClient client : clients) { |
|||
client.send(packet, ackCallback.createClientCallback(client)); |
|||
} |
|||
ackCallback.loopFinished(); |
|||
} |
|||
|
|||
@Override |
|||
public void disconnect() { |
|||
for (SocketIOClient client : clients) { |
|||
client.disconnect(); |
|||
} |
|||
} |
|||
|
|||
@Override |
|||
public void sendEvent(String name, SocketIOClient excludedClient, Object... data) { |
|||
Packet packet = new Packet(PacketType.MESSAGE); |
|||
packet.setSubType(PacketType.EVENT); |
|||
packet.setName(name); |
|||
packet.setData(Arrays.asList(data)); |
|||
|
|||
for (SocketIOClient client : clients) { |
|||
if (client.getSessionId().equals(excludedClient.getSessionId())) { |
|||
continue; |
|||
} |
|||
client.send(packet); |
|||
} |
|||
dispatch(packet); |
|||
} |
|||
|
|||
@Override |
|||
public void sendEvent(String name, Object... data) { |
|||
Packet packet = new Packet(PacketType.MESSAGE); |
|||
packet.setSubType(PacketType.EVENT); |
|||
packet.setName(name); |
|||
packet.setData(Arrays.asList(data)); |
|||
send(packet); |
|||
} |
|||
|
|||
@Override |
|||
public <T> void sendEvent(String name, Object data, BroadcastAckCallback<T> ackCallback) { |
|||
for (SocketIOClient client : clients) { |
|||
client.sendEvent(name, ackCallback.createClientCallback(client), data); |
|||
} |
|||
ackCallback.loopFinished(); |
|||
} |
|||
|
|||
@Override |
|||
public <T> void sendEvent(String name, Object data, SocketIOClient excludedClient, BroadcastAckCallback<T> ackCallback) { |
|||
for (SocketIOClient client : clients) { |
|||
if (client.getSessionId().equals(excludedClient.getSessionId())) { |
|||
continue; |
|||
} |
|||
client.sendEvent(name, ackCallback.createClientCallback(client), data); |
|||
} |
|||
ackCallback.loopFinished(); |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue