Browse Source

Ack refactoring

master
Nikita 13 years ago
parent
commit
967e06bdba
  1. 15
      src/main/java/com/corundumstudio/socketio/PacketListener.java
  2. 20
      src/main/java/com/corundumstudio/socketio/handler/PacketHandler.java
  3. 41
      src/main/java/com/corundumstudio/socketio/namespace/Namespace.java
  4. 5
      src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java

15
src/main/java/com/corundumstudio/socketio/PacketListener.java

@ -15,6 +15,8 @@
*/
package com.corundumstudio.socketio;
import java.util.Collections;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.namespace.NamespacesHub;
@ -33,7 +35,9 @@ public class PacketListener {
this.namespacesHub = namespacesHub;
}
public void onPacket(Packet packet, SocketIOClient client, AckRequest ackRequest) {
public void onPacket(Packet packet, NamespaceClient client) {
final AckRequest ackRequest = new AckRequest(packet, client);
if (packet.isAck()) {
ackManager.initAckIndex(client.getSessionId(), packet.getId());
}
@ -60,8 +64,7 @@ public class PacketListener {
}
case HEARTBEAT:
NamespaceClient nc = (NamespaceClient)client;
heartbeatHandler.onHeartbeat(nc.getBaseClient());
heartbeatHandler.onHeartbeat(client.getBaseClient());
break;
case MESSAGE: {
@ -77,9 +80,13 @@ public class PacketListener {
}
case DISCONNECT:
((NamespaceClient)client).onDisconnect();
client.onDisconnect();
break;
}
// send ack response if it not executed
// during {@link DataListener#onData} invocation
ackRequest.sendAckData(Collections.emptyList());
}
}

20
src/main/java/com/corundumstudio/socketio/handler/PacketHandler.java

