netty處理器鏈初始化原始碼分析

2020-09-20 11:01:07

參考資料

Reactor單執行緒模型詳解與實現
非同步回撥原理詳解與實現
Reactor(主從)原理詳解與實現
NioEventLoopGroup原始碼分析
netty伺服器端啟動流程原始碼分析
netty伺服器端啟動流程總結
netty處理使用者端新連線原始碼分析

先看處理器鏈初始化整體流程圖

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片儲存下來直接上傳(img-i0Xn9sCi-1600503012123)(/Users/wuxinxin/Library/Application Support/typora-user-images/image-20200919160229930.png)]

設定處理器

//使用設定構建器,設定需要繫結的處理器
serverBootstrap.group(bossEventLoopGroup, workEventExecutors)
        .channel(NioServerSocketChannel.class)
        .localAddress(port)
        .childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
            }
        });
//1.childHandler設定項是給worker的channel繫結的Handler(子通道所屬)
//2.Handler設定項是給boss的channel繫結的Handler(父通道所屬)

初始化處理器鏈

//ServerBootstrapAcceptor的channelRead方法
//這個是父通道處理器的處理邏輯,我們這直接分析子通道的處理器鏈,這也是我們比較關心的
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

  	//ServerBootstrapAcceptor是我們ServerBootstra的靜態內部類,直接參照childHandler
  	//新連線的通道直接把我們構建時候的ChannelInitializer物件新增到處理器鏈中
  	//同時看到,ChannelHandler是新增到pipeline中的
    child.pipeline().addLast(childHandler);
		
  	//省去不需要程式碼
}

//DefaultChannelPipeline的addLast方法
 public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

          	//將處理器包裝為一個AbstractChannelHandlerContext,這種處理器包裝了一些上下文資訊在裡面
          	//處理器鏈的所有處理器都是AbstractChannelHandlerContext型別
            newCtx = newContext(group, filterName(name, handler), handler);

          	//將新的handler新增到處理鏈中
            addLast0(newCtx);
          
          //省去不需要程式碼
           
        return this;
    }

//將新的處理器新增到一個雙向連結串列中
private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
   
//1.這裡只是對處理器鏈做了一個簡單的初始化,只是把ChannelInitializer加入到處理器鏈中
//2.ChannelInitializer可以理解為我們設定的handler的一個包裝
//3.為什麼不直接將我們所有自定義的處理器新增到處理器鏈中,而是將一個處理器包裝暫時放入
//    處理器鏈中?

在這裡插入圖片描述

將自定義處理器迴圈放入處理器鏈中

//ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    //省去不需要程式碼
    try {
        //這裡將完成一個新子通道所有初始化的動作,其實這裡直接是做了非同步去直接做這些事情的
        //處理器鏈的真正新增是在這個非同步任務裡面的
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

//AbstractUnsafe的register
 @Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {

  //省去不需要程式碼
  
  //使用非同步任務去做所有初始化通道的事情,像迴圈新增所有處理器鏈這種動作,也很耗時
  //容易影響處理新連線的效率,不適合放在主執行緒中
  eventLoop.execute(new Runnable() {
    @Override
    public void run() {
      //迴圈新增所有自定義處理器,放入處理器鏈中
      register0(promise);
    }
  });
  //省去不需要程式碼
}

//AbstractUnsafe的register0
private void register0(ChannelPromise promise) {
  //省去不需要程式碼  
  
  //迴圈新增所有自定義處理器,放入處理器鏈中
  pipeline.invokeHandlerAddedIfNeeded();
               
 }

//ChannelInitializer的initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                //回撥我們的內部類實現initChannel(SocketChannel socketChannel)方法
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                exceptionCaught(ctx, cause);
            } finally {
               //新增完畢後移除之前處理器包裝
                remove(ctx);
            }
            return true;
        }
        return false;
    }

//我們自定義啟動類裡面
//將包裝處理裡面的自定義處理器放入處理器鏈中
.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel socketChannel) throws Exception {
                //將每個處理器放入處理器鏈中
                socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
            }
        });

//迴圈處理
 @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

       //迴圈處理
        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

//刪除處理鏈中的包裝處理器
 private void remove(ChannelHandlerContext ctx) {
        try {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this);
            }
        } finally {
            initMap.remove(ctx);
        }
    }
總結:現在知道為什麼在這裡正真新增我們的處理器了吧,註冊一個channel,整體都是在非同步任務的子執行緒中的
     這樣不會阻塞或者影響連線處理

在這裡插入圖片描述

總結

  1. boss的執行緒依然是隻是為了快速處理連線,具體邏輯到worker的執行緒中去做,這是ChannelInitializer存在的原因, boss和worker做了很好的邊界處理
  2. 處理器擴充套件方式使用了雙向連結串列,這樣有利於程式的擴充套件,使用了經典的責任鏈模式
  3. 不影響主流程的業務儘量使用非同步任務去做,儘量不要佔用主執行緒資源