Netty
wzg 9/1/2022 Netty
# IO
# Reactor 和 Proactor
# Netty
# 客户端请求实体类
@AllArgsConstructor
@Getter
@NoArgsConstructor
@Builder
@ToString
public class RpcRequest {
private String interfaceName;
private String methodName;
}
1
2
3
4
5
6
7
8
9
2
3
4
5
6
7
8
9
# 服务端响应实体类
@AllArgsConstructor
@Getter
@NoArgsConstructor
@Builder
@ToString
public class RpcResponse {
private String message;
}
1
2
3
4
5
6
7
8
2
3
4
5
6
7
8
# 初始化服务端
public class NettyServer {
private static final Logger logger = LoggerFactory.getLogger(NettyServer.class);
private final int port;
private NettyServer(int port) {
this.port = port;
}
private void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
KryoSerializer kryoSerializer = new KryoSerializer();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
// TCP默认开启了 Nagle 算法,该算法的作用是尽可能的发送大数据快,减少网络传输。TCP_NODELAY 参数的作用就是控制是否启用 Nagle 算法。
.childOption(ChannelOption.TCP_NODELAY, true)
// 是否开启 TCP 底层心跳机制
.childOption(ChannelOption.SO_KEEPALIVE, true)
//表示系统用于临时存放已完成三次握手的请求的队列的最大长度,如果连接建立频繁,服务器处理创建新连接较慢,可以适当调大这个参数
.option(ChannelOption.SO_BACKLOG, 128)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcResponse.class));
ch.pipeline().addLast(new NettyServerHandler());
}
});
// 绑定端口,同步等待绑定成功
ChannelFuture f = b.bind(port).sync();
// 等待服务端监听端口关闭
f.channel().closeFuture().sync();
} catch (InterruptedException e) {
logger.error("occur exception when start server:", e);
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# 自定义 ChannelHandler 处理客户端消息
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyServerHandler.class);
private static final AtomicInteger atomicInteger = new AtomicInteger(1);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
RpcRequest rpcRequest = (RpcRequest) msg;
logger.info("server receive msg: [{}] ,times:[{}]", rpcRequest, atomicInteger.getAndIncrement());
RpcResponse messageFromServer = RpcResponse.builder().message("message from server").build();
ChannelFuture f = ctx.writeAndFlush(messageFromServer);
f.addListener(ChannelFutureListener.CLOSE);
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
logger.error("server catch exception",cause);
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
# 初始化客户端
public class NettyClient {
private static final Logger logger = LoggerFactory.getLogger(NettyClient.class);
private final String host;
private final int port;
private static final Bootstrap b;
public NettyClient(String host, int port) {
this.host = host;
this.port = port;
}
// 初始化相关资源比如 EventLoopGroup, Bootstrap
static {
EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
b = new Bootstrap();
KryoSerializer kryoSerializer = new KryoSerializer();
b.group(eventLoopGroup)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
// 连接的超时时间,超过这个时间还是建立不上的话则代表连接失败
// 如果 15 秒之内没有发送数据给服务端的话,就发送一次心跳请求
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 5000)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) {
/*
自定义序列化编解码器
*/
// RpcResponse -> ByteBuf
ch.pipeline().addLast(new NettyKryoDecoder(kryoSerializer, RpcResponse.class));
// ByteBuf -> RpcRequest
ch.pipeline().addLast(new NettyKryoEncoder(kryoSerializer, RpcRequest.class));
ch.pipeline().addLast(new NettyClientHandler());
}
});
}
/**
* 发送消息到服务端
*
* @param rpcRequest 消息体
* @return 服务端返回的数据
*/
public RpcResponse sendMessage(RpcRequest rpcRequest) {
try {
ChannelFuture f = b.connect(host, port).sync();
logger.info("client connect {}", host + ":" + port);
Channel futureChannel = f.channel();
logger.info("send message");
if (futureChannel != null) {
futureChannel.writeAndFlush(rpcRequest).addListener(future -> {
if (future.isSuccess()) {
logger.info("client send message: [{}]", rpcRequest.toString());
} else {
logger.error("Send failed:", future.cause());
}
});
//阻塞等待 ,直到Channel关闭
futureChannel.closeFuture().sync();
// 将服务端返回的数据也就是RpcResponse对象取出
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
return futureChannel.attr(key).get();
}
} catch (InterruptedException e) {
logger.error("occur exception when connect server:", e);
}
return null;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
# 自定义 ChannelHandler 处理服务端消息
public class NettyClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger logger = LoggerFactory.getLogger(NettyClientHandler.class);
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
try {
RpcResponse rpcResponse = (RpcResponse) msg;
logger.info("client receive msg: [{}]", rpcResponse.toString());
// 声明一个 AttributeKey 对象
AttributeKey<RpcResponse> key = AttributeKey.valueOf("rpcResponse");
// 将服务端的返回结果保存到 AttributeMap 上,AttributeMap 可以看作是一个Channel的共享数据源
// AttributeMap的key是AttributeKey,value是Attribute
ctx.channel().attr(key).set(rpcResponse);
ctx.channel().close();
} finally {
ReferenceCountUtil.release(msg);
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
logger.error("client caught exception", cause);
ctx.close();
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
# 自定义编码器
/**
* 自定义编码器。
* <p>
* 网络传输需要通过字节流来实现,ByteBuf 可以看作是 Netty 提供的字节数据的容器,使用它会让我们更加方便地处理字节数据。
*
*/
@AllArgsConstructor
public class NettyKryoEncoder extends MessageToByteEncoder<Object> {
private final Serializer serializer;
private final Class<?> genericClass;
/**
* 将对象转换为字节码然后写入到 ByteBuf 对象中
*/
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, Object o, ByteBuf byteBuf) {
if (genericClass.isInstance(o)) {
// 1. 将对象转换为byte
byte[] body = serializer.serialize(o);
// 2. 读取消息的长度
int dataLength = body.length;
// 3.写入消息对应的字节数组长度,writerIndex 加 4
byteBuf.writeInt(dataLength);
//4.将字节数组写入 ByteBuf 对象中
byteBuf.writeBytes(body);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
# 自定义解码器
/**
* 自定义解码器。
*
*/
@AllArgsConstructor
@Slf4j
public class NettyKryoDecoder extends ByteToMessageDecoder {
private final Serializer serializer;
private final Class<?> genericClass;
/**
* Netty传输的消息长度也就是对象序列化后对应的字节数组的大小,存储在 ByteBuf 头部
*/
private static final int BODY_LENGTH = 4;
/**
* 解码 ByteBuf 对象
*
* @param ctx 解码器关联的 ChannelHandlerContext 对象
* @param in "入站"数据,也就是 ByteBuf 对象
* @param out 解码之后的数据对象需要添加到 out 对象里面
*/
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
//1.byteBuf中写入的消息长度所占的字节数已经是4了,所以 byteBuf 的可读字节必须大于 4,
if (in.readableBytes() >= BODY_LENGTH) {
//2.标记当前readIndex的位置,以便后面重置readIndex 的时候使用
in.markReaderIndex();
//3.读取消息的长度
//注意: 消息长度是encode的时候我们自己写入的,参见 NettyKryoEncoder 的encode方法
int dataLength = in.readInt();
//4.遇到不合理的情况直接 return
if (dataLength < 0 || in.readableBytes() < 0) {
log.error("data length or byteBuf readableBytes is not valid");
return;
}
//5.如果可读字节数小于消息长度的话,说明是不完整的消息,重置readIndex
if (in.readableBytes() < dataLength) {
in.resetReaderIndex();
return;
}
// 6.走到这里说明没什么问题了,可以序列化了
byte[] body = new byte[dataLength];
in.readBytes(body);
// 将bytes数组转换为我们需要的对象
Object obj = serializer.deserialize(body, genericClass);
out.add(obj);
log.info("successful decode ByteBuf to Object");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# 自定义序列化接口
public interface Serializer {
/**
* 序列化
*
* @param obj 要序列化的对象
* @return 字节数组
*/
byte[] serialize(Object obj);
/**
* 反序列化
*
* @param bytes 序列化后的字节数组
* @param clazz 类
* @param <T>
* @return 反序列化的对象
*/
<T> T deserialize(byte[] bytes, Class<T> clazz);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 实现序列化接口
public class KryoSerializer implements Serializer {
/**
* 由于 Kryo 不是线程安全的。每个线程都应该有自己的 Kryo,Input 和 Output 实例。
* 所以,使用 ThreadLocal 存放 Kryo 对象
*/
private static final ThreadLocal<Kryo> kryoThreadLocal = ThreadLocal.withInitial(() -> {
Kryo kryo = new Kryo();
kryo.register(RpcResponse.class);
kryo.register(RpcRequest.class);
kryo.setReferences(true);//默认值为true,是否关闭注册行为,关闭之后可能存在序列化问题,一般推荐设置为 true
kryo.setRegistrationRequired(false);//默认值为false,是否关闭循环引用,可以提高性能,但是一般不推荐设置为 true
return kryo;
});
@Override
public byte[] serialize(Object obj) {
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
Output output = new Output(byteArrayOutputStream)) {
Kryo kryo = kryoThreadLocal.get();
// Object->byte:将对象序列化为byte数组
kryo.writeObject(output, obj);
kryoThreadLocal.remove();
return output.toBytes();
} catch (Exception e) {
throw new SerializeException("序列化失败");
}
}
@Override
public <T> T deserialize(byte[] bytes, Class<T> clazz) {
try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bytes);
Input input = new Input(byteArrayInputStream)) {
Kryo kryo = kryoThreadLocal.get();
// byte->Object:从byte数组中反序列化出对对象
Object o = kryo.readObject(input, clazz);
kryoThreadLocal.remove();
return clazz.cast(o);
} catch (Exception e) {
throw new SerializeException("反序列化失败");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43