简介
本文介绍如何使用Netty进行TCP的收发数据。
依赖及配置
主要依赖
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency>
pom.xml
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.4.4</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.example</groupId> <artifactId>demo</artifactId> <version>0.0.1-SNAPSHOT</version> <name>demo</name> <description>Demo project for Spring Boot</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>4.1.36.Final</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.76</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.20</version> <scope>provided</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>
application.yml
server: port: 8081
服务端
监听端口、发数据的类
package com.example.demo.tcp.server; import io.netty.bootstrap.ServerBootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class TCPServer { public static void bind(int port) { Thread thread = new Thread(new Runnable() { @Override public void run() { //服务端要建立两个group,一个负责接收客户端的连接,一个负责处理数据传输 //连接处理group EventLoopGroup boss = new NioEventLoopGroup(); //事件处理group EventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); // 绑定处理group bootstrap.group(boss, worker).channel(NioServerSocketChannel.class) //保持连接的数目 .option(ChannelOption.SO_BACKLOG, 1024) //有数据立即发送 .option(ChannelOption.TCP_NODELAY, true) //保持连接 .childOption(ChannelOption.SO_KEEPALIVE, true) //处理新连接 .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel sc) throws Exception { // 增加任务处理 ChannelPipeline p = sc.pipeline(); p.addLast( new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4), new LengthFieldPrepender(4), new StringDecoder(CharsetUtil.UTF_8), new StringEncoder(CharsetUtil.UTF_8), //心跳检测,读超时,写超时,读写超时 //new IdleStateHandler(5, 0, 0, TimeUnit.SECONDS), //自定义的处理器 new TCPServerHandler() ); } }); //绑定端口,同步等待成功 ChannelFuture future; try { future = bootstrap.bind(port).sync(); if (future.isSuccess()) { log.info("协议==> TCP服务端启动成功(端口:{})", port); } else { log.info("协议==> TCP服务端启动失败(端口:{})", port); } //等待服务监听端口关闭,就是由于这里会将线程阻塞,导致无法发送信息,所以我这里开了线程 future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { //优雅地退出,释放线程池资源 boss.shutdownGracefully(); worker.shutdownGracefully(); } } }); thread.start(); } public static void sendMsg(ChannelHandlerContext ctx, String message) { ByteBuf byteBuf = Unpooled.copiedBuffer(message, CharsetUtil.UTF_8); ChannelFuture future = ctx.writeAndFlush(byteBuf); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { log.info("协议==> TCP服务端发送成功。"); } else { log.error("协议==> TCP服务端发送失败。内容:{}", message); } } }); } // public static void main(String[] args) { // Server server = new Server(61000); // } }
Handler
package com.example.demo.tcp.server; import com.example.demo.common.cache.TCPCache; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.util.Map; @Slf4j public class TCPServerHandler extends ChannelInboundHandlerAdapter { // 客户端与服务端创建连接的时候调用 @Override public void channelActive(ChannelHandlerContext context) throws Exception { log.info("协议==> TCP:客户端({})与服务端(本机)({})连接成功", context.channel().remoteAddress(), context.channel().localAddress()); } // 客户端与服务端断开连接时调用 @Override public void channelInactive(ChannelHandlerContext context) throws Exception { log.info("协议==> TCP:客户端({})与服务端(本机)({})连接关闭", context.channel().remoteAddress(), context.channel().localAddress()); for (Map.Entry<String, ChannelHandlerContext> entry : TCPCache.INSTANCE.getClientInfoContext().entrySet()) { if (context.equals(entry.getValue())) { TCPCache.INSTANCE.getClientInfoContext().remove(entry.getKey()); } } } // 服务端接收客户端发送过来的数据结束之后调用 @Override public void channelReadComplete(ChannelHandlerContext context) throws Exception { context.flush(); // System.out.println("信息接收完毕..."); } // 工程出现异常的时候调用 @Override public void exceptionCaught(ChannelHandlerContext context, Throwable cause) throws Exception { cause.printStackTrace(); context.close(); } // 服务端处理客户端websocket请求的核心方法,这里接收了客户端发来的信息 @Override public void channelRead(ChannelHandlerContext context, Object info) throws Exception { InetSocketAddress socket = (InetSocketAddress) context.channel().remoteAddress(); String clientIP = socket.getAddress().getHostAddress(); String clientPort = String.valueOf(socket.getPort()); log.info("TCP服务端(本机)收到客户端({}:{})的数据:{}", clientIP, clientPort, info); String key = (String) info; TCPCache.INSTANCE.getClientInfoContext().computeIfAbsent(key, k -> context); } }
配置类
package com.example.demo.config; import com.example.demo.tcp.server.TCPServer; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; @Component public class TCPConfig implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { TCPServer.bind(11000); // TCPClient.connectToServer("127.0.0.1", 11000); } }
客户端
建立连接、发数据的类
package com.example.demo.tcp.client; import com.example.demo.common.cache.TCPCache; import io.netty.bootstrap.Bootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.LengthFieldBasedFrameDecoder; import io.netty.handler.codec.LengthFieldPrepender; import io.netty.handler.codec.string.StringDecoder; import io.netty.handler.codec.string.StringEncoder; import io.netty.util.CharsetUtil; import lombok.extern.slf4j.Slf4j; @Slf4j public class TCPClient { public static Bootstrap bootstrap = getBootstrap(); // 初始化Bootstrap public static Bootstrap getBootstrap() { EventLoopGroup group = new NioEventLoopGroup(); Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class); b.handler(new ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4)); pipeline.addLast("frameEncoder", new LengthFieldPrepender(4)); pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8)); pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8)); pipeline.addLast("handler", new TCPClientHandler()); } }); b.option(ChannelOption.SO_KEEPALIVE, true); return b; } public static Channel connectToServer(String host, int port) { Channel channel = null; try { channel = bootstrap.connect(host, port).sync().channel(); } catch (Exception e) { log.error("协议==> TCP客户端连接服务({}:{})失败", host, port, e); return null; } TCPCache.INSTANCE.getServerInfoContext().put(host + ":" + port, channel); return channel; } public static void sendMsg(String host, int port, Object data) throws Exception { Channel channel = TCPCache.INSTANCE.getServerInfoContext().get(host + ":" + port); if (channel != null) { channel.writeAndFlush(data).sync(); } else { log.warn("协议==> TCP客户端消息发送失败,因为连接尚未建立!"); } } }
Handler
package com.example.demo.tcp.client; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import lombok.extern.slf4j.Slf4j; import java.net.InetSocketAddress; import java.nio.charset.StandardCharsets; @Slf4j public class TCPClientHandler extends ChannelInboundHandlerAdapter { private static String clientName = "客户端1"; @Override public void channelActive(ChannelHandlerContext context) throws Exception { log.info("协议==> TCP:客户端(本机)({})与服务器({})连接成功", context.channel().localAddress(), context.channel().remoteAddress()); ByteBuf buf = Unpooled.copiedBuffer(clientName, StandardCharsets.UTF_8); context.writeAndFlush(buf); } @Override public void channelRead(ChannelHandlerContext context, Object msg) throws Exception { InetSocketAddress socket = (InetSocketAddress) context.channel().remoteAddress(); String clientIP = socket.getAddress().getHostAddress(); String clientPort = String.valueOf(socket.getPort()); log.info("TCP客户端(本机)收到服务端({}:{})的数据:{}", clientIP, clientPort, msg); } }
配置类
package com.example.demo.config; import com.example.demo.tcp.client.TCPClient; import org.springframework.boot.ApplicationArguments; import org.springframework.boot.ApplicationRunner; import org.springframework.stereotype.Component; @Component public class TCPConfig implements ApplicationRunner { @Override public void run(ApplicationArguments args) throws Exception { // TCPServer.bind(11000); TCPClient.connectToServer("127.0.0.1", 11000); } }
公共代码
保存客户端/服务端 与 TCP连接的对应关系
package com.example.demo.common.cache; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public enum TCPCache { INSTANCE; private Map<String, ChannelHandlerContext> clientInfoContext = new ConcurrentHashMap<>(); private Map<String, Channel> serverInfoContext = new ConcurrentHashMap<>(); public Map<String, ChannelHandlerContext> getClientInfoContext() { return clientInfoContext; } public Map<String, Channel> getServerInfoContext() { return serverInfoContext; } }
测试代码
客户端Controller
package com.example.demo.controller; import com.example.demo.tcp.client.TCPClient; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("client") public class TCPClientController { @PostMapping("send") public String send(String host, Integer port, String msg) { try { TCPClient.sendMsg(host, port, msg); } catch (Exception e) { e.printStackTrace(); } return "success"; } }
服务端Controller
package com.example.demo.controller; import com.example.demo.common.cache.TCPCache; import io.netty.channel.ChannelHandlerContext; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController @RequestMapping("server") public class TCPServerController { @PostMapping("send") public String send(String clientName, String msg) { ChannelHandlerContext context = TCPCache.INSTANCE.getClientInfoContext().get(clientName); context.writeAndFlush(msg); return "success"; } }
测试
启动
以8080端口,启动服务器;以8081端口,启动客户端
服务端打印
2021-04-20 09:56:08.243 INFO 78272 --- [ Thread-5] com.example.demo.tcp.server.TCPServer : 协议==> TCP服务端启动成功(端口:11000) 2021-04-20 09:56:23.311 INFO 78272 --- [ntLoopGroup-3-1] c.e.demo.tcp.server.TCPServerHandler : 协议==> TCP:客户端(/127.0.0.1:51396)与服务端(本机)(/127.0.0.1:11000)连接成功 2021-04-20 09:56:23.331 INFO 78272 --- [ntLoopGroup-3-1] c.e.demo.tcp.server.TCPServerHandler : TCP服务端(本机)收到客户端(127.0.0.1:51396)的数据:客户端1
客户端打印
2021-04-20 09:56:23.298 INFO 79792 --- [ntLoopGroup-2-1] c.e.demo.tcp.client.TCPClientHandler : 协议==> TCP:客户端(本机)(/127.0.0.1:51396)与服务器(/127.0.0.1:11000)连接成功
测试
请求客户端接口:http://localhost:8081/client/send?host=127.0.0.1&port=11000&msg=Message from 客户端1
服务端打印
2021-04-20 09:56:33.283 INFO 78272 --- [ntLoopGroup-3-1] c.e.demo.tcp.server.TCPServerHandler : TCP服务端(本机)收到客户端(127.0.0.1:51396)的数据:Message from 客户端1
请求服务端接口:http://localhost:8080/server/send?clientName=客户端1&msg=Message from 服务器
客户端打印
2021-04-20 09:56:50.765 INFO 79792 --- [ntLoopGroup-2-1] c.e.demo.tcp.client.TCPClientHandler : TCP客户端(本机)收到服务端(127.0.0.1:11000)的数据:Message from 服务器
请先
!