行业资讯

首页-天美娱乐-天美平台擎天联合【天美娱乐注册】擎天联合

2022-11-25 09:46:14 yqs888 0

首页-天美娱乐-天美平台擎天联合【天美娱乐注册】擎天联合报道,RocketMQ使用了IdleStateHandler来进行心跳检测,客户端和服务端保持长连接需要通过一个检测机制来确保链接的有效性,在链接处于空闲状态或者一方宕机又或者网络延迟,在这种情况下就要确认链接是否有效,无效链接就需要客户端和服务端都关闭当前链路,释放文件句柄资源。

二、RocketMQ使用IdleStateHandler场景

读者可能看到这里有可能会比较懵,我们前文有解析的,我再带一下;

在NettyRemotingServer网络通信服务器的start方法中会去构建一个正在的netty server组件ServerBootstrap,在这里里面就直接构建了一个IdleStateHandler用来监听端口accept事件和客户端read/write事件;

ChannelInitializer是一个抽象类,不能直接使用,需要重写initChannel方法。

ChannelInitializer的使用场景:

  • 在ServerBootstrap初始化时,为监听端口accept事件的Channel添加ServerBootstrapAcceptor;

  • 在有新链接进入时,为监听客户端read/write事件的Channel添加用户自定义的ChannelHandler;

// 网络连接最大的空闲时间,120s,超过2分钟没有进行通信,就直接断开一个长连接private int serverChannelMaxIdleTimeSeconds = 120;// 真正的在这里创建netty serverServerBootstrap childHandler = this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)         .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)         .option(ChannelOption.SO_BACKLOG, nettyServerConfig.getServerSocketBacklog()) // tcp三次握手的accept队列长度         .option(ChannelOption.SO_REUSEADDR, true) // server socket channel unbind端口监听了以后,还处于延迟unbind状态,重新启动允许我们可以立马监听这个端口         .option(ChannelOption.SO_KEEPALIVE, false) // 是否自动发送探测包探测网络连接是否还存活         .childOption(ChannelOption.TCP_NODELAY, true) // nodelay,禁止打包传输,避免通信有延迟         .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))         .childHandler(new ChannelInitializer<SocketChannel>() {            @Override             public void initChannel(SocketChannel ch) throws Exception {                 ch.pipeline()                     .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME, handshakeHandler)                     .addLast(defaultEventExecutorGroup,                         encoder, // 编码器                         new NettyDecoder(), // 解码器                         // 空闲探测                         new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),                         connectionManageHandler,                         serverHandler                     );             }         });

规定在120秒之内,没有发生read或write事件的时候,就会触发fireUserEventTriggered,触发用户自定义事件的执行(关闭链接)。

三、源码分析导读

  1. 分析IdleStateHandler构造函数,有那些参数;

  2. 内部是如何进行定时调度的;

  3. 如果120秒之内没有read或write事件,如何通知RouteInfoManager路由管理组件关闭链接;

  4. IdleStateHandler如何在发生read或write事件的时候如何更新最近一次读或写时间;

四、源码分析

1、IdleStateHandler构造函数

public IdleStateHandler(boolean observeOutput,            long readerIdleTime, long writerIdleTime, long allIdleTime,             TimeUnit unit) {        if (unit == null) {            throw new NullPointerException("unit");         }        this.observeOutput = observeOutput;        //读空闲时间         if (readerIdleTime <= 0) {             readerIdleTimeNanos = 0;         } else {             readerIdleTimeNanos = Math.max(unit.toNanos(readerIdleTime), MIN_TIMEOUT_NANOS);         }        //写空闲时间         if (writerIdleTime <= 0) {             writerIdleTimeNanos = 0;         } else {             writerIdleTimeNanos = Math.max(unit.toNanos(writerIdleTime), MIN_TIMEOUT_NANOS);         }        //读或写空闲时间         if (allIdleTime <= 0) {             allIdleTimeNanos = 0;         } else {             allIdleTimeNanos = Math.max(unit.toNanos(allIdleTime), MIN_TIMEOUT_NANOS);         }     }
  • readerIdleTime:定义读空闲时间,在这个时间间隔内如果链路没有发生读事件,会触发定时任务的执行。

  • writerIdleTime:定义写空闲时间,在这个时间间隔内如果链路没有发生写事件,也会触发定时任务的执行。

  • allIdleTime:定义读或写空闲时间,在这个时间间隔内如果链路既没有发生读事件也没有发生写事件,会触发定时任务的执行。

public void handlerAdded(ChannelHandlerContext ctx) throws Exception {        if (ctx.channel().isActive() && ctx.channel().isRegistered()) {            // channelActive() event has been fired already, which means this.channelActive() will             // not be invoked. We have to initialize here instead.             initialize(ctx);         } else {            // channelActive() event has not been fired yet.  this.channelActive() will be invoked             // and initialization will occur there.         }     }private void initialize(ChannelHandlerContext ctx) {        // Avoid the case where destroy() is called before scheduling timeouts.         // See: https://github.com/netty/netty/issues/143         switch (state) {        case 1:        case 2:            return;         }          state = 1;         initOutputChanged(ctx);          lastReadTime = lastWriteTime = ticksInNanos();        if (readerIdleTimeNanos > 0) {             readerIdleTimeout = schedule(ctx, new ReaderIdleTimeoutTask(ctx),                     readerIdleTimeNanos, TimeUnit.NANOSECONDS);         }        if (writerIdleTimeNanos > 0) {             writerIdleTimeout = schedule(ctx, new WriterIdleTimeoutTask(ctx),                     writerIdleTimeNanos, TimeUnit.NANOSECONDS);         }        if (allIdleTimeNanos > 0) {             allIdleTimeout = schedule(ctx, new AllIdleTimeoutTask(ctx),                     allIdleTimeNanos, TimeUnit.NANOSECONDS);         }     }  ScheduledFuture<?> schedule(ChannelHandlerContext ctx, Runnable task, long delay, TimeUnit unit) {        //当前channel所在线程池也是一个周期性任务线程池         return ctx.executor().schedule(task, delay, unit);     }

IdleStateHandler也是被作为一个Handle添加到ChannelPipeline中,当其添加成功的时候就会触发hadnlerAdded事件:他会判断当前channel链路是否已生效并且已经注册到selector监听器上,此时就会执行initialize函数。

  • state:作为IdleStateHandler的状态字段,0 - none, 1 - initialized, 2 - destroyed,已初始化或被销毁就不再执行初始化方法;

  • lastReaTime:当前时间作为最近一次读或写时间;

  • 如果定义了读或写或者读写空闲时间的时候,就开启对应的IdleTimeoutTask周期性超时任务;