junx-netty结合Springboot快速开发长连接服务端+客户端

  |   0 评论   |   0 浏览

项目目标

  基于junx-netty,结合springboot,开发一个支持长连接的服务端和客户端,长连接支持客户端服务端双向心跳检测、网络拆包粘包处理,能够正常发送和接收消息。
首先在项目中引入junx-netty:

<dependency>
  <groupId>io.github.junxworks</groupId>
  <artifactId>junx-netty</artifactId>
  <version>1.0.12</version>
</dependency>
源码:https://gitee.com/junxworks/junx.git

服务器端实现

  1、springboot的启动类:

/**
 * Spring Boot entry point for the long-connection server demo.
 */
@SpringBootApplication
public class ServerApplication {

	/**
	 * Boots the Spring application context using this class as the primary source.
	 *
	 * @param args command-line arguments, forwarded unchanged to Spring Boot
	 */
	public static void main(String[] args) {
		Class<?> primarySource = ServerApplication.class;
		SpringApplication.run(primarySource, args);
	}
}

  2、bean配置文件Config:

import java.io.IOException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import io.github.junxworks.junx.core.lifecycle.Service;
import io.github.junxworks.junx.netty.NettyServer;
import io.github.junxworks.junx.netty.ServerConfig;
import io.github.junxworks.junx.netty.heartbeat.CommonServerHeartbeatHandlerFactory;
import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer;

/**
 * Spring bean configuration that assembles the Netty server used by the demo.
 */
@Configuration
public class Config {

	/**
	 * Builds the {@code nettyServer} bean. Spring calls {@code start} after creation and
	 * {@code stop} on shutdown, as declared on the {@code @Bean} annotation.
	 *
	 * @param handler the business handler injected by Spring (see ServerEventHandler)
	 * @return the configured server, exposed through its {@link Service} interface
	 * @throws IOException declared by the original signature; propagated from server construction
	 */
	@Bean(name = "nettyServer", initMethod = "start", destroyMethod = "stop")
	public Service nettyServer( ServerEventHandler handler) throws IOException {
		// The server settings could also be bound from custom Spring Boot configuration properties.
		ServerConfig serverConfig = new ServerConfig();
		serverConfig.setPort(8080);

		NettyServer testServer = new NettyServer(serverConfig, buildInitializer(handler, serverConfig));
		testServer.setName("TestServer");
		return testServer;
	}

	/**
	 * Creates the standard channel initializer which, per the library author, wraps
	 * length-field framing (for packet splitting/merging) and heartbeat detection.
	 */
	private CommonChannelInitializer buildInitializer(ServerEventHandler handler, ServerConfig serverConfig) {
		CommonChannelInitializer channelInitializer = new CommonChannelInitializer(handler);
		// The three idle settings are optional; normally allIdle alone is enough. Each one
		// is added to a scheduled task, so it is not advisable to enable all of them.
		channelInitializer.setAllIdle(serverConfig.getAllIdle()); // read-or-write idle timeout
		channelInitializer.setReadIdle(serverConfig.getReadIdle()); // read idle timeout
		channelInitializer.setWriteIdle(serverConfig.getWriteIdle()); // write idle timeout
		// Server-side heartbeat detection.
		channelInitializer.setHeartbeatHandlerFactory(new CommonServerHeartbeatHandlerFactory());
		return channelInitializer;
	}
}

  3、服务器端业务逻辑实现类ServerEventHandler:

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import io.github.junxworks.junx.core.util.ExceptionUtils;
import io.github.junxworks.junx.core.util.StringUtils;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.github.junxworks.junx.netty.message.MessageConstants;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

@Component   //自动注入
@Sharable   //handler一定是无状态的,支持多线程共享
public class ServerEventHandler extends ChannelInboundHandlerAdapter {
	private static final Logger logger = LoggerFactory.getLogger(ServerEventHandler.class);

