視頻公司的網(wǎng)站設(shè)計模板網(wǎng)站建站公司
highlight: arduino-light
服務(wù)編排層:ChannelPipeline協(xié)調(diào)ChannelHandlerHandler
EventLoop可以說是 Netty 的調(diào)度中心,負責(zé)監(jiān)聽多種事件類型:I/O 事件、信號事件、定時事件等,然而實際的業(yè)務(wù)處理邏輯則是由 ChannelPipeline 中所定義的 ChannelHandler 完成的,ChannelPipeline 和 ChannelHandler應(yīng)用開發(fā)的過程中打交道最多的組件。
Netty 服務(wù)編排層的核心組件 ChannelPipeline 和 ChannelHandler 為用戶提供了 I/O 事件的全部控制權(quán)。
在學(xué)習(xí)之前,先思考幾個問題。
- ChannelPipeline 與 ChannelHandler 的關(guān)系是什么?它們之間是如何協(xié)同工作的?
- ChannelHandler 的類型有哪些?有什么區(qū)別?
- Netty 中 I/O 事件是如何傳播的?
ChannelPipeline 概述
Pipeline 的字面意思是管道、流水線。它在 Netty 中起到的作用,和一個工廠的流水線類似。
原始的網(wǎng)絡(luò)字節(jié)流經(jīng)過 Pipeline ,被一步步加工包裝,最后得到加工后的成品。
經(jīng)過前面課程核心組件的初步學(xué)習(xí),我們已經(jīng)對 ChannelPipeline 有了初步的印象:它是 Netty 的核心處理鏈,用以實現(xiàn)網(wǎng)絡(luò)事件的動態(tài)編排和有序傳播。
今天我們將從以下幾個方面一起探討 ChannelPipeline 的實現(xiàn)原理:
- ChannelPipeline 內(nèi)部結(jié)構(gòu);
- ChannelHandler 接口設(shè)計;
- ChannelPipeline 事件傳播機制;
- ChannelPipeline 異常傳播機制。
ChannelPipeline 內(nèi)部結(jié)構(gòu)
ChannelPipeline 可以看作是 ChannelHandler 的容器載體,它是由一組 ChannelHandler 實例組成的,內(nèi)部通過雙向鏈表將不同的 ChannelHandler 鏈接在一起,如下圖所示。
當有 I/O 讀寫事件觸發(fā)時,ChannelPipeline 會依次調(diào)用 ChannelHandler 列表對 Channel 的數(shù)據(jù)進行攔截和處理。
每個 ChannelHandler 都對應(yīng)一個 ChannelHandlerContext。
所以實際上 ChannelPipeline 維護的是它與 ChannelHandlerContext 的關(guān)系。
那么你可能會有疑問,為什么這里會多一層 ChannelHandlerContext 的封裝呢?
其實這是一種比較常用的編程思想。ChannelHandlerContext用于保存ChannelHandler。
ChannelHandlerContext包含了ChannelHandler生命周期的所有事件,如 connect、bind、read、 flush、write、close 等。
可以試想一下,如果沒有ChannelHandlerContext 的這層封裝,那么我們在做 ChannelHandler 之間傳遞的時 候。前置后置的通用邏輯就要在每個 ChannelHandler 里都實現(xiàn)一份。
這樣雖然能解決問題,但是代碼結(jié)構(gòu)的耦合,會非常不優(yōu)雅。
根據(jù)網(wǎng)絡(luò)數(shù)據(jù)的流向,ChannelPipeline 分為入站 ChannelInboundHandler和出站 ChannelOutboundHandler。
服務(wù)端接收到客戶端數(shù)據(jù)需要先經(jīng)過 Decoder 入站處理后,再通過 Encoder 出站通知客戶端。
ChannelPipeline是雙向鏈表的構(gòu)造。
ChannelPipeline 的雙向鏈表分別維護了HeadContext 和 TailContext 的頭尾節(jié)點。
我們自定義的ChannelHandler會插入到 Head 和 Tail 之間,這兩個節(jié)點在 Netty 中已經(jīng)默認實現(xiàn)了,它們在
ChannelPipeline 中起到了至關(guān)重要的作用。
首先我們看下 HeadContext 和 TailContext 的繼承關(guān)系,如下圖所示。
HeadContext:in\&out
對于1個請求先由HeadContext處理入棧,經(jīng)過一系列的入棧處理器然后傳遞到TailContext,由TailContext往下傳遞經(jīng)過一系列的出棧處理器,最后再經(jīng)過HeadContext返回。 ```md
<---6 <---5 <---4 HeadContext InBoundHandlerOne TailContext OutBoundHandlerOne 1---> 2---> 3--->
順序是:12346 其中5是入棧已經(jīng)在2處理過 所以不需要處理。 ```
Inbound第一站
Outbound最后一站
HeadContext 既是 Inbound 處理器,也是 Outbound 處理器。 它分別實現(xiàn)了 ChannelInboundHandler 和 ChannelOutboundHandler。
網(wǎng)絡(luò)數(shù)據(jù)寫入操作的入口就是由 HeadContext 節(jié)點完成的。HeadContext 作為 Pipeline 的頭結(jié)點負責(zé)讀取數(shù)據(jù)并開始傳遞 InBound 事件,當數(shù)據(jù)處理完成后,數(shù)據(jù)會反方向經(jīng)過 Outbound 處理器,最終傳遞到 HeadContext,所以 HeadContext 既是數(shù)據(jù)讀取Inbound事件的第一站又是處理 Outbound 事件的最后一站。
此外 HeadContext 在傳遞事件之前,還會執(zhí)行一些前置操作。
TailContext
Outbound第一站
TailContext 只實現(xiàn)了 ChannelInboundHandler 接口。它會在 ChannelInboundHandler 調(diào)用鏈路的最后一步執(zhí)行,主要用于終止 Inbound 事件傳播,例如釋放 Message 數(shù)據(jù)資源等。
TailContext 節(jié)點作為 OutBound 事件傳播的第一站,僅僅是將 OutBound 事件傳遞給下一個節(jié)點。
從整個 ChannelPipeline 調(diào)用鏈路來看,如果由 Channel 直接觸發(fā)事件傳播,那么調(diào)用鏈路將貫穿整個 ChannelPipeline。
然而也可以在其中某一個 ChannelHandlerContext 觸發(fā)同樣的方法,這樣只會從當前的 ChannelHandler 開始執(zhí)行事件傳播,該過程不會從頭貫穿到尾,在一定場景下,可以提高程序性能。
ChannelHandler 接口設(shè)計
在學(xué)習(xí) ChannelPipeline 事件傳播機制之前,我們需要了解 I/O 事件的生命周期。
整個 ChannelHandler 是圍繞 I/O 事件的生命周期所設(shè)計的,例如建立連接、讀數(shù)據(jù)、寫數(shù)據(jù)、連接銷毀等。
ChannelHandler 有兩個重要的子接口:ChannelInboundHandler和ChannelOutboundHandler,分別攔截入
站和出站的各種 I/O 事件。
1. ChannelInboundHandler 的事件回調(diào)方法與觸發(fā)時機。
?
| 事件回調(diào)方法 | 觸發(fā)時機 | | ------------------------- | --------------------------------- | | channelRegistered | Channel 被注冊到 EventLoop | | channelUnregistered | Channel 從 EventLoop 中取消注冊 | | channelActive | Channel 處于就緒狀態(tài),可以被讀寫 | | channelInactive | Channel 處于非就緒狀態(tài)Channel 可以從遠端讀取到數(shù)據(jù) | | channelRead | Channel 可以從遠端讀取到數(shù)據(jù) | | channelReadComplete | Channel 讀取數(shù)據(jù)完成 | | userEventTriggered | 用戶事件觸發(fā)時 | | channelWritabilityChanged | Channel 的寫狀態(tài)發(fā)生變化 | | handlerAdded | 當該處理器被添加到pipeline時 |
?
2. ChannelOutboundHandler 的事件回調(diào)方法與觸發(fā)時機。
ChannelOutboundHandler 的事件回調(diào)方法非常清晰,直接通過 ChannelOutboundHandler 的接口列表可以看到每種操作所對應(yīng)的回調(diào)方法,如下圖所示。
這里每個回調(diào)方法都是在相應(yīng)操作執(zhí)行之前觸發(fā),在此就不多做贅述了。
此外 ChannelOutboundHandler 中絕大部分接口都包含ChannelPromise 參數(shù),以便于在操作完成時能夠及時獲得通知。
事件
事件枚舉
public static final int OP_READ = 1 << 0;//1
public static final int OP_WRITE = 1 << 2;//4
public static final int OP_CONNECT = 1 << 3;//8
public static final int OP_ACCEPT = 1 << 4;//16
事件傳播機制
在上文中我們介紹了 ChannelPipeline 可分為入站 ChannelInboundHandler 和出站 ChannelOutboundHandler 兩種處理器,與此對應(yīng)傳輸?shù)氖录愋涂梢苑譃?strong>Inbound 事件和Outbound 事件。
我們通過一個代碼示例,一起體驗下 ChannelPipeline 的事件傳播機制。 ```java package io.netty.example.pipeline;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;/*** Echoes back any received data from a client.*/
public final class EchoServer {public static void main(String[] args) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();EventLoopGroup bossGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//通過反射創(chuàng)建反射工廠類根據(jù)無參構(gòu)造函數(shù) 反射生成實例//將NioServerSocketChannel綁定到了bossGroup//NioServerSocketChannel接收到請求會創(chuàng)建SocketChannel放入workerGroup.channel(NioServerSocketChannel.class)//指的是SocketChannel.childOption(ChannelOption.SO_KEEPALIVE, true)//指的是SocketChannel.childOption(NioChannelOption.SO_KEEPALIVE, Boolean.TRUE)//默認不使用堆外內(nèi)存.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//false 不使用堆外內(nèi)存.childOption(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false))// .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();//正常情況p.addLast("SampleInBoundHandlerA", new SampleInBoundHandler("SampleInBoundHandlerA", false));p.addLast("SampleInBoundHandlerB", new SampleInBoundHandler("SampleInBoundHandlerB", false));p.addLast("SampleInBoundHandlerC", new SampleInBoundHandler("SampleInBoundHandlerC", true));p.addLast("SampleOutBoundHandlerA", new SampleOutBoundHandler("SampleOutBoundHandlerA"));p.addLast("SampleOutBoundHandlerB", new SampleOutBoundHandler("SampleOutBoundHandlerB"));p.addLast("SampleOutBoundHandlerC", new SampleOutBoundHandler("SampleOutBoundHandlerC"));}});ChannelFuture f = b.bind(8090).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}
}
package io.netty.example.pipeline;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;public final class EchoClient {public static void main(String[] args) throws Exception {// Configure the client.EventLoopGroup group = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap();b.group(group).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();p.addLast(new EchoClientHandler());}});// Start the client.ChannelFuture f = b.connect("127.0.0.1", 8090).sync();// Wait until the connection is closed.f.channel().closeFuture().sync();} finally {// Shut down the event loop to terminate all threads.group.shutdownGracefully();}}
}
package io.netty.example.pipeline;import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;import java.util.concurrent.TimeUnit;/*** Handler implementation for the echo client. It initiates the ping-pong* traffic between the echo client and server by sending the first message to* the server.*/
public class EchoClientHandler extends ChannelInboundHandlerAdapter {private final ByteBuf firstMessage;/*** Creates a client-side handler.*/public EchoClientHandler() {firstMessage = Unpooled.wrappedBuffer("I am echo message".getBytes());}@Overridepublic void channelActive(ChannelHandlerContext ctx) {System.out.println("客戶端發(fā)送消息" + firstMessage.toString());ctx.writeAndFlush(firstMessage);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {// ctx.write(msg);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws InterruptedException {TimeUnit.SECONDS.sleep(3);ctx.flush();}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {// cause.printStackTrace();ctx.close();}
}
package io.netty.example.pipeline;import io.netty.channel.*;public class SampleInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public SampleInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {//打印消息System.out.println("InBoundHandler: " + name);//只有是true的時候才會寫消息//否則只會讀消息if (flush) {ctx.channel().writeAndFlush(msg);} else {super.channelRead(ctx, msg);}}
}
package io.netty.example.pipeline;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;public class SampleOutBoundHandler extends ChannelOutboundHandlerAdapter {private final String name;public SampleOutBoundHandler(String name) {this.name = name;}@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {System.out.println("OutBoundHandler: " + name);super.write(ctx, msg, promise);}
}
```
通過 Pipeline 的 addLast 方法分別添加了三個 InboundHandler 和 OutboundHandler,添加順序都是 A -> B -> C,下圖可以表示初始化后 ChannelPipeline 的內(nèi)部結(jié)構(gòu)。
*當客戶端向服務(wù)端發(fā)送請求時,會觸發(fā) SampleInBoundHandler 調(diào)用鏈的 channelRead 事件。經(jīng)過 SampleInBoundHandler 調(diào)用鏈處理完成后,在 SampleInBoundHandlerC 中會調(diào)用 writeAndFlush 方法向客戶端寫回數(shù)據(jù),此時會觸發(fā) SampleOutBoundHandler 調(diào)用鏈的 write 事件。** 最后我們看下代碼示例的控制臺輸出:
方向:IN先進先出OUT:先進后出
由此可見,Inbound 事件和 Outbound 事件的傳播方向是不一樣的。Inbound 事件的傳播方向為 Head -> Tail,而 Outbound 事件傳播方向是 Tail -> Head,兩者恰恰相反。 在 Netty 應(yīng)用編程中一定要理清楚事件傳播的順序。推薦你在系統(tǒng)設(shè)計時模擬客戶端和服務(wù)端的場景畫出 ChannelPipeline 的內(nèi)部結(jié)構(gòu)圖,以避免搞混調(diào)用關(guān)系。
異常傳播機制
ChannelPipeline 事件傳播的實現(xiàn)采用了經(jīng)典的責(zé)任鏈模式,調(diào)用鏈路環(huán)環(huán)相扣。那么如果有一個節(jié)點處理邏輯異常會出現(xiàn)什么現(xiàn)象呢?我們通過修改 SampleInBoundHandler 的實現(xiàn)來模擬業(yè)務(wù)邏輯異常: ```java package io.netty.example.pipeline;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;/*** Echoes back any received data from a client.*/
public final class EchoServer {public static void main(String[] args) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();EventLoopGroup bossGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//通過反射創(chuàng)建反射工廠類根據(jù)無參構(gòu)造函數(shù) 反射生成實例//將NioServerSocketChannel綁定到了bossGroup//NioServerSocketChannel接收到請求會創(chuàng)建SocketChannel放入workerGroup.channel(NioServerSocketChannel.class)//指的是SocketChannel.childOption(ChannelOption.SO_KEEPALIVE, true)//指的是SocketChannel.childOption(NioChannelOption.SO_KEEPALIVE, Boolean.TRUE)//默認不使用堆外內(nèi)存.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//false 不使用堆外內(nèi)存.childOption(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false))// .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();//正常情況/*p.addLast("SampleInBoundHandlerA", new SampleInBoundHandler("SampleInBoundHandlerA", false));p.addLast("SampleInBoundHandlerB", new SampleInBoundHandler("SampleInBoundHandlerB", false));p.addLast("SampleInBoundHandlerC", new SampleInBoundHandler("SampleInBoundHandlerC", true));*/p.addLast("SampleInBoundHandlerA", new SampleExceptionInBoundHandler("SampleInBoundHandlerA", false));p.addLast("SampleInBoundHandlerB", new SampleExceptionInBoundHandler("SampleInBoundHandlerB", false));p.addLast("SampleInBoundHandlerC", new SampleExceptionInBoundHandler("SampleInBoundHandlerC", true));p.addLast("SampleOutBoundHandlerA", new SampleOutBoundHandler("SampleOutBoundHandlerA"));p.addLast("SampleOutBoundHandlerB", new SampleOutBoundHandler("SampleOutBoundHandlerB"));p.addLast("SampleOutBoundHandlerC", new SampleOutBoundHandler("SampleOutBoundHandlerC"));}});ChannelFuture f = b.bind(8090).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}
}
package io.netty.example.pipeline;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;public class SampleExceptionInBoundHandler extends ChannelInboundHandlerAdapter {private final String name;private final boolean flush;public SampleExceptionInBoundHandler(String name, boolean flush) {this.name = name;this.flush = flush;}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) {System.out.println("InBoundHandler: " + name);if (flush) {ctx.channel().writeAndFlush(msg);} else {throw new RuntimeException("InBoundHandler: " + name);}}@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {System.out.println("InBoundHandlerException: " + name);ctx.fireExceptionCaught(cause);}
}
``` 在SampleExceptionInBoundHandler的 channelRead 事件處理中,第一個 A 節(jié)點就會拋出 RuntimeException。同時我們重寫了 ChannelInboundHandlerAdapter 中的 exceptionCaught 方法,只是在開頭加上了控制臺輸出,方便觀察異常傳播的行為。下面看一下代碼運行的控制臺輸出結(jié)果:
由輸出結(jié)果可以看出 ctx.fireExceptionCaugh 會將異常按順序從 Head 節(jié)點傳播到 Tail 節(jié)點。
如果用戶沒有對異常進行攔截處理,最后將由 Tail 節(jié)點統(tǒng)一處理,在 TailContext 源碼中可以找到具體實現(xiàn):
protected void onUnhandledInboundException(Throwable cause) {try {logger.warn("An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " + "It usually means the last handler in the pipeline did not handle the exception.",cause);} finally {ReferenceCountUtil.release(cause);}
}
雖然 Netty 中 TailContext 提供了兜底的異常處理邏輯,但是在很多場景下,并不能滿足我們的需求。假如你需要攔截指定的異常類型,并做出相應(yīng)的異常處理,應(yīng)該如何實現(xiàn)呢?我們接著往下看。
異常處理的最佳實踐
在 Netty 應(yīng)用開發(fā)的過程中,良好的異常處理機制會讓排查問題的過程事半功倍。所以推薦用戶對異常進行統(tǒng)一攔截,然后根據(jù)實際業(yè)務(wù)場景實現(xiàn)更加完善的異常處理機制。
通過異常傳播機制的學(xué)習(xí),我們應(yīng)該可以想到最好的方法是在 ChannelPipeline 自定義處理器的末端添加統(tǒng)一的異常處理器,此時 ChannelPipeline 的內(nèi)部結(jié)構(gòu)如下圖所示。
用戶自定義的異常處理器代碼示例如下: ```java package io.netty.example.pipeline;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.buffer.UnpooledByteBufAllocator;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;/*** Echoes back any received data from a client.*/
public final class EchoServer {public static void main(String[] args) throws Exception {EventLoopGroup workerGroup = new NioEventLoopGroup();EventLoopGroup bossGroup = new NioEventLoopGroup();try {ServerBootstrap b = new ServerBootstrap();b.group(bossGroup, workerGroup)//通過反射創(chuàng)建反射工廠類根據(jù)無參構(gòu)造函數(shù) 反射生成實例//將NioServerSocketChannel綁定到了bossGroup//NioServerSocketChannel接收到請求會創(chuàng)建SocketChannel放入workerGroup.channel(NioServerSocketChannel.class)//指的是SocketChannel.childOption(ChannelOption.SO_KEEPALIVE, true)//指的是SocketChannel.childOption(NioChannelOption.SO_KEEPALIVE, Boolean.TRUE)//默認不使用堆外內(nèi)存.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)//false 不使用堆外內(nèi)存.childOption(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false))// .handler(new LoggingHandler(LogLevel.INFO)).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ChannelPipeline p = ch.pipeline();//正常情況/*p.addLast("SampleInBoundHandlerA", new SampleInBoundHandler("SampleInBoundHandlerA", false));p.addLast("SampleInBoundHandlerB", new SampleInBoundHandler("SampleInBoundHandlerB", false));p.addLast("SampleInBoundHandlerC", new SampleInBoundHandler("SampleInBoundHandlerC", true));*/p.addLast("SampleInBoundHandlerA", new SampleExceptionInBoundHandler("SampleInBoundHandlerA", false));p.addLast("SampleInBoundHandlerB", new SampleExceptionInBoundHandler("SampleInBoundHandlerB", false));p.addLast("SampleInBoundHandlerC", new SampleExceptionInBoundHandler("SampleInBoundHandlerC", true));//添加異常處理器p.addLast(new ExceptionHandler());p.addLast("SampleOutBoundHandlerA", new SampleOutBoundHandler("SampleOutBoundHandlerA"));p.addLast("SampleOutBoundHandlerB", new SampleOutBoundHandler("SampleOutBoundHandlerB"));p.addLast("SampleOutBoundHandlerC", new SampleOutBoundHandler("SampleOutBoundHandlerC"));}});ChannelFuture f = b.bind(8090).sync();f.channel().closeFuture().sync();} finally {workerGroup.shutdownGracefully();}}
}
//進站出站異常處理器
public class ExceptionHandler extends ChannelDuplexHandler {@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {if (cause instanceof RuntimeException) {System.out.println("Handle Business Exception Success.");}}
}
``` 加入統(tǒng)一的異常處理器后,可以看到異常已經(jīng)被優(yōu)雅地攔截并處理掉了。這也是 Netty 推薦的最佳異常處理實踐。
總結(jié)
本節(jié)課我們深入分析了 Pipeline 的設(shè)計原理與事件傳播機制。
那么前面的幾個問題你是否已經(jīng)都找到答案,來做個簡單的總結(jié):
- ChannelPipeline 是雙向鏈表結(jié)構(gòu),包含 ChannelInboundHandler 和 ChannelOutboundHandler 兩種處理器。
- ChannelHandlerContext 是對 ChannelHandler 的封裝,每個 ChannelHandler 都對應(yīng)一個 ChannelHandlerContext,實際上 ChannelPipeline 維護的是與 ChannelHandlerContext 的關(guān)系。
- Inbound 事件和 Outbound 事件的傳播方向相反,Inbound 事件的傳播方向為 Head -> Tail,而 Outbound 事件傳播方向是 Tail -> Head。
- 異常事件的處理順序與 ChannelHandler 的添加順序相同,會依次向后傳播,與 Inbound 事件和 Outbound 事件無關(guān)。