Browse Source

non b64 mode binary event handling. #178

master
Nikita 11 years ago
parent
commit
c32826a792
  1. 23
      src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java
  2. 35
      src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java
  3. 27
      src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java
  4. 13
      src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java

23
src/main/java/com/corundumstudio/socketio/handler/EncoderHandler.java

@ -23,19 +23,19 @@ import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
import static io.netty.handler.codec.http.HttpHeaders.Values.KEEP_ALIVE;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.base64.Base64Dialect;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
import io.netty.util.Attribute;
@ -59,13 +59,12 @@ import com.corundumstudio.socketio.messages.HttpMessage;
import com.corundumstudio.socketio.messages.OutPacketMessage;
import com.corundumstudio.socketio.messages.XHROptionsMessage;
import com.corundumstudio.socketio.messages.XHRPostMessage;
import com.corundumstudio.socketio.protocol.PacketEncoder;
import com.corundumstudio.socketio.protocol.Packet;
import com.corundumstudio.socketio.protocol.PacketEncoder;
@Sharable
public class EncoderHandler extends ChannelOutboundHandlerAdapter {
private static final byte[] BINARY_HEADER = "b4".getBytes(CharsetUtil.UTF_8);
private static final byte[] OK = "ok".getBytes(CharsetUtil.UTF_8);
public static final AttributeKey<String> ORIGIN = AttributeKey.valueOf("origin");
@ -203,7 +202,7 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
}
}
private void handleWebsocket(OutPacketMessage msg, ChannelHandlerContext ctx) throws IOException {
private void handleWebsocket(final OutPacketMessage msg, ChannelHandlerContext ctx) throws IOException {
while (true) {
Queue<Packet> queue = msg.getClientHead().getPacketsQueue(msg.getTransport());
Packet packet = queue.poll();
@ -211,13 +210,12 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
break;
}
ByteBuf out = encoder.allocateBuffer(ctx.alloc());
final ByteBuf out = encoder.allocateBuffer(ctx.alloc());
encoder.encodePacket(packet, out, ctx.alloc(), true, false);
WebSocketFrame res = new TextWebSocketFrame(out);
if (log.isTraceEnabled()) {
log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8),
msg.getSessionId());
log.trace("Out message: {} sessionId: {}", out.toString(CharsetUtil.UTF_8), msg.getSessionId());
}
ctx.channel().writeAndFlush(res);
if (!out.isReadable()) {
@ -226,9 +224,12 @@ public class EncoderHandler extends ChannelOutboundHandlerAdapter {
for (ByteBuf buf : packet.getAttachments()) {
ByteBuf outBuf = encoder.allocateBuffer(ctx.alloc());
outBuf.writeBytes(BINARY_HEADER);
outBuf.writeBytes(Base64.encode(buf, Base64Dialect.URL_SAFE));
ctx.channel().writeAndFlush(outBuf);
outBuf.writeByte(4);
outBuf.writeBytes(buf);
if (log.isTraceEnabled()) {
log.trace("Out attachment: {} sessionId: {}", ByteBufUtil.hexDump(outBuf), msg.getSessionId());
}
ctx.channel().writeAndFlush(new BinaryWebSocketFrame(outBuf));
}
}
}

35
src/main/java/com/corundumstudio/socketio/protocol/JacksonJsonSupport.java