@ -15,8 +15,6 @@
*/
package com.corundumstudio.socketio.handler;
import java.util.Collections;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandler.Sharable;
import org.jboss.netty.channel.ChannelHandlerContext;
@ -27,14 +25,14 @@ import org.jboss.netty.util.CharsetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.corundumstudio.socketio.AckRequest;
import com.corundumstudio.socketio.PacketListener;
import com.corundumstudio.socketio.SocketIOClient;
import com.corundumstudio.socketio.messages.PacketsMessage;
import com.corundumstudio.socketio.namespace.Namespace;
import com.corundumstudio.socketio.namespace.NamespacesHub;
import com.corundumstudio.socketio.parser.Decoder;
import com.corundumstudio.socketio.parser.Packet;
import com.corundumstudio.socketio.transport.BaseClient;
import com.corundumstudio.socketio.transport.NamespaceClient;
@Sharable
public class PacketHandler extends SimpleChannelUpstreamHandler {
@ -59,21 +57,17 @@ public class PacketHandler extends SimpleChannelUpstreamHandler {
if (msg instanceof PacketsMessage) {
PacketsMessage message = (PacketsMessage) msg;
ChannelBuffer content = message.getContent();
BaseClient client = message.getClient();
if (log.isTraceEnabled()) {
log.trace("In message: {} sessionId: {}", new Object[] {content.toString(CharsetUtil.UTF_8), message.getClient().getSessionId()});
log.trace("In message: {} sessionId: {}", new Object[] {content.toString(CharsetUtil.UTF_8), client.getSessionId()});
}
while (content.readable()) {
Packet packet = decoder.decodePackets(content, message.getClient().getSessionId());
Packet packet = decoder.decodePackets(content, client.getSessionId());
Namespace ns = namespacesHub.get(packet.getEndpoint());
SocketIOClient client = message.getClient().getChildClient(ns);
AckRequest ackSender = new AckRequest(packet, client);
packetListener.onPacket(packet, client, ackSender);
// send ack response if it not executed
// during {@link DataListener#onData} invocation
ackSender.sendAckData(Collections.emptyList());
NamespaceClient nClient = (NamespaceClient) client.getChildClient(ns);
packetListener.onPacket(packet, nClient);
}
} else {
ctx.sendUpstream(e);

41
src/main/java/com/corundumstudio/socketio/namespace/Namespace.java

@ -29,11 +29,12 @@ import com.corundumstudio.socketio.listener.ConnectListener;
import com.corundumstudio.socketio.listener.DataListener;
import com.corundumstudio.socketio.listener.DisconnectListener;
import com.corundumstudio.socketio.parser.JsonSupport;
import com.corundumstudio.socketio.transport.NamespaceClient;
import com.corundumstudio.socketio.utils.ConcurrentHashSet;
/**
* Hub object for all clients in one namespace.
* Namespace shares by different network-clients.
* Namespace shares by different namespace-clients.
*
* @see com.corundumstudio.socketio.transport.NamespaceClient
*/
@ -82,18 +83,6 @@ public class Namespace implements SocketIONamespace {
jsonSupport.addEventMapping(eventName, eventClass);
}
@SuppressWarnings({"rawtypes", "unchecked"})
public void onEvent(SocketIOClient client, String eventName, Object data, AckRequest ackRequest) {
EventEntry entry = eventListeners.get(eventName);
if (entry == null) {
return;
}
Queue<DataListener> listeners = entry.getListeners();
for (DataListener dataListener : listeners) {
dataListener.onData(client, data, ackRequest);
}
}
@Override
public <T> void addJsonObjectListener(Class<T> clazz, DataListener<T> listener) {
Queue<DataListener<?>> queue = jsonObjectListeners.get(clazz);
@ -108,7 +97,25 @@ public class Namespace implements SocketIONamespace {
jsonSupport.addJsonClass(clazz);
}
public void onJsonObject(SocketIOClient client, Object data, AckRequest ackRequest) {
@SuppressWarnings({"rawtypes", "unchecked"})
public void onEvent(NamespaceClient client, String eventName, Object data, AckRequest ackRequest) {
EventEntry entry = eventListeners.get(eventName);
if (entry == null) {
return;
}
Queue<DataListener> listeners = entry.getListeners();
for (DataListener dataListener : listeners) {
dataListener.onData(client, data, ackRequest);
}
}
public void onMessage(NamespaceClient client, String data, AckRequest ackRequest) {
for (DataListener<String> listener : messageListeners) {
listener.onData(client, data, ackRequest);
}
}
public void onJsonObject(NamespaceClient client, Object data, AckRequest ackRequest) {
Queue<DataListener<?>> queue = jsonObjectListeners.get(data.getClass());
if (queue == null) {
return;
@ -150,12 +157,6 @@ public class Namespace implements SocketIONamespace {
return messageListeners;
}
public void onMessage(SocketIOClient client, String data, AckRequest ackRequest) {
for (DataListener<String> listener : messageListeners) {
listener.onData(client, data, ackRequest);
}
}
@Override
public BroadcastOperations getBroadcastOperations() {
return new BroadcastOperations(clients);

5
src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java

@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import junit.framework.Assert;
import mockit.Mocked;
import com.corundumstudio.socketio.transport.NamespaceClient;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
@ -68,7 +69,7 @@ public class PacketHandlerTest {
private PacketListener createTestListener(final List<Packet> packets) {
PacketListener listener = new PacketListener(null, null, null) {
@Override
public void onPacket(Packet packet, SocketIOClient client, AckRequest ackRequest) {
public void onPacket(Packet packet, NamespaceClient client) {
int index = invocations.incrementAndGet();
Packet currentPacket = packets.get(index-1);
Assert.assertEquals(currentPacket.getType(), packet.getType());
@ -139,7 +140,7 @@ public class PacketHandlerTest {
public void testDecodePerf() throws Exception {
PacketListener listener = new PacketListener(null, null, null) {
@Override
public void onPacket(Packet packet, SocketIOClient client, AckRequest ackRequest) {
public void onPacket(Packet packet, NamespaceClient client) {
}
};
PacketHandler handler = new PacketHandler(listener, decoder, namespacesHub);

Loading…
Cancel
Save