前言
在实现TCP长连接功能中,客户端断线重连是一个很常见的问题,当我们使用netty实现断线重连时,是否考虑过如下几个问题:
如何监听到客户端和服务端连接断开 ?如何实现断线后重新连接 ?netty客户端线程给多大比较合理 ?
其实上面都是笔者在做断线重连时所遇到的问题,而 “netty客户端线程给多大比较合理?” 这个问题更是笔者在做断线重连时因一个异常引发的思考。下面讲讲整个过程:
因为本节讲解内容主要涉及在客户端,但是为了读者能够运行整个程序,所以这里先给出服务端及公共的依赖和实体类。
服务端及common代码
maven依赖:
<dependencies><!–只是用到了spring-boot的日志框架–><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId><version>2.4.1</version></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId><version>4.1.56.Final</version></dependency><dependency><groupId>org.jboss.marshalling</groupId><artifactId>jboss-marshalling-serial</artifactId><version>2.0.10.Final</version></dependency></dependencies>服务端业务处理代码
主要用于记录打印当前客户端连接数,当接收到客户端信息后返回“hello netty”字符串
@ChannelHandler.SharablepublicclassSimpleServerHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(SimpleServerHandler.class);publicstaticfinalChannelGroupchannels=newDefaultChannelGroup(GlobalEventExecutor.INSTANCE);@OverridepublicvoidchannelActive(ChannelHandlerContextctx)throwsException{channels.add(ctx.channel());log.info(“客户端连接成功:clientaddress:{}”,ctx.channel().remoteAddress());log.info(“当前共有{}个客户端连接”,channels.size());}@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{log.info(“serverchannelRead:{}”,msg);ctx.channel().writeAndFlush(“hellonetty”);}@OverridepublicvoidchannelInactive(ChannelHandlerContextctx)throwsException{log.info(“channelInactive:clientclose”);}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{if(causeinstanceofjava.io.IOException){log.warn(“exceptionCaught:clientclose”);}else{cause.printStackTrace();}}}服务端心跳检查代码
当接收心跳”ping”信息后,返回客户端’’pong”信息。如果客户端在指定时间内没有发送任何信息则关闭客户端。
publicclassServerHeartbeatHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(ServerHeartbeatHandler.class);@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{log.info(“serverchannelRead:{}”,msg);if(msg.equals(“ping”)){ctx.channel().writeAndFlush(“pong”);}else{//由下一个handler处理,示例中则为SimpleServerHandlerctx.fireChannelRead(msg);}}@OverridepublicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{if(evtinstanceofIdleStateEvent){//该事件需要配合io.netty.handler.timeout.IdleStateHandler使用IdleStateEventidleStateEvent=(IdleStateEvent)evt;if(idleStateEvent.state()==IdleState.READER_IDLE){//超过指定时间没有读事件,关闭连接log.info(“超过心跳时间,关闭和服务端的连接:{}”,ctx.channel().remoteAddress());//ctx.channel().close();}}else{super.userEventTriggered(ctx,evt);}}}编解码工具类
主要使用jboss-marshalling-serial编解码工具,可自行查询其优缺点,这里只是示例使用。
publicfinalclassMarshallingCodeFactory{/**创建Jbossmarshalling解码器*/publicstaticMarshallingDecoderbuildMarshallingDecoder(){//参数serial表示创建的是Java序列化工厂对象,由jboss-marshalling-serial提供MarshallerFactoryfactory=Marshalling.getProvidedMarshallerFactory(“serial”);MarshallingConfigurationconfiguration=newMarshallingConfiguration();configuration.setVersion(5);DefaultUnmarshallerProviderprovider=newDefaultUnmarshallerProvider(factory,configuration);returnnewMarshallingDecoder(provider,1024);}/**创建Jbossmarshalling编码器*/publicstaticMarshallingEncoderbuildMarshallingEncoder(){MarshallerFactoryfactory=Marshalling.getProvidedMarshallerFactory(“serial”);MarshallingConfigurationconfiguration=newMarshallingConfiguration();configuration.setVersion(5);DefaultMarshallerProviderprovider=newDefaultMarshallerProvider(factory,configuration);returnnewMarshallingEncoder(provider);}}公共实体类publicclassUserInfoimplementsSerializable{privatestaticfinallongserialVersionUID=6271330872494117382L;privateStringusername;privateintage;publicUserInfo(){}publicUserInfo(Stringusername,intage){this.username=username;this.age=age;}//省略getter/setter/toString}
下面开始本文的重点,客户端断线重连以及问题思考。
客户端实现刚开始启动时需要进行同步连接,指定连接次数内没用通过则抛出异常,进程退出。客户端启动后,开启定时任务,模拟客户端数据发送。
客户端业务处理handler,接收到数据后,通过日志打印。
publicclassSimpleClientHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(SimpleClientHandler.class);privateNettyClientclient;publicSimpleClientHandler(NettyClientclient){this.client=client;}@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{log.info(“clientreceive:{}”,msg);}}
封装连接方法、断开连接方法、getChannel()返回io.netty.channel.Channel用于向服务端发送数据。boolean connect()是一个同步连接方法,如果连接成功返回true,连接失败返回false。
publicclassNettyClient{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(NettyClient.class);privateEventLoopGroupworkerGroup;privateBootstrapbootstrap;privatevolatileChannelclientChannel;publicNettyClient(){this(-1);}publicNettyClient(intthreads){workerGroup=threads>0?newNioEventLoopGroup(threads):newNioEventLoopGroup();bootstrap=newBootstrap();bootstrap.group(workerGroup).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY,true).option(ChannelOption.SO_KEEPALIVE,false).option(ChannelOption.CONNECT_TIMEOUT_MILLIS,30000).handler(newClientHandlerInitializer(this));}publicbooleanconnect(){log.info(“尝试连接到服务端:127.0.0.1:8088”);try{ChannelFuturechannelFuture=bootstrap.connect(“127.0.0.1”,8088);booleannotTimeout=channelFuture.awaitUninterruptibly(30,TimeUnit.SECONDS);clientChannel=channelFuture.channel();if(notTimeout){if(clientChannel!=null&&clientChannel.isActive()){log.info(“nettyclientstarted!!!{}connecttoserver”,clientChannel.localAddress());returntrue;}Throwablecause=channelFuture.cause();if(cause!=null){exceptionHandler(cause);}}else{log.warn(“connectremotehost[{}]timeout{}s”,clientChannel.remoteAddress(),30);}}catch(Exceptione){exceptionHandler(e);}clientChannel.close();returnfalse;}privatevoidexceptionHandler(Throwablecause){if(causeinstanceofConnectException){log.error(“连接异常:{}”,cause.getMessage());}elseif(causeinstanceofClosedChannelException){log.error(“connecterror:{}”,”clienthasdestroy”);}else{log.error(“connecterror:”,cause);}}publicvoidclose(){if(clientChannel!=null){clientChannel.close();}if(workerGroup!=null){workerGroup.shutdownGracefully();}}publicChannelgetChannel(){returnclientChannel;}staticclassClientHandlerInitializerextendsChannelInitializer<SocketChannel>{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(NettyClient.class);privateNettyClientclient;publicClientHandlerInitializer(NettyClientclient){this.client=client;}@OverrideprotectedvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepipeline=ch.pipeline();pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());//pipeline.addLast(newIdleStateHandler(25,0,10));//pipeline.addLast(newClientHeartbeatHandler());pipeline.addLast(newSimpleClientHandler(client));}}}
客户端启动类
publicclassNettyClientMain{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(NettyClientMain.class);privatestaticfinalScheduledExecutorServicescheduledExecutor=Executors.newSingleThreadScheduledExecutor();publicstaticvoidmain(String[]args){NettyClientnettyClient=newNettyClient();booleanconnect=false;//刚启动时尝试连接10次,都无法建立连接则不在尝试//如果想在刚启动后,一直尝试连接,需要放在线程中,异步执行,防止阻塞程序for(inti=0;i<10;i ){connect=nettyClient.connect();if(connect){break;}//连接不成功,隔5s之后重新尝试连接try{Thread.sleep(5000);}catch(InterruptedExceptione){e.printStackTrace();}}if(connect){log.info(“定时发送数据”);send(nettyClient);}else{nettyClient.close();log.info(“进程退出”);}}/**定时发送数据*/staticvoidsend(NettyClientclient){scheduledExecutor.schedule(newSendTask(client,scheduledExecutor),2,TimeUnit.SECONDS);}}客户端断线重连
断线重连需求:
服务端和客户端之间网络异常,或响应超时(例如有个很长时间的fullGC),客户端需要主动重连其他节点。服务端宕机时或者和客户端之间发生任何异常时,客户端需要主动重连其他节点。服务端主动向客户端发送(服务端)下线通知时,客户端需要主动重连其他节点。如何监听到客户端和服务端连接断开 ?
netty的io.netty.channel.ChannelInboundHandler接口中给我们提供了许多重要的接口方法。为了避免实现全部的接口方法,可以通过继承io.netty.channel.ChannelInboundHandlerAdapter来重写相应的方法即可。
1.void channelInactive(ChannelHandlerContext ctx);在客户端关闭时被调用,表示客户端断开连接。当如下几种情况发生时会触发:
客户端在正常active状态下,主动调用channel或者ctx的close方法。服务端主动调用channel或者ctx的close方法关闭客户端的连接 。发生java.io.IOException(一般情况下是双方连接断开)或者java.lang.OutOfMemoryError(4.1.52版本中新增)时
2.void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;则是在入栈发生任何异常时被调用。如果异常是java.io.IOException或者java.lang.OutOfMemoryError(4.1.52版本新增)时,还会触发channelInactive方法,也就是上面channelInactive被触发的第3条情况。
3.心跳检查也是检查客户端与服务端之间连接状态的必要方式,因为在一些状态下,两端实际上已经断开连接,但客户端无法感知,这时候就需要通过心跳来判断两端的连接状态。心跳可以是客户端心跳和服务端心跳。
客户端心跳:即为客户端发送心跳ping信息,服务端回复pong信息。这样在指定时间内,双方有数据交互则认为是正常连接状态。服务端心跳:则是服务端向客户端发送ping信息,客户端回复pong信息。在指定时间内没有收到回复,则认为对方下线。
netty给我们提供了非常简单的心跳检查方式,只需要在channel的handler链上,添加io.netty.handler.timeout.IdleStateHandler即可实现。
IdleStateHandler有如下几个重要的参数:
readerIdleTimeSeconds, 读超时. 即当在指定的时间间隔内没有从 Channel 读取到数据时, 会触发一个READER_IDLE的IdleStateEvent 事件.writerIdleTimeSeconds, 写超时. 即当在指定的时间间隔内没有数据写入到 Channel 时, 会触发一个WRITER_IDLE的IdleStateEvent 事件.allIdleTimeSeconds, 读/写超时. 即当在指定的时间间隔内没有读或写操作时, 会触发一个ALL_IDLE的IdleStateEvent 事件.
为了能够监听到这些事件的触发,还需要重写ChannelInboundHandler#userEventTriggered(ChannelHandlerContext ctx, Object evt)方法,通过参数evt判断事件类型。在指定的时间类如果没有读写则发送一条心跳的ping请求,在指定时间内没有收到读操作则任务已经和服务端断开连接。则调用channel或者ctx的close方法,使客户端Handler执行channelInactive方法。
到这里看来我们只要在channelInactive和exceptionCaught两个方法中实现自己的重连逻辑即可,但是笔者遇到了第一个坑,重连方法执行了两次。
先看示例代码和结果,在com.bruce.netty.rpc.client.SimpleClientHandler中添加如下代码:
publicclassSimpleClientHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(SimpleClientHandler.class);//省略部分代码……/**客户端正常下线时执行该方法*/@OverridepublicvoidchannelInactive(ChannelHandlerContextctx)throwsException{log.warn(“channelInactive:{}”,ctx.channel().localAddress());reconnection(ctx);}/**入栈发生异常时执行exceptionCaught*/@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{if(causeinstanceofIOException){log.warn(“exceptionCaught:客户端[{}]和远程断开连接”,ctx.channel().localAddress());}else{log.error(cause);}reconnection(ctx);}privatevoidreconnection(ChannelHandlerContextctx){log.info(“5s之后重新建立连接”);//暂时为空实现}}
ClientHandlerInitializer 中添加io.netty.handler.timeout.IdleStateHandler用于心跳检查,ClientHeartbeatHandler用于监听心跳事件,接收心跳pong回复。
staticclassClientHandlerInitializerextendsChannelInitializer<SocketChannel>{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(NettyClient.class);privateNettyClientclient;publicClientHandlerInitializer(NettyClientclient){this.client=client;}@OverrideprotectedvoidinitChannel(SocketChannelch)throwsException{ChannelPipelinepipeline=ch.pipeline();pipeline.addLast(MarshallingCodeFactory.buildMarshallingDecoder());pipeline.addLast(MarshallingCodeFactory.buildMarshallingEncoder());//25s内没有read操作则触发READER_IDLE事件//10s内既没有read又没有write操作则触发ALL_IDLE事件pipeline.addLast(newIdleStateHandler(25,0,10));pipeline.addLast(newClientHeartbeatHandler());pipeline.addLast(newSimpleClientHandler(client));}}
com.bruce.netty.rpc.client.ClientHeartbeatHandler
publicclassClientHeartbeatHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(ClientHeartbeatHandler.class);@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{if(msg.equals(“pong”)){log.info(“收到心跳回复”);}else{super.channelRead(ctx,msg);}}@OverridepublicvoiduserEventTriggered(ChannelHandlerContextctx,Objectevt)throwsException{if(evtinstanceofIdleStateEvent){//该事件需要配合io.netty.handler.timeout.IdleStateHandler使用IdleStateEventidleStateEvent=(IdleStateEvent)evt;if(idleStateEvent.state()==IdleState.ALL_IDLE){//向服务端发送心跳检测ctx.writeAndFlush(“ping”);log.info(“发送心跳数据”);}elseif(idleStateEvent.state()==IdleState.READER_IDLE){//超过指定时间没有读事件,关闭连接log.info(“超过心跳时间,关闭和服务端的连接:{}”,ctx.channel().remoteAddress());ctx.channel().close();}}else{super.userEventTriggered(ctx,evt);}}}
先启动server端,再启动client端,待连接成功之后kill掉 server端进程。
通过客户端日志可以看出,先是执行了exceptionCaught方法然后执行了channelInactive方法,但是这两个方法中都调用了reconnection方法,导致同时执行了两次重连。
为什么执行了exceptionCaught方法又执行了channelInactive方法呢?
我们可以在exceptionCaught和channelInactive方法添加断点一步步查看源码
当NioEventLoop执行select操作之后,处理相应的SelectionKey,发生异常后,会调用AbstractNioByteChannel.NioByteUnsafe#handleReadException方法进行处理,并触发pipeline.fireExceptionCaught(cause),最终调用到用户handler的fireExceptionCaught方法。
privatevoidhandleReadException(ChannelPipelinepipeline,ByteBufbyteBuf,Throwablecause,booleanclose,RecvByteBufAllocator.HandleallocHandle){if(byteBuf!=null){if(byteBuf.isReadable()){readPending=false;pipeline.fireChannelRead(byteBuf);}else{byteBuf.release();}}allocHandle.readComplete();pipeline.fireChannelReadComplete();pipeline.fireExceptionCaught(cause);//Ifoomwillclosethereadevent,releaseconnection.//Seehttps://github.com/netty/netty/issues/10434if(close||causeinstanceofOutOfMemoryError||causeinstanceofIOException){closeOnRead(pipeline);}}
该方法最后会判断异常类型,执行close连接的方法。在连接断线的场景中,这里即为java.io.IOException,所以执行了close方法,当debug到AbstractChannel.AbstractUnsafe#close(ChannelPromise, Throwable, ClosedChannelException, notify)方法中会发现最后又调用了AbstractUnsafe#fireChannelInactiveAndDeregister方法,继续debug最后则会执行自定义的fireChannelInactive方法。
到这里可以总结一个知识点:netty中当执行到handler地fireExceptionCaught方法时,可能会继续触发到fireChannelInactive,也可能不会触发fireChannelInactive。
除了netty根据异常类型判断是否执行close方法外,其实开发人员也可以自己通过ctx或者channel去调用close方法,代码如下:
@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{if(causeinstanceofIOException){log.warn(“exceptionCaught:客户端[{}]和远程断开连接”,ctx.channel().localAddress());}else{log.error(cause);}//ctx.close();ctx.channel().close();}
但这种显示调用close方法,是否一定会触发调用fireChannelInactive呢?
如果是,那么只需要在exceptionCaught中调用close方法,fireChannelInactive中做重连的逻辑即可!!
在笔者通过日志观察到,在exceptionCaught中调用close方法每次都会调用fireChannelInactive方法。但是查看源码,笔者认为这是不一定的,因为在AbstractChannel.AbstractUnsafe#close(ChannelPromise,Throwable, ClosedChannelException, notify)中会调用io.netty.channel.Channel#isActive进行判断,只有为true,才会执行fireChannelInactive方法。
//io.netty.channel.socket.nio.NioSocketChannel#isActive@OverridepublicbooleanisActive(){SocketChannelch=javaChannel();returnch.isOpen()&&ch.isConnected();}
如何解决同时执行两次问题呢?
在netty初始化时,我们都会添加一系列的handler处理器,这些handler实际上会在netty创建Channel对象(NioSocketChannel)时,被封装在DefaultChannelPipeline中,而DefaultChannelPipeline实际上是一个双向链表,头节点为TailContext,尾节点为TailContext,而中间的节点则是我们添加的一个个handler(被封装成DefaultChannelHandlerContext),当执行Pipeline上的方法时,会从链表上遍历handler执行,因此当执行exceptionCaught方法时,我们只需要提前移除链表上自定义的Handler则无法执行fireChannelInactive方法。
最后实现代码如下:
publicclassSimpleClientHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(SimpleClientHandler.class);@OverridepublicvoidchannelInactive(ChannelHandlerContextctx)throwsException{log.warn(“channelInactive:{}”,ctx.channel().localAddress());ctx.pipeline().remove(this);ctx.channel().close();reconnection(ctx);}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{if(causeinstanceofIOException){log.warn(“exceptionCaught:客户端[{}]和远程断开连接”,ctx.channel().localAddress());}else{log.error(cause);}ctx.pipeline().remove(this);//ctx.close();ctx.channel().close();reconnection(ctx);}}
执行效果如下,可以看到当发生异常时,只是执行了exceptionCaught方法,并且通过channel关闭了上一次连接资源,也没有执行当前handler的fireChannelInactive方法。
如何实现断线后重新连接 ?
通过上面分析,我们已经知道在什么方法中实现自己的重连逻辑,但是具体该怎么实现呢,怀着好奇的心态搜索了一下各大码友的实现方案。大多做法是通过ctx.channel().eventLoop().schedule添加一个定时任务调用客户端的连接方法。笔者也参考该方式实现代码如下:
privatevoidreconnection(ChannelHandlerContextctx){log.info(“5s之后重新建立连接”);ctx.channel().eventLoop().schedule(newRunnable(){@Overridepublicvoidrun(){booleanconnect=client.connect();if(connect){log.info(“重新连接成功”);}else{reconnection(ctx);}}},5,TimeUnit.SECONDS);}
测试:先启动server端,再启动client端,待连接成功之后kill掉 server端进程。客户端如期定时执行重连,但也就去茶水间倒杯水的时间,回来后发现了如下异常。
……省略14条相同的重试日志[2021-01-1718:46:45.032]INFO[nioEventLoopGroup-2-1][com.bruce.netty.rpc.client.SimpleClientHandler]:5s之后重新建立连接[2021-01-1718:46:48.032]INFO[nioEventLoopGroup-2-1][com.bruce.netty.rpc.client.NettyClient]:尝试连接到服务端:127.0.0.1:8088[2021-01-1718:46:50.038]ERROR[nioEventLoopGroup-2-1][com.bruce.netty.rpc.client.NettyClient]:连接异常:Connectionrefused:nofurtherinformation:/127.0.0.1:8088[2021-01-1718:46:50.038]INFO[nioEventLoopGroup-2-1][com.bruce.netty.rpc.client.SimpleClientHandler]:5s之后重新建立连接[2021-01-1718:46:53.040]INFO[nioEventLoopGroup-2-1][com.bruce.netty.rpc.client.NettyClient]:尝试连接到服务端:127.0.0.1:8088[2021-01-1718:46:53.048]ERROR[nioEventLoopGroup-2-1][com.bruce.netty.rpc.client.NettyClient]:connecterror:io.netty.util.concurrent.BlockingOperationException:DefaultChannelPromise@10122121(incomplete)atio.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:462)atio.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:159)atio.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:667)atio.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:305)atcom.bruce.netty.rpc.client.NettyClient.connect(NettyClient.java:49)atcom.bruce.netty.rpc.client.SimpleClientHandler$1.run(SimpleClientHandler.java:65)atio.netty.util.concurrent.PromiseTask.runTask(PromiseTask.java:98)atio.netty.util.concurrent.ScheduledFutureTask.run(ScheduledFutureTask.java:170)atio.netty.util.concurrent.AbstractEventExecutor.safeExecute$$$capture(AbstractEventExecutor.java:164)atio.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java)atio.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:472)atio.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:500)
根据异常栈,可以发现是com.bruce.netty.rpc.client.NettyClient#connect方法中调用了等待方法
booleannotTimeout=channelFuture.awaitUninterruptibly(20,TimeUnit.SECONDS);
而该方法内部会进行检测,是否在io线程上执行了同步等待,这会导致抛出异常BlockingOperationException。
@OverrideprotectedvoidcheckDeadLock(){if(channel().isRegistered()){super.checkDeadLock();}}protectedvoidcheckDeadLock(){EventExecutore=executor();if(e!=null&&e.inEventLoop()){thrownewBlockingOperationException(toString());}}
奇怪的是为什么不是每次尝试重连都抛出该异常,而是每隔16次抛出一次呢?
这让我联想到自己的笔记本是8核处理器,而netty默认线程池是2 * c,就是16条线程,这之间似乎有些关联。
实际上在调用ChannelFuture channelFuture = bootstrap.connect(“127.0.0.1”, 8088);,netty首先会创建一个io.netty.channel.Channel(示例中是NioSocketChannel),然后通过io.netty.util.concurrent.EventExecutorChooserFactory.EventExecutorChooser依次选择一个NioEventLoop,将Channel绑定到NioEventLoop上。
io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
//ReturntrueifthegivenThreadisexecutedintheeventloop,falseotherwise.@OverridepublicbooleaninEventLoop(Threadthread){returnthread==this.thread;}
重连的方法是在一个NioEventLoop(也就是io线程)上被调用,第1次重连实际上是选择了第2个NioEventLoop,第2次重连实际上是选择了第3个NioEventLoop,以此类推,当一轮选择过后,重新选到第一个NioEventLoop时,boolean inEventLoop()返回true,则抛出了BlockingOperationException。
方案1
不要在netty的io线程上执行同步连接,使用单独的线程池定时执行重试,该线程还可以执行自己重连的业务逻辑操作,不阻塞io线程。(如果不需要业务操作之后销毁线程池)。
com.bruce.netty.rpc.client.SimpleClientHandler 修改reconnection方法
privatestaticScheduledExecutorServiceSCHEDULED_EXECUTOR;privatevoidinitScheduledExecutor(){if(SCHEDULED_EXECUTOR==null){synchronized(SimpleClientHandler.class){if(SCHEDULED_EXECUTOR==null){SCHEDULED_EXECUTOR=Executors.newSingleThreadScheduledExecutor(r->{Threadt=newThread(r,”Client-Reconnect-1″);t.setDaemon(true);returnt;});}}}}privatevoidreconnection(ChannelHandlerContextctx){log.info(“5s之后重新建立连接”);initScheduledExecutor();SCHEDULED_EXECUTOR.schedule(()->{booleanconnect=client.connect();if(connect){//连接成功,关闭线程池SCHEDULED_EXECUTOR.shutdown();log.info(“重新连接成功”);}else{reconnection(ctx);}},3,TimeUnit.SECONDS);}方案2
可以在io线程上使用异步重连:
com.bruce.netty.rpc.client.NettyClient添加方法connectAsync方法,两者的区别在于connectAsync方法中没有调用channelFuture的同步等待方法。而是改成监听器(ChannelFutureListener)的方式,实际上这个监听器是运行在io线程上。
publicvoidconnectAsync(){log.info(“尝试连接到服务端:127.0.0.1:8088”);ChannelFuturechannelFuture=bootstrap.connect(“127.0.0.1”,8088);channelFuture.addListener((ChannelFutureListener)future->{Throwablecause=future.cause();if(cause!=null){exceptionHandler(cause);log.info(“等待下一次重连”);channelFuture.channel().eventLoop().schedule(this::connectAsync,5,TimeUnit.SECONDS);}else{clientChannel=channelFuture.channel();if(clientChannel!=null&&clientChannel.isActive()){log.info(“Nettyclientstarted!!!{}connecttoserver”,clientChannel.localAddress());}}});}
com.bruce.netty.rpc.client.SimpleClientHandler
publicclassSimpleClientHandlerextendsChannelInboundHandlerAdapter{privatestaticfinalInternalLoggerlog=InternalLoggerFactory.getInstance(SimpleClientHandler.class);privateNettyClientclient;publicSimpleClientHandler(NettyClientclient){this.client=client;}@OverridepublicvoidchannelRead(ChannelHandlerContextctx,Objectmsg)throwsException{log.info(“clientreceive:{}”,msg);}@OverridepublicvoidchannelInactive(ChannelHandlerContextctx)throwsException{log.warn(“channelInactive:{}”,ctx.channel().localAddress());ctx.pipeline().remove(this);ctx.channel().close();reconnectionAsync(ctx);}@OverridepublicvoidexceptionCaught(ChannelHandlerContextctx,Throwablecause)throwsException{if(causeinstanceofIOException){log.warn(“exceptionCaught:客户端[{}]和远程断开连接”,ctx.channel().localAddress());}else{log.error(cause);}ctx.pipeline().remove(this);ctx.close();reconnectionAsync(ctx);}privatevoidreconnectionAsync(ChannelHandlerContextctx){log.info(“5s之后重新建立连接”);ctx.channel().eventLoop().schedule(newRunnable(){@Overridepublicvoidrun(){client.connectAsync();}},5,TimeUnit.SECONDS);}}
netty客户端线程给多大比较合理 ?
netty中一个NioEventLoopGroup默认创建的线程数是cpu核心数 * 2 ,这些线程都是用于io操作,那么对于客户端应用程序来说真的需要这么多io线程么?
通过上面分析BlockingOperationException异常时我们分析到,实际上netty在创建一个Channel对象后只会从NioEventLoopGroup中选择一个NioEventLoop来绑定,只有创建多个Channel才会依次选择下一个NioEventLoop,也就是说一个Channel只会对应一个NioEventLoop,而NioEventLoop可以绑定多个Channel。
1.对于客户端来说,如果只是连接的一个server节点,那么只要设置1条线程即可。即使出现了断线重连,在连接断开之后,之前的Channel会从NioEventLoop移除。重连之后,仍然只会在仅有的一个NioEventLoop注册一个新的Channel。
2.如果客户端同时如下方式多次调用io.netty.bootstrap.Bootstrap#connect(String inetHost, int inetPort)连接多个Server节点,那么线程可以设置大一点,但不要超过2*c,而且只要出现断线重连,同样不能保证每个NioEventLoop都会绑定一个客户端Channel。
publicbooleanconnect(){try{ChannelFuturechannelFuture1=bootstrap.connect(“127.0.0.1”,8088);ChannelFuturechannelFuture2=bootstrap.connect(“127.0.0.1”,8088);ChannelFuturechannelFuture3=bootstrap.connect(“127.0.0.1”,8088);}catch(Exceptione){exceptionHandler(e);}clientChannel.close();returnfalse;}
3.如果netty客户端线程数设置大于1有什么影响么?
明显的异常肯定是不会有的,但是造成资源浪费,首先会创建多个NioEventLoop对象,但NioEventLoop是处于非运行状态。一旦出现断线重连,那么重新连接时,下一个NioEventLoop则会被选中,并创建/启动线程一直处于runnable状态。而上一个NioEventLoop也是一直处于runnable状态,由于上一个Channel已经被close,所以会造成每次select结果都是空的,没有意义的空轮询。
如下则是netty客户端使用默认线程数,4次断线重连后一共创建的5条NioEventLoop线程,但是实际上只有第5条线程在执行读写操作。
4.如果客户端存在耗时的业务逻辑,应该单独使用业务线程池,避免在netty的io线程中执行耗时逻辑处理。
总结
本篇主要讲解了,netty断线重连的两种实现方案,以及实现过程中遇到的异常问题,通过分析问题,让大家了解netty的实现细节。
推荐:
主流Java进阶技术(学习资料分享)