diff --git a/src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServer.java b/src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServer.java new file mode 100644 index 0000000..7292e1e --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServer.java @@ -0,0 +1,65 @@ +package com.corundumstudio.socketio.demo.discard; + +import io.netty.bootstrap.ServerBootstrap; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +/** + * TODO : + * + * @author ye.jf + * @version 0.1.0 + * @date 2022/1/1 14:18 + * @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd. + */ +public class DiscardServer { + private int port; + + public DiscardServer(int port) { + this.port = port; + } + + public void run() throws Exception { + EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); // (2) + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) // (3) + .childHandler(new ChannelInitializer() { // (4) + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new DiscardServerHandler()); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) // (5) + .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) + + // Bind and start to accept incoming connections. + ChannelFuture f = b.bind(port).sync(); // (7) + + // Wait until the server socket is closed. + // In this example, this does not happen, but you can do that to gracefully + // shut down your server. + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + public static void main(String[] args) throws Exception { + int port = 8080; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } + + new DiscardServer(port).run(); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServerHandler.java b/src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServerHandler.java new file mode 100644 index 0000000..4c3f3be --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServerHandler.java @@ -0,0 +1,46 @@ +package com.corundumstudio.socketio.demo.discard; + +import io.netty.buffer.ByteBuf; + +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.util.ReferenceCountUtil; + +/** + * TODO : + * + * @author ye.jf + * @version 0.1.0 + * @date 2022/1/1 12:33 + * @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd. + */ +public class DiscardServerHandler extends ChannelInboundHandlerAdapter { + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2) + // Discard the received data silently. + System.out.println("DiscardServerHandler channelRead"); + /*ByteBuf in = (ByteBuf) msg; + try { + // 丢弃协议实现 + while (in.isReadable()) { // (1) + System.out.print((char) in.readByte()); + System.out.flush(); + } + } finally { + ReferenceCountUtil.release(msg); + }*/ + // 响应服务实现 + ctx.write(msg); // (1) + ctx.flush(); // (2) + + //ctx.writeAndFlush(msg); + + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4) + // Close the connection when an exception is raised. + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/demo/time/TimeClient.java b/src/main/java/com/corundumstudio/socketio/demo/time/TimeClient.java new file mode 100644 index 0000000..ecb8c67 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/demo/time/TimeClient.java @@ -0,0 +1,49 @@ +package com.corundumstudio.socketio.demo.time; + +import io.netty.bootstrap.Bootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; + +/** + * TODO : + * + * @author ye.jf + * @version 0.1.0 + * @date 2022/1/3 13:54 + * @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd. + */ +public class TimeClient { + public static void main(String[] args) throws Exception { + //String host = args[0]; + //int port = Integer.parseInt(args[1]); + String host = "localhost"; + int port = 8080; + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + Bootstrap b = new Bootstrap(); // (1) + b.group(workerGroup); // (2) + b.channel(NioSocketChannel.class); // (3) + b.option(ChannelOption.SO_KEEPALIVE, true); // (4) + b.handler(new ChannelInitializer() { + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new TimeClientHandler()); + } + }); + + // Start the client. + ChannelFuture f = b.connect(host, port).sync(); // (5) + + // Wait until the connection is closed. + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + } + } +} diff --git a/src/main/java/com/corundumstudio/socketio/demo/time/TimeClientHandler.java b/src/main/java/com/corundumstudio/socketio/demo/time/TimeClientHandler.java new file mode 100644 index 0000000..13e4c65 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/demo/time/TimeClientHandler.java @@ -0,0 +1,68 @@ +package com.corundumstudio.socketio.demo.time; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.util.Date; + +/** + * TODO : + * + * @author ye.jf + * @version 0.1.0 + * @date 2022/1/3 14:04 + * @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd. + */ +public class TimeClientHandler extends ChannelInboundHandlerAdapter { + /** + * 以下这种实现方案有可能出现越界异常,并且在处理大数据量时,会出现数据碎片化 例如 发送方三个数据包 abc def ghi, 接收时变成 ab cdefg h i + * + * 简单的解决方案是创建一个内部累积缓冲区,并等待直到所有4个字节都被接收到内部缓冲区。 + * @param ctx + * @param msg + */ + /*@Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf m = (ByteBuf) msg; // (1) + try { + long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L; + System.out.println(new Date(currentTimeMillis)); + ctx.close(); + } finally { + m.release(); + } + }*/ + + private ByteBuf buf; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) { + buf = ctx.alloc().buffer(4); // (1) + } + + @Override + public void handlerRemoved(ChannelHandlerContext ctx) { + buf.release(); // (1) + buf = null; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + ByteBuf m = (ByteBuf) msg; + buf.writeBytes(m); // (2) + m.release(); + + if (buf.readableBytes() >= 4) { // (3) + long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L; + System.out.println(new Date(currentTimeMillis)); + ctx.close(); + } + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/demo/time/TimeServer.java b/src/main/java/com/corundumstudio/socketio/demo/time/TimeServer.java new file mode 100644 index 0000000..be6efd9 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/demo/time/TimeServer.java @@ -0,0 +1,65 @@ +package com.corundumstudio.socketio.demo.time; + +import com.corundumstudio.socketio.demo.discard.DiscardServerHandler; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.SocketChannel; +import io.netty.channel.socket.nio.NioServerSocketChannel; + +/** + * TODO : + * + * @author ye.jf + * @version 0.1.0 + * @date 2022/1/1 14:18 + * @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd. + */ +public class TimeServer { + private int port; + + public TimeServer(int port) { + this.port = port; + } + + public void run() throws Exception { + EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1) + EventLoopGroup workerGroup = new NioEventLoopGroup(); + try { + ServerBootstrap b = new ServerBootstrap(); // (2) + b.group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) // (3) + .childHandler(new ChannelInitializer() { // (4) + @Override + public void initChannel(SocketChannel ch) throws Exception { + ch.pipeline().addLast(new TimeServerHandler()); + } + }) + .option(ChannelOption.SO_BACKLOG, 128) // (5) + .childOption(ChannelOption.SO_KEEPALIVE, true); // (6) + + // Bind and start to accept incoming connections. + ChannelFuture f = b.bind(port).sync(); // (7) + + // Wait until the server socket is closed. + // In this example, this does not happen, but you can do that to gracefully + // shut down your server. + f.channel().closeFuture().sync(); + } finally { + workerGroup.shutdownGracefully(); + bossGroup.shutdownGracefully(); + } + } + + public static void main(String[] args) throws Exception { + int port = 8080; + if (args.length > 0) { + port = Integer.parseInt(args[0]); + } + + new TimeServer(port).run(); + } +} diff --git a/src/main/java/com/corundumstudio/socketio/demo/time/TimeServerHandler.java b/src/main/java/com/corundumstudio/socketio/demo/time/TimeServerHandler.java new file mode 100644 index 0000000..90de0d9 --- /dev/null +++ b/src/main/java/com/corundumstudio/socketio/demo/time/TimeServerHandler.java @@ -0,0 +1,44 @@ +package com.corundumstudio.socketio.demo.time; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +import java.sql.Time; + +/** + * TODO : + * + * @author ye.jf + * @version 0.1.0 + * @date 2022/1/1 14:55 + * @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd. + */ +public class TimeServerHandler extends ChannelInboundHandlerAdapter { + + @Override + public void channelActive(final ChannelHandlerContext ctx) { // (1) + //忽略任何接收到的数据,建立连接后立即返回 + System.out.println("TimeServerHandler channelActive" ); + final ByteBuf time = ctx.alloc().buffer(4); // (2) + time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L)); + + final ChannelFuture f = ctx.writeAndFlush(time); // (3) + f.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) { + assert f == future; + ctx.close(); + } + }); // (4) + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + cause.printStackTrace(); + ctx.close(); + } + +}