项目目标   基于 junx-netty,结合 springboot,开发一个支持长连接的服务端和客户端,长连接支持客户端服务端双向心跳检测、网络拆包粘包处理,能够正常发送和接收消息。 首先在项目中引入 junx-netty: io.github.junxworks junx-netty 1.0.6 服务器端实现 ..

junx-netty 结合 Springboot 快速开发长连接服务端 + 客户端

本贴最后更新于 317 天前,其中的信息可能已经物是人非

项目目标

  基于 junx-netty,结合 springboot,开发一个支持长连接的服务端和客户端,长连接支持客户端服务端双向心跳检测、网络拆包粘包处理,能够正常发送和接收消息。
首先在项目中引入 junx-netty:

<dependency>
  <groupId>io.github.junxworks</groupId>
  <artifactId>junx-netty</artifactId>
  <version>1.0.6</version>
</dependency>

服务器端实现

  1、springboot 的启动类:

@SpringBootApplication
public class ServerApplication {

	public static void main(String[] args) {
		SpringApplication.run(ServerApplication.class, args);
	}
}

  2、bean 配置文件 Config:

import java.io.IOException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.github.junxworks.junx.core.lifecycle.Service;
import io.github.junxworks.junx.netty.NettyServer;
import io.github.junxworks.junx.netty.ServerConfig;
import io.github.junxworks.junx.netty.heartbeat.CommonServerHeartbeatHandlerFactory;
import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer;

@Configuration
public class Config {
	
	@Bean(name = "nettyServer", initMethod = "start", destroyMethod = "stop")
	public Service nettyServer( ServerEventHandler handler) throws IOException {
		ServerConfig config = new ServerConfig(); //服务器配置可以通过spring自定义配置来实现,百度很多例子,搜springboot自定义配置
		config.setPort(8080);
		
		//标准的通道初始化器,内部封装了基于长度帧的粘包机制、心跳检测
		CommonChannelInitializer initializer = new CommonChannelInitializer(handler); //handler是通过外部注入的,实现代码见ServerEventHandler
		//下面三个可以根据具体情况配置,一般常用AllIdle即可,因为会加入到定时任务中,所以不宜全部设置。
		initializer.setAllIdle(config.getAllIdle());   //设置读、写超时
		initializer.setReadIdle(config.getReadIdle());  //设置读超时
		initializer.setWriteIdle(config.getWriteIdle());  //设置写超时   
		initializer.setHeartbeatHandlerFactory(new CommonServerHeartbeatHandlerFactory()); //设置服务器端心跳检测
		NettyServer server = new NettyServer(config, initializer);
		server.setName("TestServer");
		return server;
	}
}

  3、服务器端业务逻辑实现类 ServerEventHandler:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import io.github.junxworks.junx.core.util.ExceptionUtils;
import io.github.junxworks.junx.core.util.StringUtils;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.github.junxworks.junx.netty.message.MessageConstants;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

@Component   //自动注入
@Sharable   //handler一定是无状态的,支持多线程共享
public class ServerEventHandler extends ChannelInboundHandlerAdapter {
	private static final Logger logger = LoggerFactory.getLogger(ServerEventHandler.class);

	private static final byte[] data = "I`m Server!!!".getBytes();

	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		IoRequest req = new IoRequest().readFromBytes((byte[]) msg); //IoRequest对象是封装的标准序列化请求对象,支持属性扩展
		System.out.println(new String(req.getData()));
		IoResponse res = new IoResponse();//IoRequest对象是封装的标准序列化应答对象,支持属性扩展
		res.setRequestId(req.getId());
		res.setServerAddr(ctx.channel().localAddress().toString());
		res.setData(data);
		ctx.writeAndFlush(res.toBytes());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		if (ctx.channel().isActive()) {
			IoResponse response = new IoResponse();
			response.setRequestId("unknown");
			response.setStatusCode(MessageConstants.STATUS_CODE_SERVER_INTERNAL_ERROR);
			String errorMsg = ExceptionUtils.getCauseMessage(cause);
			response.setData(StringUtils.isNull(errorMsg) ? "Server internal error.".getBytes() : errorMsg.getBytes());
			response.setServerAddr(ctx.channel().localAddress().toString());
			ctx.writeAndFlush(response.toBytes());
		}
		logger.error("Netty handler catch exption.", cause);
	}
}

  4、服务器端测试类编写,基于 Junit 的测试类:

@RunWith(SpringRunner.class)
@SpringBootTest(classes = ServerApplication.class)
public class ServerTest {

	@Test
	public void serverTest() throws Exception {
		new CountDownLatch(1).await(); //block住主线程
	}

}

  执行 ServerTest,将会看到如下提示:
imagepng
服务器端是以非守护线程执行的,不会 block 住主线程。

客户端实现

  客户端支持长连接,客户端的连接获取是基于连接池的方式获取,连接的调用支持同步和异步调用,同步调用会 block 住当前线程,直到服务器返回。异步调用的回调方法,是由 netty 的 worker 线程调用,因此不能执行耗时太长的操作。客户端的代码如下:
  1、ClientTest 类,其中 syncRequestTest 为同步请求服务器端,asyncRequestTest 为异步请求服务器端。

