業(yè)余從事網(wǎng)站開發(fā)杭州seo按天計(jì)費(fèi)
????????ZooKeeper應(yīng)用的開發(fā)主要通過Java客戶端API去連接和操作ZooKeeper集群??晒┻x擇的Java客戶端API有:
- ZooKeeper官方的Java客戶端API。
- 第三方的Java客戶端API,比如Curator。
? ? ? ? 接下來我們將逐一學(xué)習(xí)一下這兩個(gè)java客戶端是如何操作zookeeper的。
1.?ZooKeeper官方的Java客戶端
1.1 簡介
????????ZooKeeper官方的客戶端API提供了基本的操作。例如,創(chuàng)建會(huì)話、創(chuàng)建節(jié)點(diǎn)、讀取節(jié)點(diǎn)、更新數(shù)據(jù)、刪除節(jié)點(diǎn)和檢查節(jié)點(diǎn)是否存在等。不過,對于實(shí)際開發(fā)來說,ZooKeeper官方API有一些不足之處,具體如下:
- ZooKeeper的Watcher監(jiān)聽是一次性的,每次觸發(fā)之后都需要重新進(jìn)行注冊。
- 會(huì)話超時(shí)之后沒有實(shí)現(xiàn)重連機(jī)制。
- 異常處理煩瑣,ZooKeeper提供了很多異常,對于開發(fā)人員來說可能根本不知道應(yīng)該如何處理這些拋出的異常。
- 僅提供了簡單的byte[]數(shù)組類型的接口,沒有提供Java POJO級別的序列化數(shù)據(jù)處理接口。
- 創(chuàng)建節(jié)點(diǎn)時(shí)如果拋出異常,需要自行檢查節(jié)點(diǎn)是否存在。
- 無法實(shí)現(xiàn)級聯(lián)刪除。
總之,ZooKeeper官方API功能比較簡單,在實(shí)際開發(fā)過程中比較笨重,一般不推薦使用。
1.2 基礎(chǔ)使用
使用zookeeper原生客戶端,需要引入zookeeper客戶端的依賴。
<dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.3</version></dependency>
注意:保持與服務(wù)端版本一致,不然會(huì)有很多兼容性的問題。
????????ZooKeeper原生客戶端主要使用org.apache.zookeeper.ZooKeeper這個(gè)類來調(diào)用ZooKeeper服務(wù)的。
1.2.1 連接zk集群
ZooKeeper構(gòu)造器:
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {this(connectString, sessionTimeout, watcher, false);}
?connectString:使用逗號分隔的列表,每個(gè)ZooKeeper節(jié)點(diǎn)是一個(gè)host.port對,host 是機(jī)器名或者IP地址,port是ZooKeeper節(jié)點(diǎn)對客戶端提供服務(wù)的端口號??蛻舳藭?huì)任意選取connectString 中的一個(gè)節(jié)點(diǎn)建立連接。
sessionTimeout : session timeout時(shí)間。
watcher:用于接收到來自ZooKeeper集群的事件。
? ? 如何使用客戶端構(gòu)造器與服務(wù)端建立連接:
????建立連接的工具類;因?yàn)閦ookepper建立連接時(shí)特別慢,所以采用了CountDownLatch同步工具類,等待zookeeper客戶端與服務(wù)端建立完成后,繼續(xù)后續(xù)操作。
public class ZooKeeperFacotry {private static final int SESSION_TIMEOUT = 5000;public static ZooKeeper create(String connectionString) throws Exception {final CountDownLatch connectionLatch = new CountDownLatch(1);ZooKeeper zooKeeper = new ZooKeeper(connectionString, SESSION_TIMEOUT, new Watcher() {@Overridepublic void process(WatchedEvent event) {if (event.getType()== Event.EventType.None&& event.getState() == Watcher.Event.KeeperState.SyncConnected) {connectionLatch.countDown();System.out.println("連接建立");}}});System.out.println("等待連接建立...");connectionLatch.await();return zooKeeper;}}
public class ZkClientDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//創(chuàng)建zookeeper對象ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);//連接System.out.println(zooKeeper.getState()); }
運(yùn)行結(jié)果:
1.2.2?操作節(jié)點(diǎn)
以下是Zookeeper原生客戶端操作服務(wù)端的一些主要API:
- create(path, data, acl,createMode): 創(chuàng)建一個(gè)給定路徑的 znode,并在 znode 保存 data[]的 數(shù)據(jù),createMode指定 znode 的類型。
- delete(path, version):如果給定 path 上的 znode 的版本和給定的 version 匹配, 刪除 znode。
- exists(path, watch):判斷給定 path 上的 znode 是否存在,并在 znode 設(shè)置一個(gè) watch。
- getData(path, watch):返回給定 path 上的 znode 數(shù)據(jù),并在 znode 設(shè)置一個(gè) watch。
- setData(path, data, version):如果給定 path 上的 znode 的版本和給定的 version 匹配,設(shè)置znode 數(shù)據(jù)。
- getChildren(path, watch):返回給定 path 上的 znode 的孩子 znode 名字,并在 znode 設(shè)置一個(gè) watch。
- sync(path):把客戶端 session 連接節(jié)點(diǎn)和 leader 節(jié)點(diǎn)進(jìn)行同步。
API特點(diǎn):
- 所有獲取 znode 數(shù)據(jù)的 API 都可以設(shè)置一個(gè) watch 用來監(jiān)控 znode 的變化。
- 所有更新 znode 數(shù)據(jù)的 API 都有兩個(gè)版本: 無條件更新版本和條件更新版本。如果 version 為 -1,更新為無條件更新。否則只有給定的 version 和 znode 當(dāng)前的 version 一樣,才會(huì)進(jìn)行更新,這樣的更新是條件更新。
- 所有的方法都有同步和異步兩個(gè)版本。同步版本的方法發(fā)送請求給 ZooKeeper 并等待服務(wù)器的響 應(yīng)。異步版本把請求放入客戶端的請求隊(duì)列,然后馬上返回。異步版本通過 callback 來接受來 自服務(wù)端的響應(yīng)。
接下來,我們利用這些API對zk的節(jié)點(diǎn)進(jìn)行簡單的操作
1.2.2.1 創(chuàng)建持久節(jié)點(diǎn)
代碼:
public class ZkClientDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//創(chuàng)建zookeeper對象ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);//連接System.out.println(zooKeeper.getState());Stat stat = zooKeeper.exists("/order",false);if(null ==stat){//創(chuàng)建持久節(jié)點(diǎn)zooKeeper.create("/order","001".getBytes(),ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);}System.out.println("執(zhí)行完了");
}
?運(yùn)行的結(jié)果:
我們在服務(wù)器的客戶端查看一下數(shù)據(jù):
有數(shù)據(jù),創(chuàng)建成功了。?
1.2.2.2 永久監(jiān)聽節(jié)點(diǎn)
代碼:
public class ZkClientDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//創(chuàng)建zookeeper對象ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);//連接System.out.println(zooKeeper.getState());Stat stat = zooKeeper.exists("/order",false);//永久監(jiān)聽 addWatch -m modezooKeeper.addWatch("/order",new Watcher() {@Overridepublic void process(WatchedEvent event) {System.out.println(event);//TODO}},AddWatchMode.PERSISTENT);Thread.sleep(Integer.MAX_VALUE);
}
啟動(dòng)程序后,在服務(wù)器的客戶端修改監(jiān)聽的節(jié)點(diǎn)。?
?程序監(jiān)聽到?/order這個(gè)節(jié)點(diǎn)被修改了。
1.2.2.3?根據(jù)版本更新
代碼:
public class ZkClientDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//創(chuàng)建zookeeper對象ZooKeeper zooKeeper = ZooKeeperFacotry.create(CLUSTER_CONNECT_STR);//連接System.out.println(zooKeeper.getState());Stat stat = zooKeeper.exists("/order",false);stat = new Stat();byte[] data = zooKeeper.getData("/order", false, stat);System.out.println(" data: "+new String(data));// -1: 無條件更新//zooKeeper.setData("/user", "third".getBytes(), -1);// 帶版本條件更新int version = stat.getVersion();zooKeeper.setData("/order", "updateByVersion".getBytes(), version);Thread.sleep(Integer.MAX_VALUE);}}
第一次打印的數(shù)據(jù)結(jié)果:?
待程序執(zhí)行完,在服務(wù)器客戶端查詢的結(jié)果:更具版本更新數(shù)據(jù)成功了。
????????對于zookeeper java原生客戶端的使用,就簡單介紹這么多,不在過多贅述,只是為了是讓大家感受一下,原生客戶端的使用。實(shí)際開發(fā)中并不推薦使用zk的原生客戶端,過于笨重。
2.?開源的第三方客戶端:Curator
2.1 簡介
? ? ? ? 官網(wǎng):https://curator.apache.org/
????????Curator是Netflix公司開源的一套ZooKeeper客戶端框架,和ZkClient一樣它解決了非常底層的細(xì)節(jié)開發(fā)工作,包括連接、重連、反復(fù)注冊Watcher的問題以及NodeExistsException異常等。
????????Curator是Apache基金會(huì)的頂級項(xiàng)目之一,Curator具有更加完善的文檔,另外還提供了一套易用性和可讀性更強(qiáng)的Fluent風(fēng)格的客戶端API框架。
????????Curator還為ZooKeeper客戶端框架提供了一些比較普遍的、開箱即用的、分布式開發(fā)用的解決方案,例如Recipe、共享鎖服務(wù)、Master選舉機(jī)制和分布式計(jì)算器等,幫助開發(fā)者避免了“重復(fù)造輪子”的無效開發(fā)工作。
????????在實(shí)際的開發(fā)場景中,使用Curator客戶端就足以應(yīng)付日常的ZooKeeper集群操作的需求。
2.2 基礎(chǔ)使用
????????使用Curator我們需要引入依賴,官網(wǎng)上介紹到,Curator有多個(gè)?artifacts,使用者根據(jù)自己的需求,進(jìn)行引入依賴。對于大多數(shù)使用者來說,引入curator-recipes就足夠了。
Curator的幾個(gè)重要包介紹:
curator-framework:是對ZooKeeper的底層API的一些封裝。
curator-client:提供了一些客戶端的操作,例如重試策略等。
curator-recipes:封裝了一些高級特性,如:Cache事件監(jiān)聽、選舉、分布式鎖、分布式計(jì)數(shù)器、分布式Barrier等。
依賴: 以防Curator中的zookeeper版本和我們的服務(wù)端不匹配,我們需要排除Curator中的zookeeper的客戶端,自己手動(dòng)引入合適版本的zk的客戶端。
<!-- zookeeper client --><dependency><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId><version>3.8.3</version></dependency><!--curator--><dependency><groupId>org.apache.curator</groupId><artifactId>curator-recipes</artifactId><version>5.1.0</version><exclusions><exclusion><groupId>org.apache.zookeeper</groupId><artifactId>zookeeper</artifactId></exclusion></exclusions></dependency>
2.2.1 連接zk集群
? ? ? ? 連接zk集群,我們需要?jiǎng)?chuàng)建一個(gè)客戶端示例,然后啟動(dòng)客戶端。創(chuàng)建客戶端實(shí)例的方法有兩種。
方法一:使用工廠類CuratorFrameworkFactory的靜態(tài)newClient()方法。
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {// 重試策略RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);//創(chuàng)建客戶端實(shí)例CuratorFramework client = CuratorFrameworkFactory.newClient(CLUSTER_CONNECT_STR, retryPolicy);//啟動(dòng)客戶端client.start();Thread.sleep(Integer.MAX_VALUE);} }
方法二:使用工廠類CuratorFrameworkFactory的靜態(tài)builder構(gòu)造者方法。?
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();Thread.sleep(Integer.MAX_VALUE);} }
參數(shù)解釋:
connectionString:服務(wù)器地址列表,在指定服務(wù)器地址列表的時(shí)候可以是一個(gè)地址,也可以是多個(gè)地址。如果是多個(gè)地址,那么每個(gè)服務(wù)器地址列表用逗號分隔, 如 host1:port1,host2:port2,host3;port3 。
retryPolicy:重試策略,當(dāng)客戶端異常退出或者與服務(wù)端失去連接的時(shí)候,可以通過設(shè)置客戶端重新連接ZooKeeper 服務(wù)端。而 Curator 提供了 一次重試、多次重試等不同種類的實(shí)現(xiàn)方式。在 Curator 內(nèi)部,可以通過判斷服務(wù)器返回的 keeperException 的狀態(tài)代碼來判斷是否進(jìn)行重試處理,如果返回的是 OK 表示一切操作都沒有問題,而 SYSTEMERROR 表示系統(tǒng)或服務(wù)端錯(cuò)誤。
超時(shí)時(shí)間:Curator 客戶端創(chuàng)建過程中,有兩個(gè)超時(shí)時(shí)間的設(shè)置。一個(gè)是 sessionTimeoutMs 會(huì)話超時(shí)時(shí)間,用來設(shè)置該條會(huì)話在 ZooKeeper 服務(wù)端的失效時(shí)間。另一個(gè)是 connectionTimeoutMs 客戶端創(chuàng)建會(huì)話的超時(shí)時(shí)間,用來限制客戶端發(fā)起一個(gè)會(huì)話連接到接收 ZooKeeper 服務(wù)端應(yīng)答的時(shí)間。sessionTimeoutMs 作用在服務(wù)端,而 connectionTimeoutMs 作用在客戶端。
2.2.2 節(jié)點(diǎn)操作
????????描述一個(gè)節(jié)點(diǎn)要包括節(jié)點(diǎn)的類型,即臨時(shí)節(jié)點(diǎn)還是持久節(jié)點(diǎn)、節(jié)點(diǎn)的數(shù)據(jù)信息、節(jié)點(diǎn)是否是有序節(jié)點(diǎn)等屬性和性質(zhì)。接下來,我們逐一看一下利用Curator如何操作一個(gè)節(jié)點(diǎn)的。? ?
2.2.2.1?創(chuàng)建節(jié)點(diǎn)
? ? 那么如何用Curator來創(chuàng)建一個(gè)節(jié)點(diǎn)呢?
????????在 Curator 中,可以使用 create 函數(shù)創(chuàng)建數(shù)據(jù)節(jié)點(diǎn),并通過 withMode 函數(shù)指定節(jié)點(diǎn)類型(持久化節(jié)點(diǎn),臨時(shí)節(jié)點(diǎn),順序節(jié)點(diǎn),臨時(shí)順序節(jié)點(diǎn),持久化順序節(jié)點(diǎn)等),默認(rèn)是持久化節(jié)點(diǎn),之后調(diào)用forPath 函數(shù)來指定節(jié)點(diǎn)的路徑和數(shù)據(jù)信息。
第一步: 我們在服務(wù)器客戶端,查看一下當(dāng)前服務(wù)端有哪些節(jié)點(diǎn)
第二步:利用Curator創(chuàng)建一個(gè)持久節(jié)點(diǎn),運(yùn)行下述代碼
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();String path = "/users/user";// 檢查節(jié)點(diǎn)是否存在Stat stat = curatorFramework.checkExists().forPath(path);if (stat != null) {// 刪除節(jié)點(diǎn)curatorFramework.delete().deletingChildrenIfNeeded() // 如果存在子節(jié)點(diǎn),則刪除所有子節(jié)點(diǎn).forPath(path); // 刪除指定節(jié)點(diǎn)}// 創(chuàng)建節(jié)點(diǎn)curatorFramework.create().creatingParentsIfNeeded() // 如果父節(jié)點(diǎn)不存在,則創(chuàng)建父節(jié)點(diǎn).withMode(CreateMode.PERSISTENT).forPath(path, "user".getBytes());Thread.sleep(Integer.MAX_VALUE);} }
第三步: 我們再次在服務(wù)器的客戶端查看一下,節(jié)點(diǎn)創(chuàng)建成功了沒有
創(chuàng)建了users和其子節(jié)點(diǎn)user成功。?
?如果我們需要?jiǎng)?chuàng)建有序節(jié)點(diǎn),臨時(shí)節(jié)點(diǎn)則只需要改變代碼中的withMode的參數(shù)。具體參數(shù)如下:
PERSISTENT:持久節(jié)點(diǎn)
PERSISTENT_SEQUENTIAL:持久有序節(jié)點(diǎn)
EPHEMERAL:臨時(shí)節(jié)點(diǎn)
EPHEMERAL_SEQUENTIAL:臨時(shí)有序節(jié)點(diǎn)
CONTAINER:容器節(jié)點(diǎn)
PERSISTENT_WITH_TTL:持久TTL節(jié)點(diǎn)
PERSISTENT_SEQUENTIAL_WITH_TTL:持久有序TTL節(jié)點(diǎn)
?2.2.2.2 更新節(jié)點(diǎn)
? ? ? ? 那么我們?nèi)绾胃乱粋€(gè)節(jié)點(diǎn)呢?
????????我們通過客戶端實(shí)例的 setData() 方法更新 ZooKeeper 服務(wù)上的數(shù)據(jù)節(jié)點(diǎn),在setData 方法的后邊,通過 forPath 函數(shù)來指定更新的數(shù)據(jù)節(jié)點(diǎn)路徑以及要更新的數(shù)據(jù)。
第一步:服務(wù)器端的客戶端查看節(jié)點(diǎn)/users/user原始數(shù)據(jù):
第二步:更改/users/user節(jié)點(diǎn)的數(shù)據(jù),執(zhí)行以下代碼
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();String path = "/users/user";// 更新節(jié)點(diǎn)數(shù)據(jù) set //users/user Update usercuratorFramework.setData().forPath(path, "Update user".getBytes());Thread.sleep(Integer.MAX_VALUE);} }
第三步:再次使用服務(wù)器端的客戶端查看/users/user的數(shù)據(jù)有沒有更新成功
更新成功了。?
2.2.2.3 查看節(jié)點(diǎn)?
? ? ? 那么我們?nèi)绾尾榭匆粋€(gè)節(jié)點(diǎn)呢?
????????我們通過客戶端實(shí)例的?getData() 方法更新 ZooKeeper 服務(wù)上的數(shù)據(jù)節(jié)點(diǎn),在getData 方法的后邊,通過 forPath 函數(shù)來指定查看的節(jié)點(diǎn)路徑。
第一步: 服務(wù)器端的客戶端查看/users/user的節(jié)點(diǎn)數(shù)據(jù)
第二步:執(zhí)行以下代碼,在java客戶端,查看/users/user的數(shù)據(jù)
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();String path = "/users/user";byte[] bytes = curatorFramework.getData().forPath(path);System.out.println("get data from node :{/users/user} successfully."+new String(bytes));Thread.sleep(Integer.MAX_VALUE);} }
執(zhí)行結(jié)果:
?
2.2.2.4 刪除節(jié)點(diǎn)
? 那么我們?nèi)绾蝿h除一個(gè)節(jié)點(diǎn)呢?
????????我們通過客戶端實(shí)例的?delete() 方法刪除?ZooKeeper 服務(wù)上的數(shù)據(jù)節(jié)點(diǎn),在delete方法的后邊,通過 forPath 函數(shù)來指定刪除的節(jié)點(diǎn)路徑。
第一步:服務(wù)器端查看節(jié)點(diǎn)
第二步:執(zhí)行以下代碼 刪除users節(jié)點(diǎn)
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();String path = "/users";//刪除節(jié)點(diǎn)curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(path);Thread.sleep(Integer.MAX_VALUE);} }
第三步:服務(wù)器端客戶端查看節(jié)點(diǎn)是否還存在:users和其子節(jié)點(diǎn)user都被刪除了
?
刪除中一些函數(shù)含義:
guaranteed:該函數(shù)的功能如字面意思一樣,主要起到一個(gè)保障刪除成功的作用,其底層工作方式是:只要該客戶端的會(huì)話有效,就會(huì)在后臺持續(xù)發(fā)起刪除請求,直到該數(shù)據(jù)節(jié)點(diǎn)在ZooKeeper 服務(wù)端被刪除。
deletingChildrenIfNeeded:指定了該函數(shù)后,系統(tǒng)在刪除該數(shù)據(jù)節(jié)點(diǎn)的時(shí)候會(huì)以遞歸的方式直接刪除其子節(jié)點(diǎn),以及子節(jié)點(diǎn)的子節(jié)點(diǎn)。?
2.2.2.4 監(jiān)聽
????????Curator Caches:Curator 引入了 Cache 來實(shí)現(xiàn)對 Zookeeper 服務(wù)端事件監(jiān)聽,Cache 事件監(jiān)聽可以理解為一個(gè)本地緩存視圖與遠(yuǎn)程 Zookeeper 視圖的對比過程。Cache 提供了反復(fù)注冊的功能。Cache 分為兩類注冊類型:節(jié)點(diǎn)監(jiān)聽和子節(jié)點(diǎn)監(jiān)聽。
2.2.2.4.1 監(jiān)聽節(jié)點(diǎn)數(shù)據(jù)變化
第一步: 服務(wù)器端客戶端查看數(shù)據(jù)
?第二步: 啟動(dòng)以下代碼,監(jiān)聽/user節(jié)點(diǎn)數(shù)據(jù)
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();String path = "/users";// 創(chuàng)建節(jié)點(diǎn)緩存,用于監(jiān)聽指定節(jié)點(diǎn)的變化final NodeCache nodeCache = new NodeCache(curatorFramework, path);// 啟動(dòng)NodeCache并立即從服務(wù)端獲取最新數(shù)據(jù)nodeCache.start(true);// 注冊節(jié)點(diǎn)變化監(jiān)聽器nodeCache.getListenable().addListener(new NodeCacheListener() {@Overridepublic void nodeChanged() throws Exception {byte[] newData = nodeCache.getCurrentData().getData();System.out.println("Node data changed: " + new String(newData));}});Thread.sleep(Integer.MAX_VALUE);} }
第三步:服務(wù)器端的客戶端更新/users節(jié)點(diǎn)的數(shù)據(jù),更改了4次
第四步:查看java客戶端 監(jiān)聽到了數(shù)據(jù)
?
2.2.2.4.2 監(jiān)聽一級子節(jié)點(diǎn)
第一步服務(wù)器端客戶端查看節(jié)點(diǎn)數(shù)據(jù):
第二步: 執(zhí)行以下代碼,監(jiān)聽節(jié)點(diǎn):
注意:PathChildrenCache 會(huì)對子節(jié)點(diǎn)進(jìn)行監(jiān)聽,但是不會(huì)對二級子節(jié)點(diǎn)進(jìn)行監(jiān)聽
?
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();String path = "/users";// 創(chuàng)建PathChildrenCachePathChildrenCache pathChildrenCache = new PathChildrenCache(curatorFramework, path, true);pathChildrenCache.start();// 注冊子節(jié)點(diǎn)變化監(jiān)聽器pathChildrenCache.getListenable().addListener(new PathChildrenCacheListener() {@Overridepublic void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {if (event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED) {ChildData childData = event.getData();System.out.println("Child added: " + childData.getPath());} else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED) {ChildData childData = event.getData();System.out.println("Child removed: " + childData.getPath());} else if (event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED) {ChildData childData = event.getData();System.out.println("Child updated: " + childData.getPath());}}});Thread.sleep(Integer.MAX_VALUE);} }
第三步:服務(wù)器端客戶端增加子節(jié)點(diǎn)、修改子節(jié)點(diǎn)、刪除子節(jié)點(diǎn)?
第四步:查看java客戶端控制臺消息,監(jiān)聽到了對應(yīng)修改
?
2.2.2.4.3 監(jiān)聽所有子節(jié)點(diǎn)?
第一步:服務(wù)器端客戶端查看節(jié)點(diǎn):
第二步:執(zhí)行以下代碼,進(jìn)行監(jiān)聽
public class CuratorDemo {private final static String CLUSTER_CONNECT_STR="192.168.31.5:2181,192.168.31.176:2181,192.168.31.232:2181";public static void main(String[] args) throws Exception {//構(gòu)建客戶端實(shí)例CuratorFramework curatorFramework= CuratorFrameworkFactory.builder().connectString(CLUSTER_CONNECT_STR).retryPolicy(new ExponentialBackoffRetry(1000,3)) // 設(shè)置重試策略.build();//啟動(dòng)客戶端curatorFramework.start();String path = "/users";Stat stat = curatorFramework.checkExists().forPath(path);if (stat==null){String s = curatorFramework.create().forPath(path);}TreeCache treeCache = new TreeCache(curatorFramework, path);treeCache.getListenable().addListener(new TreeCacheListener() {@Overridepublic void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {System.out.println(" tree cache: {}"+event.toString());Map<String, ChildData> currentChildren = treeCache.getCurrentChildren(path);System.out.println("currentChildren: {}"+ currentChildren.toString());}});treeCache.start();Thread.sleep(Integer.MAX_VALUE);} }
第三步:服務(wù)器端客戶端對節(jié)點(diǎn)進(jìn)行一些操作
?
第四步:查看java客戶端控制臺信息:監(jiān)聽到了對應(yīng)操作
3. 總結(jié)?
? ? ? ? 以上是對zookeeper java客戶端對zookeeper的基本操作,到此zookeeper java客戶端的介紹到此為止,更詳細(xì)的實(shí)踐應(yīng)用,后續(xù)會(huì)出一些zk常用場景是如何實(shí)現(xiàn)的文章。