	private static final byte[] data = "I`m Server!!!".getBytes();

	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		IoRequest req = new IoRequest().readFromBytes((byte[]) msg); //IoRequest对象是封装的标准序列化请求对象,支持属性扩展
		System.out.println(new String(req.getData()));
		IoResponse res = new IoResponse();//IoRequest对象是封装的标准序列化应答对象,支持属性扩展
		res.setRequestId(req.getId());
		res.setServerAddr(ctx.channel().localAddress().toString());
		res.setData(data);
		ctx.writeAndFlush(res.toBytes());
	}

	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		if (ctx.channel().isActive()) {
			IoResponse response = new IoResponse();
			response.setRequestId("unknown");
			response.setStatusCode(MessageConstants.STATUS_CODE_SERVER_INTERNAL_ERROR);
			String errorMsg = ExceptionUtils.getCauseMessage(cause);
			response.setData(StringUtils.isNull(errorMsg) ? "Server internal error.".getBytes() : errorMsg.getBytes());
			response.setServerAddr(ctx.channel().localAddress().toString());
			ctx.writeAndFlush(response.toBytes());
		}
		logger.error("Netty handler catch exption.", cause);
	}
}

  4、服务器端测试类编写,基于Junit的测试类:

/**
 * JUnit launcher for the server: loads the Spring context of {@code ServerApplication}
 * and then parks the test thread.
 */
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ServerApplication.class)
public class ServerTest {

	/**
	 * Blocks the calling (main) test thread indefinitely.
	 *
	 * @throws Exception if the waiting thread is interrupted
	 */
	@Test
	public void serverTest() throws Exception {
		// The latch is never counted down, so await() only returns on interruption.
		CountDownLatch keepAlive = new CountDownLatch(1);
		keepAlive.await();
	}

}

  执行ServerTest,将会看到如下提示:
1598260787221508096.png
服务器端运行在独立的非守护线程中,启动后不会block住主线程,因此测试类中需要用CountDownLatch手动block住主线程,防止测试方法返回后JUnit结束运行。

客户端实现

  客户端支持长连接,连接通过连接池获取,调用方式支持同步和异步两种:同步调用会block住当前线程,直到服务器返回;异步调用的回调方法由netty的worker线程执行,因此回调中不能执行耗时太长的操作。客户端的代码如下:
  1、ClientTest类,其中syncRequestTest为同步请求服务器端,asyncRequestTest为异步请求服务器端。

import java.net.InetSocketAddress;
import org.junit.Test;
import io.github.junxworks.junx.netty.call.CallFuture;
import io.github.junxworks.junx.netty.call.CallUtils;
import io.github.junxworks.junx.netty.heartbeat.CommonClientHeartbeatHandlerFactory;
import io.github.junxworks.junx.netty.initializer.CommonChannelInitializer;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.github.junxworks.junx.netty.pool.NettyChannelPool;
import io.github.junxworks.junx.netty.pool.NettyChannelPoolManager;
import io.netty.channel.Channel;

/**
 * Client-side demo tests: one synchronous and one asynchronous request against the server,
 * both using a channel borrowed from a {@link NettyChannelPool}.
 */
public class ClientTest {

	/** Explicit charset so the payload bytes do not depend on the platform default encoding. */
	private static final java.nio.charset.Charset CHARSET = java.nio.charset.StandardCharsets.UTF_8;

	private final String host = "localhost";

	private final int port = 8080;

	private final byte[] data = "michael".getBytes(CHARSET);

	/**
	 * Synchronous call test: sends one request and blocks until the response arrives.
	 *
	 * @throws Exception if pool set-up, the call or the wait fails
	 */
	@Test
	public void syncRequestTest() throws Exception {
		NettyChannelPoolManager poolManager = new NettyChannelPoolManager();
		poolManager.start();
		try {
			NettyChannelPool pool = getPool(poolManager, host, port);
			Channel ch = null;
			try {
				ch = pool.acquire();
				IoRequest req = new IoRequest();
				req.setRequestTimeout(1000);
				req.setData(data);
				CallFuture<IoResponse> syn = CallUtils.call(ch, req);
				IoResponse res = syn.get(); // the calling thread waits here for the response
				System.out.println("[Sync] Handler request id:" + res.getRequestId() + " result:" + new String(res.getData(), CHARSET));
			} finally {
				if (ch != null) {
					pool.release(ch);
				}
			}
			Thread.sleep(1000);
		} finally {
			// Always stop the manager, even when the request above failed.
			poolManager.stop();
		}
	}

	/**
	 * Asynchronous call test: sends one request and lets {@code AsyncCallback} handle the result.
	 *
	 * @throws Exception if pool set-up or the call fails
	 */
	@Test
	public void asyncRequestTest() throws Exception {
		NettyChannelPoolManager poolManager = new NettyChannelPoolManager();
		poolManager.start();
		try {
			NettyChannelPool pool = getPool(poolManager, host, port);
			Channel ch = null;
			try {
				ch = pool.acquire();
				IoRequest req = new IoRequest();
				req.setRequestTimeout(1000);
				req.setData(data);
				CallUtils.call(ch, req, new AsyncCallback(req)); // result is delivered to the callback
			} finally {
				if (ch != null) {
					pool.release(ch);
				}
			}
			Thread.sleep(1000); // keep the main thread alive so the callback has time to run
		} finally {
			// Always stop the manager, even when the request above failed.
			poolManager.stop();
		}
	}