import java.net.InetSocketAddress;
import org.junit.Test;
import io.github.junxworks.junx.netty.call.CallFuture;
import io.github.junxworks.junx.netty.call.CallUtils;
import io.github.junxworks.junx.netty.heartbeat.CommonClientHeartbeatHandlerFactory;
import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.github.junxworks.junx.netty.pool.NettyChannelPool;
import io.github.junxworks.junx.netty.pool.NettyChannelPoolManager;
import io.netty.channel.Channel;

public class ClientTest {

	private String host = "localhost";

	private int port = 8080;

	private byte[] data = "michael".getBytes();

	/**
	 * 同步调用测试
	 */
	@Test
	public void syncRequestTest() throws Exception {
		NettyChannelPoolManager poolManager = new NettyChannelPoolManager();
		poolManager.start();
		InetSocketAddress serverAddr = InetSocketAddress.createUnresolved(host, port);
		NettyChannelPool pool = poolManager.getPool(serverAddr);
		if (pool == null) {
			CommonChannelInitializer initializer = new CommonChannelInitializer(new ClientChannelHandler()); //通用通道初始化器跟server的类似
			initializer.setHeartbeatHandlerFactory(new CommonClientHeartbeatHandlerFactory());//长连接,客户端心跳检测
			pool = poolManager.getPool(serverAddr, initializer);
		}
		Channel ch = pool.acquire(1000);
		try {
			IoRequest req = new IoRequest();
			req.setRequestTimeout(1000);
			req.setData(data);
			CallFuture<IoResponse> syn = CallUtils.call(ch, req);
			IoResponse res = syn.get(); //线程同步等待应答
			System.out.println("[Sync] Handler request id:" + res.getRequestId() + " result:" + new String(res.getData()));
		} finally {
			pool.release(ch);
		}
		poolManager.stop();
	}

	/**
	 * 异步调用测试
	 * @throws Exception 
	 */
	@Test
	public void asyncRequestTest() throws Exception {
		NettyChannelPoolManager poolManager = new NettyChannelPoolManager();
		poolManager.start();
		InetSocketAddress serverAddr = InetSocketAddress.createUnresolved(host, port);
		NettyChannelPool pool = poolManager.getPool(serverAddr);
		if (pool == null) {
			CommonChannelInitializer initializer = new CommonChannelInitializer(new ClientChannelHandler()); //通用通道初始化器跟server的类似
			initializer.setHeartbeatHandlerFactory(new CommonClientHeartbeatHandlerFactory());//长连接,客户端心跳检测
			pool = poolManager.getPool(serverAddr, initializer);
		}
		Channel ch = pool.acquire(1000);
		try {
			IoRequest req = new IoRequest();
			req.setRequestTimeout(1000);
			req.setData(data);
			CallUtils.call(ch, req, new AsyncCallback(req)); //异步回调
		} finally {
			pool.release(ch);
		}
		Thread.sleep(1000); //block主线程
		poolManager.stop();
	}

}

  2、客户端业务处理类 ClientChannelHandler:

import io.github.junxworks.junx.netty.call.ReferenceManager;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

@Sharable  //业务处理类必须无状态
public class ClientChannelHandler extends ChannelInboundHandlerAdapter {

	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		IoResponse res = (IoResponse) new IoResponse().readFromBytes((byte[]) msg);
		ReferenceManager.future(res.getUUID(), ctx, res);
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

  3、客户端异步调用的回调函数实现类 AsyncCallback:

import io.github.junxworks.junx.core.util.StringUtils;
import io.github.junxworks.junx.netty.call.Callback;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.netty.channel.ChannelHandlerContext;

public class AsyncCallback implements Callback<IoResponse> {

	private IoRequest req;

	public AsyncCallback(IoRequest req) {
		this.req = req;
	}

	@Override
	public void callback(ChannelHandlerContext ctx, IoResponse t) {
		System.out.println("[async] Handler request id:" + t.getRequestId() + " result:" + new String(t.getData()));
	}

	@Override
	public void dead() {
		System.out.println(StringUtils.format("Request \"%s\" is dead.", req.getId()));
	}

}

  demo 的源码可以参考:https://github.com/junxworks/junx/tree/master/junx-sample/src/main/java/io/github/junxworks/junx/test/netty

  • B3log

    B3log 是一个开源组织,名字来源于“Bulletin Board Blog”缩写,目标是将独立博客与论坛结合,形成一种新的网络社区体验,详细请看 B3log 构思。目前 B3log 已经开源了多款产品:PipeSoloSymWide 等。

    1561 引用 • 3495 回帖 • 554 关注
  • Netty

    Netty 是一个基于 NIO 的客户端 - 服务器编程框架,使用 Netty 可以让你快速、简单地开发出一个可维护、高性能的网络应用,例如实现了某种协议的客户、服务端应用。

    23 引用 • 24 回帖
  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    2464 引用 • 7872 回帖 • 863 关注
  • Junx
    2 引用
回帖
请输入回帖内容...