From 7f4cf3d05e23fc268d104843755c6f311da28e99 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 26 May 2012 13:29:01 +0400 Subject: [PATCH] Issue #17 fixed --- README.md | 12 +-- .../socketio/BroadcastOperations.java | 92 ++++++++++++++++++ .../socketio/ClientOperations.java | 94 +++++++++++++++++++ .../corundumstudio/socketio/JoinIterator.java | 56 +++++++++++ .../socketio/SocketIOClient.java | 71 +------------- .../socketio/SocketIOPipelineFactory.java | 13 +++ .../socketio/SocketIOServer.java | 10 ++ .../transport/WebSocketTransport.java | 7 +- .../transport/XHRPollingTransport.java | 17 ++-- .../socketio/JoinIteratorsTest.java | 48 ++++++++++ 10 files changed, 338 insertions(+), 82 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/BroadcastOperations.java create mode 100644 src/main/java/com/corundumstudio/socketio/ClientOperations.java create mode 100644 src/main/java/com/corundumstudio/socketio/JoinIterator.java create mode 100644 src/test/java/com/corundumstudio/socketio/JoinIteratorsTest.java diff --git a/README.md b/README.md index 02794e2..637b96f 100644 --- a/README.md +++ b/README.md @@ -11,6 +11,8 @@ Licensed under the Apache License 2.0. * Supports xhr-polling transport * Supports websocket transport (Hixie-75/76/Hybi-00, Hybi-10..Hybi-13) + + #Usage example ##Server @@ -18,14 +20,12 @@ Licensed under the Apache License 2.0. SocketIOListener handler = new SocketIOListener() { @Override - public void onEvent(SocketIOClient client, Packet packet) { + public void onEvent(SocketIOClient client, String name, Object data) { ... } @Override - public void onMessage(SocketIOClient client, Packet packet) { - // get a message - packet.getData().toString(); + public void onMessage(SocketIOClient client, String message) { ... } @@ -40,9 +40,7 @@ Licensed under the Apache License 2.0. } @Override - public void onJsonObject(SocketIOClient client, Packet packet) { - // get a json object - packet.getData(); + public void onJsonObject(SocketIOClient client, Objet data) { ... SampleObject obj = new SampleObject(); diff --git a/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java new file mode 100644 index 0000000..f0189f6 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/BroadcastOperations.java @@ -0,0 +1,92 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import com.corundumstudio.socketio.parser.Packet; + +public class BroadcastOperations implements ClientOperations { + + private final Iterable clients; + + public BroadcastOperations(Iterable clients) { + super(); + this.clients = clients; + } + + @Override + public void sendMessage(String message) { + for (SocketIOClient client : clients) { + client.sendMessage(message); + } + } + + @Override + public void sendMessage(String message, AckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.sendMessage(message, ackCallback); + } + } + + @Override + public void sendJsonObject(Object object) { + for (SocketIOClient client : clients) { + client.sendJsonObject(object); + } + } + + @Override + public void sendJsonObject(Object object, AckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.sendJsonObject(object, ackCallback); + } + } + + @Override + public void send(Packet packet) { + for (SocketIOClient client : clients) { + client.send(packet); + } + } + + @Override + public void send(Packet packet, AckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.send(packet, ackCallback); + } + } + + @Override + public void disconnect() { + for (SocketIOClient client : clients) { + client.disconnect(); + } + } + + @Override + public void sendEvent(String name, Object data) { + for (SocketIOClient client : clients) { + client.sendEvent(name, data); + } + } + + @Override + public void sendEvent(String name, Object data, AckCallback ackCallback) { + for (SocketIOClient client : clients) { + client.sendEvent(name, data, ackCallback); + } + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/ClientOperations.java b/src/main/java/com/corundumstudio/socketio/ClientOperations.java new file mode 100644 index 0000000..e3143d9 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/ClientOperations.java @@ -0,0 +1,94 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import com.corundumstudio.socketio.parser.Packet; + +/** + * Available client operations + * + */ +public interface ClientOperations { + + /** + * Send message + * + * @param message - message to send + */ + void sendMessage(String message); + + /** + * Send message with ack callback + * + * @param message - message to send + * @param ackCallback - ack callback + */ + void sendMessage(String message, AckCallback ackCallback); + + /** + * Send object. Object will be encoded to json-format. + * + * @param object - object to send + */ + void sendJsonObject(Object object); + + /** + * Send object with ack callback + * + * @param object - object to send + * @param ackCallback - ack callback + */ + void sendJsonObject(Object object, AckCallback ackCallback); + + /** + * Send packet + * + * @param packet - packet to send + */ + void send(Packet packet); + + /** + * Send packet with ack callback + * + * @param packet - packet to send + * @param ackCallback - ack callback + */ + void send(Packet packet, AckCallback ackCallback); + + /** + * Disconnect client + * + */ + void disconnect(); + + /** + * Send event + * + * @param name - event name + * @param data - event data + */ + void sendEvent(String name, Object data); + + /** + * Send event with ack callback + * + * @param name - event name + * @param data - event data + * @param ackCallback - ack callback + */ + void sendEvent(String name, Object data, AckCallback ackCallback); + +} diff --git a/src/main/java/com/corundumstudio/socketio/JoinIterator.java b/src/main/java/com/corundumstudio/socketio/JoinIterator.java new file mode 100644 index 0000000..5121d76 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/JoinIterator.java @@ -0,0 +1,56 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import java.util.Arrays; +import java.util.Iterator; + +public class JoinIterator implements Iterator { + + private final Iterator> listIterator; + private Iterator currentIterator; + + public JoinIterator(Iterator ... iterators) { + listIterator = Arrays.asList(iterators).iterator(); + + } + + @Override + public boolean hasNext() { + if (currentIterator == null || !currentIterator.hasNext()) { + while (listIterator.hasNext()) { + Iterator iterator = listIterator.next(); + if (iterator.hasNext()) { + currentIterator = iterator; + return true; + } + } + return false; + } + return currentIterator.hasNext(); + } + + @Override + public T next() { + return currentIterator.next(); + } + + @Override + public void remove() { + currentIterator.remove(); + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java index f4259db..fabb13f 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOClient.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOClient.java @@ -18,9 +18,7 @@ package com.corundumstudio.socketio; import java.net.SocketAddress; import java.util.UUID; -import com.corundumstudio.socketio.parser.Packet; - -public interface SocketIOClient { +public interface SocketIOClient extends ClientOperations { /** * Client session id, uses {@link UUID} object @@ -30,73 +28,10 @@ public interface SocketIOClient { UUID getSessionId(); /** - * Send message - * - * @param message - message to send - */ - void sendMessage(String message); - - /** - * Send message with ack callback - * - * @param message - message to send - * @param ackCallback - ack callback - */ - void sendMessage(String message, AckCallback ackCallback); - - /** - * Send object. Object will be encoded to json-format. - * - * @param object - object to send - */ - void sendJsonObject(Object object); - - /** - * Send object with ack callback - * - * @param object - object to send - * @param ackCallback - ack callback - */ - void sendJsonObject(Object object, AckCallback ackCallback); - - /** - * Send packet + * Get client remote address * - * @param packet - packet to send + * @return remote address */ - void send(Packet packet); - - /** - * Send packet with ack callback - * - * @param packet - packet to send - * @param ackCallback - ack callback - */ - void send(Packet packet, AckCallback ackCallback); - - /** - * Disconnect client - * - */ - void disconnect(); - - /** - * Send event - * - * @param name - event name - * @param data - event data - */ - void sendEvent(String name, Object data); - - /** - * Send event with ack callback - * - * @param name - event name - * @param data - event data - * @param ackCallback - ack callback - */ - void sendEvent(String name, Object data, AckCallback ackCallback); - SocketAddress getRemoteAddress(); } diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java index f7ec03c..7805f2e 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java @@ -17,6 +17,8 @@ package com.corundumstudio.socketio; import static org.jboss.netty.channel.Channels.pipeline; +import java.util.Iterator; + import org.codehaus.jackson.map.ObjectMapper; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; @@ -81,6 +83,17 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne socketIOEncoder = new SocketIOEncoder(objectMapper, encoder); } + public Iterable getAllClients() { + return new Iterable() { + @Override + public Iterator iterator() { + Iterator xhrClients = xhrPollingTransport.getAllClients().iterator(); + Iterator webSocketClients = webSocketTransport.getAllClients().iterator(); + return new JoinIterator(xhrClients, webSocketClients); + } + }; + } + public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = pipeline(); diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java index d335c2a..692424d 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOServer.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOServer.java @@ -35,6 +35,7 @@ public class SocketIOServer { private Channel mainChannel; private Configuration config; + private boolean started; public SocketIOServer(Configuration configuration) { this.config = new Configuration(configuration); @@ -45,6 +46,13 @@ public class SocketIOServer { this.pipelineFactory = pipelineFactory; } + public ClientOperations getBroadcastOperations() { + if (!started) { + throw new IllegalStateException("Server have not started!"); + } + return new BroadcastOperations(pipelineFactory.getAllClients()); + } + public void start() { ChannelFactory factory = new NioServerSocketChannelFactory(config.getBossExecutor(), config.getWorkerExecutor()); bootstrap = new ServerBootstrap(factory); @@ -55,6 +63,7 @@ public class SocketIOServer { bootstrap.setOption("child.keepAlive", true); mainChannel = bootstrap.bind(new InetSocketAddress(config.getHostname(), config.getPort())); + started = true; log.info("SocketIO server started at port: {}", config.getPort()); } @@ -62,6 +71,7 @@ public class SocketIOServer { pipelineFactory.stop(); mainChannel.close(); bootstrap.releaseExternalResources(); + started = false; } } diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index c231254..dc8352d 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -16,6 +16,7 @@ package com.corundumstudio.socketio.transport; import java.io.IOException; +import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -50,7 +51,7 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map sessionId2Client = new ConcurrentHashMap(); + private final Map sessionId2Client = new ConcurrentHashMap(); private final Map channelId2Client = new ConcurrentHashMap(); private final AckManager ackManager; @@ -159,4 +160,8 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements } } + public Collection getAllClients() { + return sessionId2Client.values(); + } + } diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java index 28d22d1..df9d3c4 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java @@ -16,6 +16,7 @@ package com.corundumstudio.socketio.transport; import java.io.IOException; +import java.util.Collection; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -57,7 +58,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements private final Logger log = LoggerFactory.getLogger(getClass()); - private final Map sessionId2Client = new ConcurrentHashMap(); + private final Map sessionId2Client = new ConcurrentHashMap(); private final CancelableScheduler scheduler; private final AckManager ackManager; @@ -94,7 +95,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements onGet(sessionId, channel, req); } if (queryDecoder.getParameters().containsKey("disconnect")) { - XHRPollingClient client = sessionId2Client.get(sessionId); + SocketIOClient client = sessionId2Client.get(sessionId); disconnectable.onDisconnect(client); } } else { @@ -114,7 +115,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements scheduler.schedule(key, new Runnable() { @Override public void run() { - XHRPollingClient client = sessionId2Client.get(sessionId); + SocketIOClient client = sessionId2Client.get(sessionId); if (client != null) { client.send(new Packet(PacketType.NOOP)); } @@ -132,7 +133,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements scheduler.schedule(key, new Runnable() { @Override public void run() { - XHRPollingClient client = sessionId2Client.get(sessionId); + SocketIOClient client = sessionId2Client.get(sessionId); if (client != null) { disconnectable.onDisconnect(client); log.debug("Client: {} disconnected due to connection timeout", sessionId); @@ -144,7 +145,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements } private void onPost(UUID sessionId, Channel channel, HttpRequest req) throws IOException { - XHRPollingClient client = sessionId2Client.get(sessionId); + SocketIOClient client = sessionId2Client.get(sessionId); if (client == null) { log.debug("Client with sessionId: {} was already disconnected. Channel closed!", sessionId); channel.close(); @@ -163,7 +164,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements } String origin = req.getHeader(HttpHeaders.Names.ORIGIN); - XHRPollingClient client = sessionId2Client.get(sessionId); + XHRPollingClient client = (XHRPollingClient)sessionId2Client.get(sessionId); if (client == null) { client = createClient(origin, channel, sessionId); } @@ -206,4 +207,8 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements } } + public Collection getAllClients() { + return sessionId2Client.values(); + } + } diff --git a/src/test/java/com/corundumstudio/socketio/JoinIteratorsTest.java b/src/test/java/com/corundumstudio/socketio/JoinIteratorsTest.java new file mode 100644 index 0000000..aa7d40a --- /dev/null +++ b/src/test/java/com/corundumstudio/socketio/JoinIteratorsTest.java @@ -0,0 +1,48 @@ +/** + * Copyright 2012 Nikita Koksharov + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.corundumstudio.socketio; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +import org.junit.Assert; +import org.junit.Test; + +public class JoinIteratorsTest { + + @Test + public void testIterator() { + List list1 = Arrays.asList(1, 2); + List list2 = Arrays.asList(3, 4); + Iterator i = list1.iterator(); + Iterator i2 = list1.iterator(); + JoinIterator iterators = new JoinIterator(i, i2); + + List mainList = new ArrayList(); + for (; iterators.hasNext();) { + Integer integer = iterators.next(); + mainList.add(integer); + } + Assert.assertEquals(list1.size() + list2.size(), mainList.size()); + mainList.removeAll(list1); + mainList.removeAll(list2); + Assert.assertTrue(mainList.isEmpty()); + + } + +}