问小白 wenxiaobai
资讯
历史
科技
环境与自然
成长
游戏
财经
文学与艺术
美食
健康
家居
文化
情感
汽车
三农
军事
旅行
运动
教育
生活
星座命理

Netty线程模型、Future、Channel总结和源码分析

创作时间:
作者:
@小白创作中心

Netty线程模型、Future、Channel总结和源码分析

引用
1
来源
1.
http://www.cdweb.net/article/jpoppo.html

Netty线程模型

Netty提供了灵活的线程模型配置,包括单线程Reactor、多线程Reactor和多层线程Reactor。无论采用哪种线程模型,都通过单一的Acceptor接收客户端请求,然后可以创建多个NioEventLoop来处理IO操作。

EventLoop和EventLoopGroup实际上继承了Java的ScheduledExecutorService,具备线程池的特性,线程数量可以根据需要动态配置。例如,配置单线程模型时,只需将线程数量设置为1即可。

Future和Promise机制

Future

Future代表一个异步操作的结果。其主要方法包括:

  • close():关闭Future,但结果未知
  • get():获取操作结果,但会阻塞当前线程
  • isDone():判断操作是否完成

ChannelFuture是专门用于获取异步返回结果的Future类型。

ChannelFutureListener

可以通过实现ChannelFutureListener接口来获得回调,无需等待get方法返回。

public interface ChannelFutureListener extends GenericFutureListener {
    ChannelFutureListener CLOSE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            future.channel().close();
        }
    };
    ChannelFutureListener CLOSE_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().close();
            }
        }
    };
    ChannelFutureListener FIRE_EXCEPTION_ON_FAILURE = new ChannelFutureListener() {
        public void operationComplete(ChannelFuture future) {
            if (!future.isSuccess()) {
                future.channel().pipeline().fireExceptionCaught(future.cause());
            }
        }
    };
}

连接超时和channel超时配置

在Bootstrap中可以配置连接超时时间:

Bootstrap bootstrap = new Bootstrap();
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 10000);
channelFutrue.awaitUninterruptibly(10, TimeUnit.SECONDS);

需要注意以下两点:

  1. 谨慎使用await,可能导致死锁。
  2. ChannelFuture超时后如果调用了业务代码重连,而此时IO未超时,将可能导致多条连接并存,设置IO超时时间建议小于业务代码超时时间。

Promise

Promise是Future的升级版,不仅支持读取结果,还支持在回调过程中进行操作。例如,DefaultPromise类中的awaitUninterruptibly方法可以手动打断回调,使进程等待。

public Promise awaitUninterruptibly() {
    if (this.isDone()) {
        return this;
    } else {
        boolean interrupted = false;
        synchronized(this) {
            while(!this.isDone()) {
                this.checkDeadLock();
                this.incWaiters();
                try {
                    this.wait();
                } catch (InterruptedException var9) {
                    interrupted = true;
                } finally {
                    this.decWaiters();
                }
            }
        }
        if (interrupted) {
            Thread.currentThread().interrupt();
        }
        return this;
    }
}

其中,checkDeadLock方法用于避免死锁,incWaiters方法限制了最大等待数量为32767。

Channel和Unsafe

Channel负责对外提供操作IO的接口,而Unsafe是Channel的内部接口类,封装了一些不安全的操作,不允许外部直接调用。实际的IO操作最终都在Unsafe中执行。

以Channel的连接操作为例,跟踪其实现过程:

// Channel调用连接
ChannelFuture connect(SocketAddress var1);

// DefaultChannelPipeline中执行,实际是调用尾部的pipeline
public ChannelFuture connect(SocketAddress remoteAddress) {
    return this.tail.connect(remoteAddress);
}

// AbstractChannelHandlerContext持续寻找所有handler执行对象
public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) {
    AbstractChannelHandlerContext next = this.findContextOutbound();
    next.invoker().invokeConnect(next, remoteAddress, localAddress, promise);
    return promise;
}

private AbstractChannelHandlerContext findContextOutbound() {
    AbstractChannelHandlerContext ctx = this;
    do {
        ctx = ctx.prev;
    } while(!ctx.outbound);
    return ctx;
}

// 实际执行是寻找到Unsafe的Invoker
public ChannelHandlerInvoker invoker() {
    return this.invoker == null ? this.channel().unsafe().invoker() : this.invoker;
}

public void invokeConnect(final ChannelHandlerContext ctx, final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
    if (remoteAddress == null) {
        throw new NullPointerException("remoteAddress");
    } else if (ChannelHandlerInvokerUtil.validatePromise(ctx, promise, false)) {
        if (this.executor.inEventLoop()) {
            ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
        } else {
            this.safeExecuteOutbound(new OneTimeTask() {
                public void run() {
                    ChannelHandlerInvokerUtil.invokeConnectNow(ctx, remoteAddress, localAddress, promise);
                }
            }, promise);
        }
    }
}
© 2023 北京元石科技有限公司 ◎ 京公网安备 11010802042949号