	/**
	 * Returns the pool for the given server, creating it with a client-side initializer
	 * when the manager does not have one yet.
	 *
	 * @param poolManager the started pool manager
	 * @param host        server host name
	 * @param port        server port
	 * @return the channel pool bound to {@code host:port}
	 * @throws Exception if the pool cannot be obtained
	 */
	private NettyChannelPool getPool(NettyChannelPoolManager poolManager, String host, int port) throws Exception {
		InetSocketAddress serverAddr = InetSocketAddress.createUnresolved(host, port);
		NettyChannelPool pool = poolManager.getPool(serverAddr);
		if (pool == null) {
			CommonChannelInitializer initializer = new CommonChannelInitializer(new ClientChannelHandler());
			// Long-lived connection: enable the client-side heartbeat.
			initializer.setHeartbeatHandlerFactory(new CommonClientHeartbeatHandlerFactory());
			pool = poolManager.getPool(serverAddr, initializer);
		}
		return pool;
	}

}

  2、客户端业务处理类ClientChannelHandler:

import io.github.junxworks.junx.netty.call.ReferenceManager;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Client-side business handler: decodes inbound frames into {@link IoResponse} objects and
 * hands them to {@link ReferenceManager}; closes the channel on any error.
 */
@Sharable // business handlers must be stateless
public class ClientChannelHandler extends ChannelInboundHandlerAdapter {

	/**
	 * Decodes one inbound frame and passes the response on, keyed by its UUID.
	 *
	 * @param ctx the channel context of the connection
	 * @param msg the inbound message; cast to {@code byte[]}
	 */
	@Override
	public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
		byte[] payload = (byte[]) msg;
		IoResponse response = (IoResponse) new IoResponse().readFromBytes(payload);
		// NOTE(review): presumably completes the pending call registered under this UUID — confirm in junx-netty.
		ReferenceManager.future(response.getUUID(), ctx, response);
	}

	/**
	 * Prints the stack trace of the failure and closes the channel.
	 *
	 * @param ctx   the channel context on which the error occurred
	 * @param cause the exception that was raised
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
		cause.printStackTrace();
		ctx.close();
	}
}

  3、客户端异步调用的回调函数实现类AsyncCallback:

import io.github.junxworks.junx.core.util.StringUtils;
import io.github.junxworks.junx.netty.call.Callback;
import io.github.junxworks.junx.netty.message.IoRequest;
import io.github.junxworks.junx.netty.message.IoResponse;
import io.netty.channel.ChannelHandlerContext;

/**
 * Callback used by the asynchronous client call; prints the response, or a notice when the
 * request is reported dead.
 */
public class AsyncCallback implements Callback<IoResponse> {

	/** The request this callback belongs to; only its id is read, in {@link #dead()}. */
	private final IoRequest req;

	/**
	 * @param req the request whose outcome this callback reports
	 */
	public AsyncCallback(IoRequest req) {
		this.req = req;
	}

	/**
	 * Prints the request id and payload of the received response.
	 *
	 * @param ctx the channel context the response arrived on
	 * @param t   the decoded response
	 */
	@Override
	public void callback(ChannelHandlerContext ctx, IoResponse t) {
		// Decode with an explicit charset instead of the platform default.
		String result = new String(t.getData(), java.nio.charset.StandardCharsets.UTF_8);
		System.out.println("[async] Handler request id:" + t.getRequestId() + " result:" + result);
	}

	/**
	 * Prints a notice that the request is dead.
	 * NOTE(review): presumably invoked when no response arrives (e.g. timeout) — confirm in junx-netty.
	 */
	@Override
	public void dead() {
		System.out.println(StringUtils.format("Request \"%s\" is dead.", req.getId()));
	}

}

  demo的源码可以参考:https://gitee.com/junxworks/junx/tree/master/junx-sample/src/main/java/io/github/junxworks/junx/test/netty


标题:junx-netty结合Springboot快速开发长连接服务端+客户端
作者:michael
地址:https://blog.junxworks.cn/articles/2018/09/22/1537621711549.html