Reactor單執行緒模型詳解與實現
非同步回撥原理詳解與實現
Reactor(主從)原理詳解與實現
NioEventLoopGroup原始碼分析
netty伺服器端啟動流程原始碼分析
netty伺服器端啟動流程總結
netty處理使用者端新連線原始碼分析
//使用設定構建器,設定需要繫結的處理器
serverBootstrap.group(bossEventLoopGroup, workEventExecutors)
.channel(NioServerSocketChannel.class)
.localAddress(port)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
}
});
//1.childHandler設定項是給worker的channel繫結的Handler(子通道所屬)
//2.Handler設定項是給boss的channel繫結的Handler(父通道所屬)
//ServerBootstrapAcceptor的channelRead方法
//這個是父通道處理器的處理邏輯,我們這直接分析子通道的處理器鏈,這也是我們比較關心的
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
//ServerBootstrapAcceptor是我們ServerBootstra的靜態內部類,直接參照childHandler
//新連線的通道直接把我們構建時候的ChannelInitializer物件新增到處理器鏈中
//同時看到,ChannelHandler是新增到pipeline中的
child.pipeline().addLast(childHandler);
//省去不需要程式碼
}
//DefaultChannelPipeline的addLast方法
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
//將處理器包裝為一個AbstractChannelHandlerContext,這種處理器包裝了一些上下文資訊在裡面
//處理器鏈的所有處理器都是AbstractChannelHandlerContext型別
newCtx = newContext(group, filterName(name, handler), handler);
//將新的handler新增到處理鏈中
addLast0(newCtx);
//省去不需要程式碼
return this;
}
//將新的處理器新增到一個雙向連結串列中
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
//1.這裡只是對處理器鏈做了一個簡單的初始化,只是把ChannelInitializer加入到處理器鏈中
//2.ChannelInitializer可以理解為我們設定的handler的一個包裝
//3.為什麼不直接將我們所有自定義的處理器新增到處理器鏈中,而是將一個處理器包裝暫時放入
// 處理器鏈中?
//ServerBootstrapAcceptor的channelRead方法
public void channelRead(ChannelHandlerContext ctx, Object msg) {
//省去不需要程式碼
try {
//這裡將完成一個新子通道所有初始化的動作,其實這裡直接是做了非同步去直接做這些事情的
//處理器鏈的真正新增是在這個非同步任務裡面的
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
//AbstractUnsafe的register
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
//省去不需要程式碼
//使用非同步任務去做所有初始化通道的事情,像迴圈新增所有處理器鏈這種動作,也很耗時
//容易影響處理新連線的效率,不適合放在主執行緒中
eventLoop.execute(new Runnable() {
@Override
public void run() {
//迴圈新增所有自定義處理器,放入處理器鏈中
register0(promise);
}
});
//省去不需要程式碼
}
//AbstractUnsafe的register0
private void register0(ChannelPromise promise) {
//省去不需要程式碼
//迴圈新增所有自定義處理器,放入處理器鏈中
pipeline.invokeHandlerAddedIfNeeded();
}
//ChannelInitializer的initChannel
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
//回撥我們的內部類實現initChannel(SocketChannel socketChannel)方法
initChannel((C) ctx.channel());
} catch (Throwable cause) {
exceptionCaught(ctx, cause);
} finally {
//新增完畢後移除之前處理器包裝
remove(ctx);
}
return true;
}
return false;
}
//我們自定義啟動類裡面
//將包裝處理裡面的自定義處理器放入處理器鏈中
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
//將每個處理器放入處理器鏈中
socketChannel.pipeline().addLast(new MyHandler()).addLast(new MyHandler2());
}
});
//迴圈處理
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
//迴圈處理
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
//刪除處理鏈中的包裝處理器
private void remove(ChannelHandlerContext ctx) {
try {
ChannelPipeline pipeline = ctx.pipeline();
if (pipeline.context(this) != null) {
pipeline.remove(this);
}
} finally {
initMap.remove(ctx);
}
}
總結:現在知道為什麼在這裡正真新增我們的處理器了吧,註冊一個channel,整體都是在非同步任務的子執行緒中的
這樣不會阻塞或者影響連線處理