@ -40,6 +40,7 @@ import com.fasterxml.jackson.core.JsonGenerationException;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.core.SerializableString;
import com.fasterxml.jackson.core.io.CharacterEscapes;
import com.fasterxml.jackson.core.io.SerializedString;
@ -61,7 +62,6 @@ import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonFormatTypes;
import com.fasterxml.jackson.databind.jsonFormatVisitors.JsonFormatVisitorWrapper;
import com.fasterxml.jackson.databind.jsontype.TypeSerializer;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.fasterxml.jackson.databind.ser.BeanSerializerModifier;
import com.fasterxml.jackson.databind.ser.std.StdSerializer;
@ -200,8 +200,7 @@ public class JacksonJsonSupport implements JsonSupport {
public Event deserialize(JsonParser jp, DeserializationContext ctxt) throws IOException,
JsonProcessingException {
ObjectMapper mapper = (ObjectMapper) jp.getCodec();
ArrayNode root = (ArrayNode) mapper.readTree(jp);
String eventName = root.get(0).asText();
String eventName = jp.nextTextValue();
EventKey ek = new EventKey(namespaceClass.get(), eventName);
if (!eventMapping.containsKey(ek)) {
@ -213,23 +212,21 @@ public class JacksonJsonSupport implements JsonSupport {
List<Object> eventArgs = new ArrayList<Object>();
Event event = new Event(eventName, eventArgs);
if (root.size() > 1) {
Iterator<JsonNode> iterator = root.elements();
// skip 0 node
iterator.next();
List<Class<?>> eventClasses = eventMapping.get(ek);
int i = 0;
while (iterator.hasNext()) {
JsonNode node = iterator.next();
if (i > eventClasses.size() - 1) {
log.debug("Event {} has more args than declared in handler: {}", eventName, root);
break;
}
Class<?> eventClass = eventClasses.get(i);
Object arg = mapper.treeToValue(node, eventClass);
eventArgs.add(arg);
i++;
List<Class<?>> eventClasses = eventMapping.get(ek);
int i = 0;
while (true) {
JsonToken token = jp.nextToken();
if (token == JsonToken.END_ARRAY) {
break;
}
if (i > eventClasses.size() - 1) {
log.debug("Event {} has more args than declared in handler: {}", eventName, null);
break;
}
Class<?> eventClass = eventClasses.get(i);
Object arg = mapper.readValue(jp, eventClass);
eventArgs.add(arg);
i++;
}
return event;
}

27
src/main/java/com/corundumstudio/socketio/protocol/PacketDecoder.java

@ -18,6 +18,8 @@ package com.corundumstudio.socketio.protocol;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.handler.codec.base64.Base64;
import io.netty.handler.codec.base64.Base64Dialect;
import io.netty.util.CharsetUtil;
import java.io.IOException;
@ -123,7 +125,8 @@ public class PacketDecoder {
}
private Packet decode(ClientHead head, ByteBuf frame) throws IOException {
if (frame.getByte(0) == 'b') {
if ((frame.getByte(0) == 'b' && frame.getByte(1) == '4')
|| frame.getByte(0) == 4 || frame.getByte(0) == 1) {
return parseBinary(head, frame);
}
PacketType type = readType(frame);
@ -175,10 +178,28 @@ public class PacketDecoder {
}
private Packet parseBinary(ClientHead head, ByteBuf frame) throws IOException {
frame.readShort();
if (frame.getByte(0) == 1) {
frame.readByte();
int headEndIndex = frame.bytesBefore((byte)-1);
int len = (int) readLong(frame, headEndIndex);
frame.readShort();
}
if (frame.getByte(0) == 'b' && frame.getByte(1) == '4') {
frame.readShort();
} else if (frame.getByte(0) == 4) {
frame.readByte();
}
Packet binaryPacket = head.getLastBinaryPacket();
if (binaryPacket != null) {
binaryPacket.addAttachment(Unpooled.copiedBuffer(frame));
ByteBuf attachBuf;
if (frame.getByte(0) == 'b' && frame.getByte(1) == '4') {
attachBuf = frame;
} else {
attachBuf = Base64.encode(frame);
}
binaryPacket.addAttachment(Unpooled.copiedBuffer(attachBuf));
frame.readerIndex(frame.readerIndex() + frame.readableBytes());
if (binaryPacket.isAttachmentsLoaded()) {

13
src/main/java/com/corundumstudio/socketio/protocol/PacketEncoder.java

@ -34,6 +34,7 @@ import com.corundumstudio.socketio.Configuration;
public class PacketEncoder {
private static final Pattern QUOTES_PATTERN = Pattern.compile("\"", Pattern.LITERAL);
private static final byte[] BINARY_HEADER = "b4".getBytes(CharsetUtil.UTF_8);
private static final byte[] B64_DELIMITER = new byte[] {':'};
private static final byte[] JSONP_HEAD = "___eio[".getBytes(CharsetUtil.UTF_8);
private static final byte[] JSONP_START = "](\"".getBytes(CharsetUtil.UTF_8);
@ -90,7 +91,7 @@ public class PacketEncoder {
ByteBuf encodedBuf = Base64.encode(attachment, Base64Dialect.URL_SAFE);
buf.writeBytes(toChars(encodedBuf.readableBytes() + 2));
buf.writeBytes(B64_DELIMITER);
buf.writeBytes(new byte[] {'b', '4'});
buf.writeBytes(BINARY_HEADER);
buf.writeBytes(encodedBuf);
}
}
@ -123,11 +124,11 @@ public class PacketEncoder {
i++;
for (ByteBuf attachment : packet.getAttachments()) {
ByteBuf encodedBuf = Base64.encode(attachment, Base64Dialect.URL_SAFE);
buffer.writeBytes(toChars(encodedBuf.readableBytes() + 2));
buffer.writeBytes(B64_DELIMITER);
buffer.writeBytes(new byte[] {'b', '4'});
buffer.writeBytes(encodedBuf);
buffer.writeByte(1);
buffer.writeBytes(longToBytes(attachment.readableBytes() + 1));
buffer.writeByte(0xff);
buffer.writeByte(4);
buffer.writeBytes(attachment);
}
}
}

Loading…
Cancel
Save