Browse Source

BinaryEvent parsing added. #175

master
Nikita 11 years ago
parent
commit
69f66e4142
  1. 9
      src/main/java/com/corundumstudio/socketio/handler/ClientHead.java
  2. 8
      src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java
  3. 5
      src/main/java/com/corundumstudio/socketio/handler/PacketListener.java
  4. 34
      src/main/java/com/corundumstudio/socketio/protocol/Packet.java
  5. 89
      src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java
  6. 12
      src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java

9
src/main/java/com/corundumstudio/socketio/handler/ClientHead.java

@ -71,6 +71,8 @@ public class ClientHead {
private final CancelableScheduler disconnectScheduler;
private final Configuration configuration;
private Packet lastBinaryPacket;
// TODO use lazy set
private volatile Transport currentTransport;
@ -258,4 +260,11 @@ public class ClientHead {
return channels.get(transport).getPacketsQueue();
}
public void setLastBinaryPacket(Packet lastBinaryPacket) {
this.lastBinaryPacket = lastBinaryPacket;
}
public Packet getLastBinaryPacket() {
return lastBinaryPacket;
}
}

8
src/main/java/com/corundumstudio/socketio/handler/InPacketHandler.java

@ -16,9 +16,12 @@
package com.corundumstudio.socketio.handler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.base64.Base64;
import io.netty.util.CharsetUtil;
import org.slf4j.Logger;
@ -62,7 +65,10 @@ public class InPacketHandler extends SimpleChannelInboundHandler<PacketsMessage>
}
while (content.isReadable()) {
try {
Packet packet = decoder.decodePackets(content, client.getSessionId());
Packet packet = decoder.decodePackets(content, client);
if (packet.hasAttachments() && !packet.isAttachmentsLoaded()) {
return;
}
Namespace ns = namespacesHub.get(packet.getNsp());
if (ns == null) {
log.debug("Can't find namespace for endpoint: {}, sessionId: {} probably it was removed.", packet.getNsp(), client.getSessionId());

5
src/main/java/com/corundumstudio/socketio/handler/PacketListener.java

@ -81,7 +81,7 @@ public class PacketListener {
if (packet.getSubType() == PacketType.DISCONNECT) {
client.onDisconnect();
}
if (packet.getSubType() == PacketType.CONNECT) {
Namespace namespace = namespacesHub.get(packet.getNsp());
namespace.onConnect(client);
@ -93,7 +93,8 @@ public class PacketListener {
ackManager.onAck(client, packet);
}
if (packet.getSubType() == PacketType.EVENT) {
if (packet.getSubType() == PacketType.EVENT
|| packet.getSubType() == PacketType.BINARY_EVENT) {
Namespace namespace = namespacesHub.get(packet.getNsp());
List<Object> args = Collections.emptyList();
if (packet.getData() != null) {

34
src/main/java/com/corundumstudio/socketio/protocol/Packet.java

@ -15,7 +15,11 @@
*/
package com.corundumstudio.socketio.protocol;
import io.netty.buffer.ByteBuf;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import com.corundumstudio.socketio.namespace.Namespace;
@ -30,6 +34,10 @@ public class Packet implements Serializable {
private String nsp = Namespace.DEFAULT_NAME;
private Object data;
private ByteBuf dataSource;
private int attachmentsCount;
private List<ByteBuf> attachments;
protected Packet() {
}
@ -93,6 +101,32 @@ public class Packet implements Serializable {
return getAckId() != null && getSubType().equals(PacketType.EVENT);
}
public void initAttachments(int attachmentsCount) {
this.attachmentsCount = attachmentsCount;
this.attachments = new ArrayList<ByteBuf>(attachmentsCount);
}
public void addAttachment(ByteBuf attachment) {
if (this.attachments.size() < attachmentsCount) {
this.attachments.add(attachment);
}
}
public List<ByteBuf> getAttachments() {
return attachments;
}
public boolean hasAttachments() {
return attachmentsCount != 0;
}
public boolean isAttachmentsLoaded() {
return this.attachments.size() == attachmentsCount;
}
public ByteBuf getDataSource() {
return dataSource;
}
public void setDataSource(ByteBuf dataSource) {
this.dataSource = dataSource;
}
@Override
public String toString() {
return "Packet [type=" + type + ", ackId=" + ackId + "]";

89
src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java

@ -22,13 +22,17 @@ import io.netty.util.CharsetUtil;
import java.io.IOException;
import java.net.URLDecoder;
import java.util.LinkedList;
import java.util.UUID;
import com.corundumstudio.socketio.AckCallback;
import com.corundumstudio.socketio.ack.AckManager;
import com.corundumstudio.socketio.handler.ClientHead;
public class PacketDecoder {
private final ByteBuf QUOTES = Unpooled.copiedBuffer("\"", CharsetUtil.UTF_8);
private final JsonSupport jsonSupport;
private final AckManager ackManager;
@ -51,7 +55,6 @@ public class PacketDecoder {
Integer length = Integer.valueOf(len);
packet = packet.substring(splitIndex+1, splitIndex+length+1);
// packet = new String(packet.getBytes(CharsetUtil.ISO_8859_1), CharsetUtil.UTF_8);
return Unpooled.wrappedBuffer(packet.getBytes(CharsetUtil.UTF_8));
}
@ -90,7 +93,7 @@ public class PacketDecoder {
}
}
public Packet decodePackets(ByteBuf buffer, UUID uuid) throws IOException {
public Packet decodePackets(ByteBuf buffer, ClientHead client) throws IOException {
boolean isString = buffer.getByte(buffer.readerIndex()) == 0x0;
if (isString) {
int headEndIndex = buffer.bytesBefore((byte)-1);
@ -99,9 +102,9 @@ public class PacketDecoder {
ByteBuf frame = buffer.slice(buffer.readerIndex() + 1, len);
// skip this frame
buffer.readerIndex(buffer.readerIndex() + 1 + len);
return decode(uuid, frame);
return decode(client, frame);
}
return decode(uuid, buffer);
return decode(client, buffer);
}
private String readString(ByteBuf frame) {
@ -114,7 +117,10 @@ public class PacketDecoder {
return new String(bytes, CharsetUtil.UTF_8);
}
private Packet decode(UUID uuid, ByteBuf frame) throws IOException {
private Packet decode(ClientHead head, ByteBuf frame) throws IOException {
if (frame.getByte(0) == 'b') {
return parseBinary(head, frame);
}
PacketType type = readType(frame);
Packet packet = new Packet(type);
@ -130,6 +136,12 @@ public class PacketDecoder {
PacketType innerType = readInnerType(frame);
packet.setSubType(innerType);
parseHeader(frame, packet, innerType);
parseBody(head, frame, packet);
return packet;
}
private void parseHeader(ByteBuf frame, Packet packet, PacketType innerType) {
int endIndex = frame.bytesBefore((byte)'[');
if (endIndex > 0) {
// TODO optimize
@ -144,11 +156,57 @@ public class PacketDecoder {
packet.setAckId(Long.valueOf(ackId));
}
} else {
long ackId = readLong(frame, endIndex);
packet.setAckId(ackId);
boolean hasAttachments = frame.bytesBefore(endIndex, (byte)'-') != -1;
if (hasAttachments && PacketType.BINARY_EVENT.equals(innerType)) {
int attachments = (int) readLong(frame, endIndex-1);
packet.initAttachments(attachments);
frame.readerIndex(frame.readerIndex() + 1);
} else {
long ackId = readLong(frame, endIndex);
packet.setAckId(ackId);
}
}
}
}
private Packet parseBinary(ClientHead head, ByteBuf frame) throws IOException {
frame.readShort();
Packet binaryPacket = head.getLastBinaryPacket();
if (binaryPacket != null) {
binaryPacket.addAttachment(Unpooled.copiedBuffer(frame));
frame.readerIndex(frame.readerIndex() + frame.readableBytes());
if (binaryPacket.isAttachmentsLoaded()) {
LinkedList<ByteBuf> slices = new LinkedList<ByteBuf>();
ByteBuf source = binaryPacket.getDataSource();
for (int i = 0; i < binaryPacket.getAttachments().size(); i++) {
ByteBuf attachment = binaryPacket.getAttachments().get(i);
ByteBuf scanValue = Unpooled.copiedBuffer("{\"_placeholder\":true,\"num\":" + i + "}", CharsetUtil.UTF_8);
int pos = PacketEncoder.find(source, scanValue);
if (pos == -1) {
throw new IllegalStateException("Can't find attachment by index: " + i + " in packet source");
}
ByteBuf prefixBuf = source.slice(source.readerIndex(), pos - source.readerIndex());
slices.add(prefixBuf);
slices.add(QUOTES);
slices.add(attachment);
slices.add(QUOTES);
source.readerIndex(pos + scanValue.readableBytes());
}
slices.add(source.slice());
ByteBuf compositeBuf = Unpooled.wrappedBuffer(slices.toArray(new ByteBuf[slices.size()]));
parseBody(head, compositeBuf, binaryPacket);
head.setLastBinaryPacket(null);
return binaryPacket;
}
}
return new Packet(PacketType.MESSAGE);
}
private void parseBody(ClientHead head, ByteBuf frame, Packet packet) throws IOException {
if (packet.getType() == PacketType.MESSAGE) {
if (packet.getSubType() == PacketType.CONNECT
|| packet.getSubType() == PacketType.DISCONNECT) {
@ -157,20 +215,25 @@ public class PacketDecoder {
if (packet.getSubType() == PacketType.ACK) {
ByteBufInputStream in = new ByteBufInputStream(frame);
AckCallback<?> callback = ackManager.getCallback(uuid, packet.getAckId());
AckCallback<?> callback = ackManager.getCallback(head.getSessionId(), packet.getAckId());
AckArgs args = jsonSupport.readAckArgs(in, callback);
packet.setData(args.getArgs());
}
if (packet.getSubType() == PacketType.EVENT
|| packet.getSubType() == PacketType.BINARY_EVENT) {
ByteBufInputStream in = new ByteBufInputStream(frame);
Event event = jsonSupport.readValue(in, Event.class);
packet.setName(event.getName());
packet.setData(event.getArgs());
if (packet.hasAttachments() && !packet.isAttachmentsLoaded()) {
packet.setDataSource(Unpooled.copiedBuffer(frame));
frame.readerIndex(frame.readableBytes());
head.setLastBinaryPacket(packet);
} else {
ByteBufInputStream in = new ByteBufInputStream(frame);
Event event = jsonSupport.readValue(in, Event.class);
packet.setName(event.getName());
packet.setData(event.getArgs());
}
}
}
return packet;
}
}

12
src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java

@ -95,7 +95,6 @@ public class PacketEncoder {
buf.release();
// TODO optimize
packet = QUOTES_PATTERN.matcher(packet).replaceAll("\\\\\"");
// packet = new String(packet.getBytes(CharsetUtil.UTF_8), CharsetUtil.ISO_8859_1);
out.writeBytes(packet.getBytes(CharsetUtil.UTF_8));
out.writeBytes(JSONP_END);
@ -290,7 +289,16 @@ public class PacketEncoder {
return count;
}
private boolean isValueFound(ByteBuf buffer, int index, ByteBuf search) {
public static int find(ByteBuf buffer, ByteBuf searchValue) {
for (int i = buffer.readerIndex(); i < buffer.readerIndex() + buffer.readableBytes(); i++) {
if (isValueFound(buffer, i, searchValue)) {
return i;
}
}
return -1;
}
private static boolean isValueFound(ByteBuf buffer, int index, ByteBuf search) {
for (int i = 0; i < search.readableBytes(); i++) {
if (buffer.getByte(index + i) != search.getByte(i)) {
return false;

Loading…
Cancel
Save