Browse Source

Issue #17 fixed

master
Nikita 13 years ago
parent
commit
7f4cf3d05e
  1. 12
      README.md
  2. 92
      src/main/java/com/corundumstudio/socketio/BroadcastOperations.java
  3. 94
      src/main/java/com/corundumstudio/socketio/ClientOperations.java
  4. 56
      src/main/java/com/corundumstudio/socketio/JoinIterator.java
  5. 71
      src/main/java/com/corundumstudio/socketio/SocketIOClient.java
  6. 13
      src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java
  7. 10
      src/main/java/com/corundumstudio/socketio/SocketIOServer.java
  8. 7
      src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java
  9. 17
      src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java
  10. 48
      src/test/java/com/corundumstudio/socketio/JoinIteratorsTest.java

12
README.md

@ -11,6 +11,8 @@ Licensed under the Apache License 2.0.
* Supports xhr-polling transport
* Supports websocket transport (Hixie-75/76/Hybi-00, Hybi-10..Hybi-13)
#Usage example
##Server
@ -18,14 +20,12 @@ Licensed under the Apache License 2.0.
SocketIOListener handler = new SocketIOListener() {
@Override
public void onEvent(SocketIOClient client, Packet packet) {
public void onEvent(SocketIOClient client, String name, Object data) {
...
}
@Override
public void onMessage(SocketIOClient client, Packet packet) {
// get a message
packet.getData().toString();
public void onMessage(SocketIOClient client, String message) {
...
}
@ -40,9 +40,7 @@ Licensed under the Apache License 2.0.
}
@Override
public void onJsonObject(SocketIOClient client, Packet packet) {
// get a json object
packet.getData();
public void onJsonObject(SocketIOClient client, Objet data) {
...
SampleObject obj = new SampleObject();

92
src/main/java/com/corundumstudio/socketio/BroadcastOperations.java

@ -0,0 +1,92 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import com.corundumstudio.socketio.parser.Packet;
public class BroadcastOperations implements ClientOperations {
private final Iterable<SocketIOClient> clients;
public BroadcastOperations(Iterable<SocketIOClient> clients) {
super();
this.clients = clients;
}
@Override
public void sendMessage(String message) {
for (SocketIOClient client : clients) {
client.sendMessage(message);
}
}
@Override
public void sendMessage(String message, AckCallback ackCallback) {
for (SocketIOClient client : clients) {
client.sendMessage(message, ackCallback);
}
}
@Override
public void sendJsonObject(Object object) {
for (SocketIOClient client : clients) {
client.sendJsonObject(object);
}
}
@Override
public void sendJsonObject(Object object, AckCallback ackCallback) {
for (SocketIOClient client : clients) {
client.sendJsonObject(object, ackCallback);
}
}
@Override
public void send(Packet packet) {
for (SocketIOClient client : clients) {
client.send(packet);
}
}
@Override
public void send(Packet packet, AckCallback ackCallback) {
for (SocketIOClient client : clients) {
client.send(packet, ackCallback);
}
}
@Override
public void disconnect() {
for (SocketIOClient client : clients) {
client.disconnect();
}
}
@Override
public void sendEvent(String name, Object data) {
for (SocketIOClient client : clients) {
client.sendEvent(name, data);
}
}
@Override
public void sendEvent(String name, Object data, AckCallback ackCallback) {
for (SocketIOClient client : clients) {
client.sendEvent(name, data, ackCallback);
}
}
}

94
src/main/java/com/corundumstudio/socketio/ClientOperations.java

@ -0,0 +1,94 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import com.corundumstudio.socketio.parser.Packet;
/**
* Available client operations
*
*/
public interface ClientOperations {
/**
* Send message
*
* @param message - message to send
*/
void sendMessage(String message);
/**
* Send message with ack callback
*
* @param message - message to send
* @param ackCallback - ack callback
*/
void sendMessage(String message, AckCallback ackCallback);
/**
* Send object. Object will be encoded to json-format.
*
* @param object - object to send
*/
void sendJsonObject(Object object);
/**
* Send object with ack callback
*
* @param object - object to send
* @param ackCallback - ack callback
*/
void sendJsonObject(Object object, AckCallback ackCallback);
/**
* Send packet
*
* @param packet - packet to send
*/
void send(Packet packet);
/**
* Send packet with ack callback
*
* @param packet - packet to send
* @param ackCallback - ack callback
*/
void send(Packet packet, AckCallback ackCallback);
/**
* Disconnect client
*
*/
void disconnect();
/**
* Send event
*
* @param name - event name
* @param data - event data
*/
void sendEvent(String name, Object data);
/**
* Send event with ack callback
*
* @param name - event name
* @param data - event data
* @param ackCallback - ack callback
*/
void sendEvent(String name, Object data, AckCallback ackCallback);
}

56
src/main/java/com/corundumstudio/socketio/JoinIterator.java

@ -0,0 +1,56 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import java.util.Arrays;
import java.util.Iterator;
public class JoinIterator<T> implements Iterator<T> {
private final Iterator<Iterator<T>> listIterator;
private Iterator<T> currentIterator;
public JoinIterator(Iterator<T> ... iterators) {
listIterator = Arrays.asList(iterators).iterator();
}
@Override
public boolean hasNext() {
if (currentIterator == null || !currentIterator.hasNext()) {
while (listIterator.hasNext()) {
Iterator<T> iterator = listIterator.next();
if (iterator.hasNext()) {
currentIterator = iterator;
return true;
}
}
return false;
}
return currentIterator.hasNext();
}
@Override
public T next() {
return currentIterator.next();
}
@Override
public void remove() {
currentIterator.remove();
}
}

71
src/main/java/com/corundumstudio/socketio/SocketIOClient.java

@ -18,9 +18,7 @@ package com.corundumstudio.socketio;
import java.net.SocketAddress;
import java.util.UUID;
import com.corundumstudio.socketio.parser.Packet;
public interface SocketIOClient {
public interface SocketIOClient extends ClientOperations {
/**
* Client session id, uses {@link UUID} object
@ -30,73 +28,10 @@ public interface SocketIOClient {
UUID getSessionId();
/**
* Send message
*
* @param message - message to send
*/
void sendMessage(String message);
/**
* Send message with ack callback
*
* @param message - message to send
* @param ackCallback - ack callback
*/
void sendMessage(String message, AckCallback ackCallback);
/**
* Send object. Object will be encoded to json-format.
*
* @param object - object to send
*/
void sendJsonObject(Object object);
/**
* Send object with ack callback
*
* @param object - object to send
* @param ackCallback - ack callback
*/
void sendJsonObject(Object object, AckCallback ackCallback);
/**
* Send packet
* Get client remote address
*
* @param packet - packet to send
* @return remote address
*/
void send(Packet packet);
/**
* Send packet with ack callback
*
* @param packet - packet to send
* @param ackCallback - ack callback
*/
void send(Packet packet, AckCallback ackCallback);
/**
* Disconnect client
*
*/
void disconnect();
/**
* Send event
*
* @param name - event name
* @param data - event data
*/
void sendEvent(String name, Object data);
/**
* Send event with ack callback
*
* @param name - event name
* @param data - event data
* @param ackCallback - ack callback
*/
void sendEvent(String name, Object data, AckCallback ackCallback);
SocketAddress getRemoteAddress();
}

13
src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java

@ -17,6 +17,8 @@ package com.corundumstudio.socketio;
import static org.jboss.netty.channel.Channels.pipeline;
import java.util.Iterator;
import org.codehaus.jackson.map.ObjectMapper;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
@ -81,6 +83,17 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne
socketIOEncoder = new SocketIOEncoder(objectMapper, encoder);
}
public Iterable<SocketIOClient> getAllClients() {
return new Iterable<SocketIOClient>() {
@Override
public Iterator<SocketIOClient> iterator() {
Iterator<SocketIOClient> xhrClients = xhrPollingTransport.getAllClients().iterator();
Iterator<SocketIOClient> webSocketClients = webSocketTransport.getAllClients().iterator();
return new JoinIterator<SocketIOClient>(xhrClients, webSocketClients);
}
};
}
public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();

10
src/main/java/com/corundumstudio/socketio/SocketIOServer.java

@ -35,6 +35,7 @@ public class SocketIOServer {
private Channel mainChannel;
private Configuration config;
private boolean started;
public SocketIOServer(Configuration configuration) {
this.config = new Configuration(configuration);
@ -45,6 +46,13 @@ public class SocketIOServer {
this.pipelineFactory = pipelineFactory;
}
public ClientOperations getBroadcastOperations() {
if (!started) {
throw new IllegalStateException("Server have not started!");
}
return new BroadcastOperations(pipelineFactory.getAllClients());
}
public void start() {
ChannelFactory factory = new NioServerSocketChannelFactory(config.getBossExecutor(), config.getWorkerExecutor());
bootstrap = new ServerBootstrap(factory);
@ -55,6 +63,7 @@ public class SocketIOServer {
bootstrap.setOption("child.keepAlive", true);
mainChannel = bootstrap.bind(new InetSocketAddress(config.getHostname(), config.getPort()));
started = true;
log.info("SocketIO server started at port: {}", config.getPort());
}
@ -62,6 +71,7 @@ public class SocketIOServer {
pipelineFactory.stop();
mainChannel.close();
bootstrap.releaseExternalResources();
started = false;
}
}

7
src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java

@ -16,6 +16,7 @@
package com.corundumstudio.socketio.transport;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -50,7 +51,7 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, WebSocketClient> sessionId2Client = new ConcurrentHashMap<UUID, WebSocketClient>();
private final Map<UUID, SocketIOClient> sessionId2Client = new ConcurrentHashMap<UUID, SocketIOClient>();
private final Map<Integer, WebSocketClient> channelId2Client = new ConcurrentHashMap<Integer, WebSocketClient>();
private final AckManager ackManager;
@ -159,4 +160,8 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements
}
}
public Collection<SocketIOClient> getAllClients() {
return sessionId2Client.values();
}
}

17
src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java

@ -16,6 +16,7 @@
package com.corundumstudio.socketio.transport;
import java.io.IOException;
import java.util.Collection;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@ -57,7 +58,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
private final Logger log = LoggerFactory.getLogger(getClass());
private final Map<UUID, XHRPollingClient> sessionId2Client = new ConcurrentHashMap<UUID, XHRPollingClient>();
private final Map<UUID, SocketIOClient> sessionId2Client = new ConcurrentHashMap<UUID, SocketIOClient>();
private final CancelableScheduler scheduler;
private final AckManager ackManager;
@ -94,7 +95,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
onGet(sessionId, channel, req);
}
if (queryDecoder.getParameters().containsKey("disconnect")) {
XHRPollingClient client = sessionId2Client.get(sessionId);
SocketIOClient client = sessionId2Client.get(sessionId);
disconnectable.onDisconnect(client);
}
} else {
@ -114,7 +115,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
scheduler.schedule(key, new Runnable() {
@Override
public void run() {
XHRPollingClient client = sessionId2Client.get(sessionId);
SocketIOClient client = sessionId2Client.get(sessionId);
if (client != null) {
client.send(new Packet(PacketType.NOOP));
}
@ -132,7 +133,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
scheduler.schedule(key, new Runnable() {
@Override
public void run() {
XHRPollingClient client = sessionId2Client.get(sessionId);
SocketIOClient client = sessionId2Client.get(sessionId);
if (client != null) {
disconnectable.onDisconnect(client);
log.debug("Client: {} disconnected due to connection timeout", sessionId);
@ -144,7 +145,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
}
private void onPost(UUID sessionId, Channel channel, HttpRequest req) throws IOException {
XHRPollingClient client = sessionId2Client.get(sessionId);
SocketIOClient client = sessionId2Client.get(sessionId);
if (client == null) {
log.debug("Client with sessionId: {} was already disconnected. Channel closed!", sessionId);
channel.close();
@ -163,7 +164,7 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
}
String origin = req.getHeader(HttpHeaders.Names.ORIGIN);
XHRPollingClient client = sessionId2Client.get(sessionId);
XHRPollingClient client = (XHRPollingClient)sessionId2Client.get(sessionId);
if (client == null) {
client = createClient(origin, channel, sessionId);
}
@ -206,4 +207,8 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements
}
}
public Collection<SocketIOClient> getAllClients() {
return sessionId2Client.values();
}
}

48
src/test/java/com/corundumstudio/socketio/JoinIteratorsTest.java

@ -0,0 +1,48 @@
/**
* Copyright 2012 Nikita Koksharov
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.corundumstudio.socketio;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.junit.Assert;
import org.junit.Test;
public class JoinIteratorsTest {
@Test
public void testIterator() {
List<Integer> list1 = Arrays.asList(1, 2);
List<Integer> list2 = Arrays.asList(3, 4);
Iterator<Integer> i = list1.iterator();
Iterator<Integer> i2 = list1.iterator();
JoinIterator<Integer> iterators = new JoinIterator<Integer>(i, i2);
List<Integer> mainList = new ArrayList<Integer>();
for (; iterators.hasNext();) {
Integer integer = iterators.next();
mainList.add(integer);
}
Assert.assertEquals(list1.size() + list2.size(), mainList.size());
mainList.removeAll(list1);
mainList.removeAll(list2);
Assert.assertTrue(mainList.isEmpty());
}
}
Loading…
Cancel
Save