From 7ccd152da8e461e63ba5a8984c89523cacc004e6 Mon Sep 17 00:00:00 2001 From: Nikita Date: Sat, 12 May 2012 14:41:31 +0400 Subject: [PATCH] Binary packet parsing --- pom.xml | 6 + .../socketio/PacketHandler.java | 84 ++++++++++++ .../socketio/SocketIOPipelineFactory.java | 12 +- .../socketio/messages/PacketsMessage.java | 25 ++++ .../transport/WebSocketTransport.java | 107 +++++++-------- .../socketio/transport/XHRPollingClient.java | 6 +- .../transport/XHRPollingTransport.java | 30 ++--- .../socketio/PacketHandlerTest.java | 123 ++++++++++++++++++ 8 files changed, 314 insertions(+), 79 deletions(-) create mode 100644 src/main/java/com/corundumstudio/socketio/PacketHandler.java create mode 100644 src/main/java/com/corundumstudio/socketio/messages/PacketsMessage.java create mode 100644 src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java diff --git a/pom.xml b/pom.xml index c2a64d3..802e24e 100644 --- a/pom.xml +++ b/pom.xml @@ -41,6 +41,12 @@ 4.10 test + + com.googlecode.jmockit + jmockit + 0.999.15 + test + io.netty netty diff --git a/src/main/java/com/corundumstudio/socketio/PacketHandler.java b/src/main/java/com/corundumstudio/socketio/PacketHandler.java new file mode 100644 index 0000000..067e114 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/PacketHandler.java @@ -0,0 +1,84 @@ +package com.corundumstudio.socketio; + +import java.io.IOException; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.ChannelHandler.Sharable; +import org.jboss.netty.util.CharsetUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.corundumstudio.socketio.messages.PacketsMessage; +import com.corundumstudio.socketio.parser.Decoder; +import com.corundumstudio.socketio.parser.Packet; + +@Sharable +public class PacketHandler extends SimpleChannelUpstreamHandler { + + private final Logger log = LoggerFactory.getLogger(getClass()); + + private final PacketListener packetListener; + private final Decoder decoder; + + public PacketHandler(PacketListener packetListener, Decoder decoder) { + super(); + this.packetListener = packetListener; + this.decoder = decoder; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + Object msg = e.getMessage(); + if (msg instanceof PacketsMessage) { + PacketsMessage message = (PacketsMessage) msg; + ChannelBuffer content = message.getContent(); + + if (log.isTraceEnabled()) { + log.trace("In message: {} sessionId: {}", new Object[] {content.toString(CharsetUtil.UTF_8), message.getClient().getSessionId()}); + } + while (content.readable()) { + Packet packet = decode(content); + packetListener.onPacket(packet, message.getClient()); + } + } else { + ctx.sendUpstream(e); + } + } + + // TODO use ForkJoin + private Packet decode(ChannelBuffer buffer) throws IOException { + char delimiter = getChar(buffer, buffer.readerIndex()); + if (delimiter == Packet.DELIMITER) { + StringBuilder length = new StringBuilder(4); + for (int i = buffer.readerIndex() + 2 + 1; i < buffer.readerIndex() + buffer.readableBytes(); i++) { + if (getChar(buffer, i) == Packet.DELIMITER) { + break; + } else { + length.append((char)buffer.getUnsignedByte(i)); + } + } + Integer len = Integer.valueOf(length.toString()); + + int startIndex = buffer.readerIndex() + 3 + length.length() + 3; + ChannelBuffer frame = buffer.slice(startIndex, len); + Packet packet = decoder.decodePacket(frame.toString(CharsetUtil.UTF_8)); + buffer.readerIndex(startIndex + len); + return packet; + } else { + Packet packet = decoder.decodePacket(buffer.toString(CharsetUtil.UTF_8)); + buffer.readerIndex(buffer.readableBytes()); + return packet; + } + } + + // TODO refactor it + private char getChar(ChannelBuffer buffer, int index) { + byte[] bytes = {buffer.getByte(index), buffer.getByte(index + 1)}; + return new String(bytes).charAt(0); + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java index 259df06..6c3e535 100644 --- a/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java +++ b/src/main/java/com/corundumstudio/socketio/SocketIOPipelineFactory.java @@ -46,17 +46,21 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne private SocketIOListener socketIOHandler; private HeartbeatHandler heartbeatHandler; + private PacketHandler packetHandler; + public SocketIOPipelineFactory(Configuration configuration) { this.socketIOHandler = configuration.getListener(); + this.heartbeatHandler = new HeartbeatHandler(configuration); + ObjectMapper objectMapper = configuration.getObjectMapper(); Encoder encoder = new Encoder(objectMapper); Decoder decoder = new Decoder(objectMapper); - this.heartbeatHandler = new HeartbeatHandler(configuration); PacketListener packetListener = new PacketListener(socketIOHandler, this, heartbeatHandler); + packetHandler = new PacketHandler(packetListener, decoder); authorizeHandler = new AuthorizeHandler(connectPath, socketIOHandler, configuration); - xhrPollingTransport = new XHRPollingTransport(connectPath, decoder, packetListener, this, heartbeatHandler, authorizeHandler, configuration); - webSocketTransport = new WebSocketTransport(connectPath, decoder, this, packetListener, authorizeHandler); + xhrPollingTransport = new XHRPollingTransport(connectPath, this, heartbeatHandler, authorizeHandler, configuration); + webSocketTransport = new WebSocketTransport(connectPath, this, authorizeHandler); socketIOEncoder = new SocketIOEncoder(objectMapper, encoder); } @@ -67,6 +71,8 @@ public class SocketIOPipelineFactory implements ChannelPipelineFactory, Disconne pipeline.addLast("aggregator", new HttpChunkAggregator(65536)); pipeline.addLast("encoder", new HttpResponseEncoder()); + pipeline.addLast("packetHandler", packetHandler); + pipeline.addLast("authorizeHandler", authorizeHandler); pipeline.addLast("xhrPollingTransport", xhrPollingTransport); pipeline.addLast("webSocketTransport", webSocketTransport); diff --git a/src/main/java/com/corundumstudio/socketio/messages/PacketsMessage.java b/src/main/java/com/corundumstudio/socketio/messages/PacketsMessage.java new file mode 100644 index 0000000..0916ad9 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/messages/PacketsMessage.java @@ -0,0 +1,25 @@ +package com.corundumstudio.socketio.messages; + +import org.jboss.netty.buffer.ChannelBuffer; + +import com.corundumstudio.socketio.SocketIOClient; + +public class PacketsMessage { + + private final SocketIOClient client; + private final ChannelBuffer content; + + public PacketsMessage(SocketIOClient client, ChannelBuffer content) { + this.client = client; + this.content = content; + } + + public SocketIOClient getClient() { + return client; + } + + public ChannelBuffer getContent() { + return content; + } + +} diff --git a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java index b8c4c80..cf67231 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/WebSocketTransport.java @@ -15,13 +15,15 @@ */ package com.corundumstudio.socketio.transport; -import java.util.List; +import java.io.IOException; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.http.HttpHeaders; @@ -31,15 +33,14 @@ import org.jboss.netty.handler.codec.http.websocketx.CloseWebSocketFrame; import org.jboss.netty.handler.codec.http.websocketx.TextWebSocketFrame; import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshaker; import org.jboss.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory; +import org.jboss.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.corundumstudio.socketio.AuthorizeHandler; import com.corundumstudio.socketio.Disconnectable; -import com.corundumstudio.socketio.PacketListener; import com.corundumstudio.socketio.SocketIOClient; -import com.corundumstudio.socketio.parser.Decoder; -import com.corundumstudio.socketio.parser.Packet; +import com.corundumstudio.socketio.messages.PacketsMessage; public class WebSocketTransport extends SimpleChannelUpstreamHandler implements Disconnectable { @@ -50,18 +51,13 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements private final AuthorizeHandler authorizeHandler; private final Disconnectable disconnectable; - private final PacketListener packetListener; - private final Decoder decoder; private final String path; - public WebSocketTransport(String connectPath, Decoder decoder, - Disconnectable disconnectable, PacketListener packetListener, AuthorizeHandler authorizeHandler) { + public WebSocketTransport(String connectPath, Disconnectable disconnectable, AuthorizeHandler authorizeHandler) { this.path = connectPath + "websocket"; - this.decoder = decoder; this.authorizeHandler = authorizeHandler; this.disconnectable = disconnectable; - this.packetListener = packetListener; } @Override @@ -71,57 +67,66 @@ public class WebSocketTransport extends SimpleChannelUpstreamHandler implements ctx.getChannel().close(); } else if (msg instanceof TextWebSocketFrame) { TextWebSocketFrame frame = (TextWebSocketFrame) msg; - WebSocketClient client = channelId2Client.get(ctx.getChannel().getId()); - String content = frame.getText(); - - log.trace("In message: {} sessionId: {}", new Object[] {content, client.getSessionId()}); - - List packets = decoder.decodePayload(content); - for (Packet packet : packets) { - packetListener.onPacket(packet, client); - } + receivePackets(ctx, frame.getBinaryData()); } else if (msg instanceof HttpRequest) { HttpRequest req = (HttpRequest) msg; - - WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false); - WebSocketServerHandshaker handshaker = factory.newHandshaker(req); - if (handshaker != null) { - handshaker.handshake(ctx.getChannel(), req); - QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri()); - connectClient(ctx.getChannel(), queryDecoder); - } else { - factory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel()); - } + handshake(ctx, req); } else { ctx.sendUpstream(e); } } - private void connectClient(Channel channel, QueryStringDecoder queryDecoder) { - String path = queryDecoder.getPath(); - if (!path.startsWith(this.path)) { + private void handshake(ChannelHandlerContext ctx, HttpRequest req) { + QueryStringDecoder queryDecoder = new QueryStringDecoder(req.getUri()); + Channel channel = ctx.getChannel(); + String path = queryDecoder.getPath(); + if (!path.startsWith(this.path)) { + return; + } + + String[] parts = path.split("/"); + if (parts.length <= 3) { + log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!", + new Object[] {path, channel.getRemoteAddress()}); + channel.close(); + return; + } + + UUID sessionId = UUID.fromString(parts[4]); + + WebSocketServerHandshakerFactory factory = new WebSocketServerHandshakerFactory(getWebSocketLocation(req), null, false); + WebSocketServerHandshaker handshaker = factory.newHandshaker(req); + if (handshaker != null) { + handshaker.handshake(channel, req); + connectClient(channel, sessionId); + } else { + factory.sendUnsupportedWebSocketVersionResponse(ctx.getChannel()); + } + } + + private void receivePackets(ChannelHandlerContext ctx, + ChannelBuffer channelBuffer) throws IOException { + WebSocketClient client = channelId2Client.get(ctx.getChannel().getId()); + if (log.isTraceEnabled()) { + String content = channelBuffer.toString(CharsetUtil.UTF_8); + log.trace("In message: {} sessionId: {}", new Object[] {content, client.getSessionId()}); + } + + Channels.fireMessageReceived(ctx.getChannel(), new PacketsMessage(client, channelBuffer)); + } + + private void connectClient(Channel channel, UUID sessionId) { + if (!authorizeHandler.isSessionAuthorized(sessionId)) { + log.warn("Unauthorized client with sessionId: {}, from ip: {}. Channel closed!", + new Object[] {sessionId, channel.getRemoteAddress()}); + channel.close(); return; } - String[] parts = path.split("/"); - if (parts.length > 3) { - UUID sessionId = UUID.fromString(parts[4]); - if (!authorizeHandler.isSessionAuthorized(sessionId)) { - log.warn("Unauthorized client with sessionId: {}, from ip: {}. Channel closed!", - new Object[] {sessionId, channel.getRemoteAddress()}); - channel.close(); - return; - } - - WebSocketClient client = new WebSocketClient(channel, disconnectable, sessionId); - channelId2Client.put(channel.getId(), client); - sessionId2Client.put(sessionId, client); - authorizeHandler.connect(client); - } else { - log.warn("Wrong GET request path: {}, from ip: {}. Channel closed!", - new Object[] {path, channel.getRemoteAddress()}); - channel.close(); - } + WebSocketClient client = new WebSocketClient(channel, disconnectable, sessionId); + channelId2Client.put(channel.getId(), client); + sessionId2Client.put(sessionId, client); + authorizeHandler.connect(client); } private String getWebSocketLocation(HttpRequest req) { diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java index 39d1cc9..5bedef7 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingClient.java @@ -19,8 +19,6 @@ import java.util.UUID; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelFuture; -import org.jboss.netty.handler.codec.http.HttpHeaders; -import org.jboss.netty.handler.codec.http.HttpRequest; import com.corundumstudio.socketio.Disconnectable; import com.corundumstudio.socketio.messages.XHRNewChannelMessage; @@ -39,8 +37,8 @@ public class XHRPollingClient extends BaseClient { this.disconnectable = disconnectable; } - public void update(Channel channel, HttpRequest req) { - this.origin = req.getHeader(HttpHeaders.Names.ORIGIN); + public void update(Channel channel, String origin) { + this.origin = origin; this.channel = channel; channel.write(new XHRNewChannelMessage(sessionId, origin)); } diff --git a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java index 283212b..b686085 100644 --- a/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java +++ b/src/main/java/com/corundumstudio/socketio/transport/XHRPollingTransport.java @@ -16,20 +16,19 @@ package com.corundumstudio.socketio.transport; import java.io.IOException; -import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.Channels; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; import org.jboss.netty.handler.codec.http.HttpHeaders; import org.jboss.netty.handler.codec.http.HttpMethod; import org.jboss.netty.handler.codec.http.HttpRequest; import org.jboss.netty.handler.codec.http.QueryStringDecoder; -import org.jboss.netty.util.CharsetUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,11 +36,10 @@ import com.corundumstudio.socketio.AuthorizeHandler; import com.corundumstudio.socketio.Configuration; import com.corundumstudio.socketio.Disconnectable; import com.corundumstudio.socketio.HeartbeatHandler; -import com.corundumstudio.socketio.PacketListener; import com.corundumstudio.socketio.SocketIOClient; +import com.corundumstudio.socketio.messages.PacketsMessage; import com.corundumstudio.socketio.messages.XHRErrorMessage; import com.corundumstudio.socketio.messages.XHRPostMessage; -import com.corundumstudio.socketio.parser.Decoder; import com.corundumstudio.socketio.parser.ErrorAdvice; import com.corundumstudio.socketio.parser.ErrorReason; import com.corundumstudio.socketio.parser.Packet; @@ -55,22 +53,17 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements private final AuthorizeHandler authorizeHandler; private final HeartbeatHandler heartbeatHandler; - private final PacketListener packetListener; private final Disconnectable disconnectable; - private final Decoder decoder; private final String path; private final Configuration configuration; - public XHRPollingTransport(String connectPath, Decoder decoder, - PacketListener packetListener, Disconnectable disconnectable, + public XHRPollingTransport(String connectPath, Disconnectable disconnectable, HeartbeatHandler heartbeatHandler, AuthorizeHandler authorizeHandler, Configuration configuration) { this.path = connectPath + "xhr-polling/"; this.authorizeHandler = authorizeHandler; this.configuration = configuration; this.heartbeatHandler = heartbeatHandler; this.disconnectable = disconnectable; - this.decoder = decoder; - this.packetListener = packetListener; } public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception { @@ -112,14 +105,8 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements return; } - String content = req.getContent().toString(CharsetUtil.UTF_8); - log.trace("In message: {} sessionId: {}", new Object[] {content, sessionId}); - List packets = decoder.decodePayload(content); - for (Packet packet : packets) { - packetListener.onPacket(packet, client); - } - String origin = req.getHeader(HttpHeaders.Names.ORIGIN); + Channels.fireMessageReceived(channel, new PacketsMessage(client, req.getContent())); channel.write(new XHRPostMessage(origin)); } @@ -128,18 +115,19 @@ public class XHRPollingTransport extends SimpleChannelUpstreamHandler implements sendError(channel, req, sessionId); return; } + String origin = req.getHeader(HttpHeaders.Names.ORIGIN); XHRPollingClient client = sessionId2Client.get(sessionId); if (client == null) { - client = createClient(req, channel, sessionId); + client = createClient(origin, channel, sessionId); } - client.update(channel, req); + client.update(channel, origin); } - private XHRPollingClient createClient(HttpRequest req, Channel channel, UUID sessionId) { + private XHRPollingClient createClient(String origin, Channel channel, UUID sessionId) { XHRPollingClient client = new XHRPollingClient(authorizeHandler, sessionId); sessionId2Client.put(sessionId, client); - client.update(channel, req); + client.update(channel, origin); authorizeHandler.connect(client); if (configuration.isHeartbeatsEnabled()) { diff --git a/src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java b/src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java new file mode 100644 index 0000000..9999030 --- /dev/null +++ b/src/test/java/com/corundumstudio/socketio/PacketHandlerTest.java @@ -0,0 +1,123 @@ +package com.corundumstudio.socketio; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; + +import junit.framework.Assert; +import mockit.Mocked; + +import org.codehaus.jackson.map.ObjectMapper; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.UpstreamMessageEvent; +import org.junit.Test; + +import com.corundumstudio.socketio.messages.PacketsMessage; +import com.corundumstudio.socketio.parser.Decoder; +import com.corundumstudio.socketio.parser.Encoder; +import com.corundumstudio.socketio.parser.Packet; +import com.corundumstudio.socketio.parser.PacketType; + +public class PacketHandlerTest { + + private ObjectMapper map = new ObjectMapper(); + private Decoder decoder = new Decoder(map); + private Encoder encoder = new Encoder(map); + @Mocked + private Channel channel; + + @Test + public void testOnePacket() throws Exception { + final AtomicInteger invocations = new AtomicInteger(); + PacketListener listener = new PacketListener(null, null, null) { + @Override + public void onPacket(Packet packet, SocketIOClient client) { + invocations.incrementAndGet(); + Assert.assertEquals(PacketType.JSON, packet.getType()); + Map map = (Map) packet.getData(); + Assert.assertTrue(map.keySet().size() == 1); + Assert.assertTrue(map.keySet().contains("test1")); + } + }; + PacketHandler handler = new PacketHandler(listener, decoder); + + List packets = new ArrayList(); + Packet packet = new Packet(PacketType.JSON); + packet.setData(Collections.singletonMap("test1", "test2")); + packets.add(packet); + + testHandler(invocations, handler, packets); + } + + @Test + public void testMultiplePackets() throws Exception { + final AtomicInteger invocations = new AtomicInteger(); + PacketListener listener = new PacketListener(null, null, null) { + @Override + public void onPacket(Packet packet, SocketIOClient client) { + if (packet.getType() == PacketType.CONNECT) { + invocations.incrementAndGet(); + return; + } + Assert.assertEquals(PacketType.JSON, packet.getType()); + Map map = (Map) packet.getData(); + Set keys = new HashSet(); + keys.add("test1"); + keys.add("fsdfdf"); + Assert.assertTrue(map.keySet().size() == 1); + Assert.assertTrue(map.keySet().removeAll(keys)); + invocations.incrementAndGet(); + } + }; + PacketHandler handler = new PacketHandler(listener, decoder); + + List packets = new ArrayList(); + Packet packet3 = new Packet(PacketType.CONNECT); + packets.add(packet3); + + Packet packet = new Packet(PacketType.JSON); + packet.setData(Collections.singletonMap("test1", "test2")); + packets.add(packet); + + Packet packet1 = new Packet(PacketType.JSON); + packet1.setData(Collections.singletonMap("fsdfdf", "wqeq")); + packets.add(packet1); + + testHandler(invocations, handler, packets); + } + + private void testHandler(final AtomicInteger invocations, + PacketHandler handler, List packets) throws Exception { + String str = encoder.encodePackets(packets); + ChannelBuffer buffer = ChannelBuffers.wrappedBuffer(str.getBytes()); + handler.messageReceived(null, new UpstreamMessageEvent(channel, new PacketsMessage(null, buffer), null)); + Assert.assertEquals(packets.size(), invocations.get()); + } + + //@Test + public void testDecodePerf() throws Exception { + PacketListener listener = new PacketListener(null, null, null) { + @Override + public void onPacket(Packet packet, SocketIOClient client) { + } + }; + PacketHandler handler = new PacketHandler(listener, decoder); + long start = System.currentTimeMillis(); + ChannelBuffer buffer = ChannelBuffers.wrappedBuffer("\ufffd5\ufffd3:::5\ufffd7\ufffd3:::53d\ufffd3\ufffd0::\ufffd5\ufffd3:::5\ufffd7\ufffd3:::53d\ufffd3\ufffd0::\ufffd5\ufffd3:::5\ufffd7\ufffd3:::53d\ufffd3\ufffd0::\ufffd5\ufffd3:::5\ufffd7\ufffd3:::53d\ufffd3\ufffd0::\ufffd5\ufffd3:::5\ufffd7\ufffd3:::53d\ufffd3\ufffd0::".getBytes()); + for (int i = 0; i < 50000; i++) { + ChannelBuffer t = buffer.copy(); + handler.messageReceived(null, new UpstreamMessageEvent(channel, new PacketsMessage(null, t), null)); + } + long end = System.currentTimeMillis() - start; + System.out.println(end + "ms"); + // 1143ms + } + + +}