Browse Source

read netty document chapter 1

master
叶俊锋 4 years ago
parent
commit
29b8d6874e
  1. 65
      src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServer.java
  2. 46
      src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServerHandler.java
  3. 49
      src/main/java/com/corundumstudio/socketio/demo/time/TimeClient.java
  4. 68
      src/main/java/com/corundumstudio/socketio/demo/time/TimeClientHandler.java
  5. 65
      src/main/java/com/corundumstudio/socketio/demo/time/TimeServer.java
  6. 44
      src/main/java/com/corundumstudio/socketio/demo/time/TimeServerHandler.java

65
src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServer.java

@ -0,0 +1,65 @@
package com.corundumstudio.socketio.demo.discard;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* TODO :
*
* @author ye.jf
* @version 0.1.0
* @date 2022/1/1 14:18
* @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd.
*/
public class DiscardServer {
private int port;
public DiscardServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DiscardServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new DiscardServer(port).run();
}
}

46
src/main/java/com/corundumstudio/socketio/demo/discard/DiscardServerHandler.java

@ -0,0 +1,46 @@
package com.corundumstudio.socketio.demo.discard;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.ReferenceCountUtil;
/**
* TODO :
*
* @author ye.jf
* @version 0.1.0
* @date 2022/1/1 12:33
* @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd.
*/
public class DiscardServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) { // (2)
// Discard the received data silently.
System.out.println("DiscardServerHandler channelRead");
/*ByteBuf in = (ByteBuf) msg;
try {
// 丢弃协议实现
while (in.isReadable()) { // (1)
System.out.print((char) in.readByte());
System.out.flush();
}
} finally {
ReferenceCountUtil.release(msg);
}*/
// 响应服务实现
ctx.write(msg); // (1)
ctx.flush(); // (2)
//ctx.writeAndFlush(msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (4)
// Close the connection when an exception is raised.
cause.printStackTrace();
ctx.close();
}
}

49
src/main/java/com/corundumstudio/socketio/demo/time/TimeClient.java

@ -0,0 +1,49 @@
package com.corundumstudio.socketio.demo.time;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
/**
* TODO :
*
* @author ye.jf
* @version 0.1.0
* @date 2022/1/3 13:54
* @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd.
*/
public class TimeClient {
public static void main(String[] args) throws Exception {
//String host = args[0];
//int port = Integer.parseInt(args[1]);
String host = "localhost";
int port = 8080;
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
Bootstrap b = new Bootstrap(); // (1)
b.group(workerGroup); // (2)
b.channel(NioSocketChannel.class); // (3)
b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
b.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeClientHandler());
}
});
// Start the client.
ChannelFuture f = b.connect(host, port).sync(); // (5)
// Wait until the connection is closed.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
}
}
}

68
src/main/java/com/corundumstudio/socketio/demo/time/TimeClientHandler.java

@ -0,0 +1,68 @@
package com.corundumstudio.socketio.demo.time;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.Date;
/**
* TODO :
*
* @author ye.jf
* @version 0.1.0
* @date 2022/1/3 14:04
* @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd.
*/
public class TimeClientHandler extends ChannelInboundHandlerAdapter {
/**
* 以下这种实现方案有可能出现越界异常,并且在处理大数据量时,会出现数据碎片化 例如 发送方三个数据包 abc def ghi, 接收时变成 ab cdefg h i
*
* 简单的解决方案是创建一个内部累积缓冲区并等待直到所有4个字节都被接收到内部缓冲区
* @param ctx
* @param msg
*/
/*@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg; // (1)
try {
long currentTimeMillis = (m.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
} finally {
m.release();
}
}*/
private ByteBuf buf;
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
buf = ctx.alloc().buffer(4); // (1)
}
@Override
public void handlerRemoved(ChannelHandlerContext ctx) {
buf.release(); // (1)
buf = null;
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf m = (ByteBuf) msg;
buf.writeBytes(m); // (2)
m.release();
if (buf.readableBytes() >= 4) { // (3)
long currentTimeMillis = (buf.readUnsignedInt() - 2208988800L) * 1000L;
System.out.println(new Date(currentTimeMillis));
ctx.close();
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}

65
src/main/java/com/corundumstudio/socketio/demo/time/TimeServer.java

@ -0,0 +1,65 @@
package com.corundumstudio.socketio.demo.time;
import com.corundumstudio.socketio.demo.discard.DiscardServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
* TODO :
*
* @author ye.jf
* @version 0.1.0
* @date 2022/1/1 14:18
* @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd.
*/
public class TimeServer {
private int port;
public TimeServer(int port) {
this.port = port;
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new ChannelInitializer<SocketChannel>() { // (4)
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new TimeServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG, 128) // (5)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
// Bind and start to accept incoming connections.
ChannelFuture f = b.bind(port).sync(); // (7)
// Wait until the server socket is closed.
// In this example, this does not happen, but you can do that to gracefully
// shut down your server.
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
public static void main(String[] args) throws Exception {
int port = 8080;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
}
new TimeServer(port).run();
}
}

44
src/main/java/com/corundumstudio/socketio/demo/time/TimeServerHandler.java

@ -0,0 +1,44 @@
package com.corundumstudio.socketio.demo.time;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.sql.Time;
/**
* TODO :
*
* @author ye.jf
* @version 0.1.0
* @date 2022/1/1 14:55
* @copyright Wonhigh Information Technology (Shenzhen) Co.,Ltd.
*/
public class TimeServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(final ChannelHandlerContext ctx) { // (1)
//忽略任何接收到的数据,建立连接后立即返回
System.out.println("TimeServerHandler channelActive" );
final ByteBuf time = ctx.alloc().buffer(4); // (2)
time.writeInt((int) (System.currentTimeMillis() / 1000L + 2208988800L));
final ChannelFuture f = ctx.writeAndFlush(time); // (3)
f.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) {
assert f == future;
ctx.close();
}
}); // (4)
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
Loading…
Cancel
Save