Zookeeper是一個開源的分佈式的,爲分佈式應用提供協調服務的Apache專案。
在學習zookeeper之前,我們可以想下微信公衆號推播文章,微信伺服器有兩個功能。當公衆號作者寫文章後,點擊發布,這個時候微信伺服器做了兩件事情,第一件事:將作者的文章儲存下來,即實現了檔案系統的功能,儲存檔案。第二件事:給每個訂閱了該公衆號的微信使用者發送通知,而不給那些未訂閱的使用者推播,即實現了通知功能。
而zookeeper也有這兩大功能,即Zookeeper=檔案系統(可以在zk上儲存數據)+通知機制 機製。我們也可以在zookeeper上儲存數據,也可以連線到zookeeper伺服器事先訂閱的某個數據,然後zookeeper發現數據變化後,會通知用戶端。
1)Zookeeper:一個領導者(leader),多個跟隨者(follower)組成的叢集。
2)Leader負責進行投票的發起和決議,更新系統狀態
3)Follower用於接收客戶請求並向用戶端返回結果,在選舉Leader過程中參與投票
4)叢集中只要有半數以上節點存活,Zookeeper叢集就能正常服務。
5)全域性數據一致:每個server儲存一份相同的數據副本,client無論連線到哪個server,數據都是一致的。
6)更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行。
7)數據更新原子性,一次數據更新要麼成功,要麼失敗。
8)實時性,在一定時間範圍內,client能讀到最新數據。
ZooKeeper數據模型的結構與Unix檔案系統很類似,整體上可以看作是一棵樹,每個節點稱做一個Znode,zookeeper把數據儲存在節點上。
很顯然zookeeper叢集自身維護了一套數據結構。這個儲存結構是一個樹形結構,其上的每一個節點,我們稱之爲"znode",每一個znode預設能夠儲存1MB的數據,每個ZNode都可以通過其路徑唯一標識,如圖1所示
1.官網首頁:
https://zookeeper.apache.org/
2.下載截圖,如圖5,6,7所示
1.叢集規劃
在hadoop003、hadoop004和hadoop005三個節點上部署Zookeeper。
在一個節點上設定好,再拷貝分發到其他機器上。
2.解壓安裝
1)解壓zookeeper安裝包到/opt/module/目錄下
[root@hadoop003 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/
(2)在/opt/module/zookeeper-3.4.10/這個目錄下建立zkData
mkdir -p zkData
(3)重新命名/opt/module/zookeeper-3.4.10/conf這個目錄下的zoo_sample.cfg爲zoo.cfg
cp zoo_sample.cfg zoo.cfg
3.設定zoo.cfg檔案
(1)具體設定
dataDir=/opt/module/zookeeper-3.4.10/zkData
增加如下設定
#######################cluster##########################
server.1=hadoop003:2888:3888
server.2=hadoop004:2888:3888
server.3=hadoop005:2888:3888
(2)設定參數解讀
server.A=B:C:D。
A是一個數字,表示這個是第幾號伺服器;
B是這個伺服器的ip地址;
C是這個伺服器與叢集中的Leader伺服器交換資訊的埠;
D是萬一叢集中的Leader伺服器掛了,需要一個埠來重新進行選舉,選出一個新的Leader,而這個埠就是用來執行選舉時伺服器相互通訊的埠。
叢集模式下設定一個檔案myid,這個檔案在dataDir目錄下,這個檔案裏面有一個數據就是A的值,Zookeeper啓動時讀取此檔案,拿到裏面的數據與zoo.cfg裏面的設定資訊比較從而判斷到底是哪個server。
4.叢集操作
(1)在/opt/module/zookeeper-3.4.10/zkData目錄下建立一個myid的檔案
touch myid
新增myid檔案,注意一定要在linux裏面建立,在notepad++裏面很可能亂碼
(2)編輯myid檔案
vim myid
在檔案中新增與server對應的編號:如1
(3)拷貝設定好的zookeeper分發到其他機器上
scp -r zookeeper-3.4.10/ hadoop004:/opt/module
scp -r zookeeper-3.4.10/ hadoop005:/opt/module
並分別修改myid檔案中內容爲2、3
(4)分別啓動zookeeper
[root@hadoop003 zookeeper-3.4.10]# bin/zkServer.sh start
[root@hadoop004 zookeeper-3.4.10]# bin/zkServer.sh start
[root@hadoop005 zookeeper-3.4.10]# bin/zkServer.sh start
(5)檢視狀態
jps
[root@hadoop003 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@hadoop004 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
[root@hadoop005 zookeeper-3.4.5]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
解讀zoo.cfg 檔案中參數含義
1)czxid- create 引起這個znode建立的zxid,建立節點的事務的zxid
每次修改ZooKeeper狀態都會收到一個zxid形式的時間戳,也就是ZooKeeper事務ID。
事務ID是ZooKeeper中所有修改總的次序。每個修改都有唯一的zxid,如果zxid1小於zxid2,那麼zxid1在zxid2之前發生。
2)ctime - create znode被建立的毫秒數(從1970年開始)
3)mzxid - modify znode最後更新的zxid
4)mtime - modify znode最後修改的毫秒數(從1970年開始)
5)pZxid-znode最後更新的子節點zxid
6)cversion - znode子節點變化號,znode子節點修改次數
7)dataversion - znode數據變化號
8)aclVersion - znode存取控制列表的變化號
9)ephemeralOwner- 如果是臨時節點,這個是znode擁有者的session id。如果不是臨時節點則是0。
10)dataLength- znode的數據長度
11)numChildren - znode子節點數量
ZooKeeper 的寫數據流程主要分爲以下幾步,如圖11所示:
1)比如 Client 向 ZooKeeper 的 Server1 上寫數據,發送一個寫請求。
2)如果Server1不是Leader,那麼Server1 會把接受到的請求進一步轉發給Leader,因爲每個ZooKeeper的Server裏面有一個是Leader。這個Leader 會將寫請求廣播給各個Server,比如Server1和Server2, 各個Server寫成功後就會通知Leader。
3)當Leader收到大多數 Server 數據寫成功了,那麼就說明數據寫成功了。如果這裏三個節點的話,只要有兩個節點數據寫成功了,那麼就認爲數據寫成功了。寫成功之後,Leader會告訴Server1數據寫成功了。
4)Server1會進一步通知 Client 數據寫成功了,這時就認爲整個寫操作成功。ZooKeeper 整個寫數據流程就是這樣的。
1.啓動用戶端
[root@hadoop003 zookeeper-3.4.10]$ bin/zkCli.sh
2.顯示所有操作命令
[zk: localhost:2181(CONNECTED) 1] help
3.檢視當前znode中所包含的內容
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
4.檢視當前節點數據並能看到更新次數等數據
[zk: localhost:2181(CONNECTED) 1] ls2 /
[zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1
5.建立普通節點
[zk: localhost:2181(CONNECTED) 2] create /app1 "hello app1"
Created /app1
[zk: localhost:2181(CONNECTED) 4] create /app1/server101 "192.168.1.101"
Created /app1/server101
6.獲得節點的值
[zk: localhost:2181(CONNECTED) 6] get /app1
hello app1
cZxid = 0x20000000a
ctime = Mon Jul 17 16:08:35 CST 2017
mZxid = 0x20000000a
mtime = Mon Jul 17 16:08:35 CST 2017
pZxid = 0x20000000b
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 1
[zk: localhost:2181(CONNECTED) 8] get /app1/server101
192.168.1.101
cZxid = 0x20000000b
ctime = Mon Jul 17 16:11:04 CST 2017
mZxid = 0x20000000b
mtime = Mon Jul 17 16:11:04 CST 2017
pZxid = 0x20000000b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0
7.建立短暫節點
[zk: localhost:2181(CONNECTED) 9] create -e /app-emphemeral 8888
(1)在當前用戶端是能檢視到的
[zk: localhost:2181(CONNECTED) 10] ls /
[app1, app-emphemeral, zookeeper]
(2)退出當前用戶端然後再重新啓動用戶端
[zk: localhost:2181(CONNECTED) 12] quit
[bigdata@hadoop104 zookeeper-3.4.10]$ bin/zkCli.sh
(3)再次檢視根目錄下短暫節點已經刪除
[zk: localhost:2181(CONNECTED) 0] ls /
[app1, zookeeper]
8.建立帶序號的節點
(1)先建立一個普通的根節點app2
[zk: localhost:2181(CONNECTED) 11] create /app2 "app2"
(2)建立帶序號的節點
[zk: localhost:2181(CONNECTED) 13] create -s /app2/aa 888
Created /app2/aa0000000000
[zk: localhost:2181(CONNECTED) 14] create -s /app2/bb 888
Created /app2/bb0000000001
[zk: localhost:2181(CONNECTED) 15] create -s /app2/cc 888
Created /app2/cc0000000002
如果原節點下有1個節點,則再排序時從1開始,以此類推。
[zk: localhost:2181(CONNECTED) 16] create -s /app1/aa 888
Created /app1/aa0000000001
9.修改節點數據值
[zk: localhost:2181(CONNECTED) 2] set /app1 999
10.節點的值變化監聽
(1)在hadoop005主機上註冊監聽/app1節點數據變化
[zk: localhost:2181(CONNECTED) 26] get /app1 watch
(2)在hadoop004主機上修改/app1節點的數據
[zk: localhost:2181(CONNECTED) 5] set /app1 777
(3)觀察hadoop005主機收到數據變化的監聽
WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app1
11.節點的子節點變化監聽(路徑變化)
(1)在hadoop005主機上註冊監聽/app1節點的子節點變化
[zk: localhost:2181(CONNECTED) 1] ls /app1 watch
[aa0000000001, server101]
(2)在hadoop004主機/app1節點上建立子節點
[zk: localhost:2181(CONNECTED) 6] create /app1/bb 666
Created /app1/bb
(3)觀察hadoop005主機收到子節點變化的監聽
WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/app1
12.刪除節點
[zk: localhost:2181(CONNECTED) 4] delete /app1/bb
13.遞回刪除節點
[zk: localhost:2181(CONNECTED) 7] rmr /app2
14.檢視節點狀態
[zk: localhost:2181(CONNECTED) 12] stat /app1
cZxid = 0x20000000a
ctime = Mon Jul 17 16:08:35 CST 2017
mZxid = 0x200000018
mtime = Mon Jul 17 16:54:38 CST 2017
pZxid = 0x20000001c
cversion = 4
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 2
注意:監聽只管用一次,要想再次監聽,只能再次執行監控命令。
1.建立一個Maven工程
2.新增pom檔案
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.10</version>
</dependency>
</dependencies>
public class WithoutWatch {
ZooKeeper client=null;
@Before//建立用戶端
public void getClient() throws IOException {
client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, null);
}
@Test//建立節點 報NullPointerExceptions=是因爲沒帶監聽,監聽設爲null
//參數一:節點名
//參數二:參數值
//參數三:存取許可權,所有用戶端都能存取
//參數四:永久節點
public void createNode() throws KeeperException, InterruptedException {
String node = client.create("/idea", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(node);
}
@Test//獲取節點
public void getChildrenNode() throws KeeperException, InterruptedException {
List<String> children = client.getChildren("/",null,null);
for(String c:children){
System.out.println(c);
}
}
@Test//獲取節點值
//參數三:獲取該節點最新的數據
public void getNodeData() throws KeeperException, InterruptedException {
byte[] data = client.getData("/idea", false, null);
System.out.println(new String(data));
}
@Test//修改節點數據
//參數三:-1,表示修改最新版本號的數據
public void modifyNodeData() throws KeeperException, InterruptedException {
Stat stat = client.setData("/idea", "666".getBytes(), -1);
if(stat==null){
System.out.println("修改失敗");
}else{
System.out.println("修改成功");
}
}
@Test//刪除節點
public void deleteNode() throws KeeperException, InterruptedException {
client.delete("/idea",-1);
System.out.println("刪除成功");
}
}
public class WithWatch {
ZooKeeper client=null;
@Before//建立用戶端
public void getClient() throws IOException {
client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
System.out.println(watchedEvent.getType()+"-"+watchedEvent.getPath());
try {
client.getChildren("/",true,null);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
@Test//獲取節點
public void getChildrenNode() throws KeeperException, InterruptedException {
// List<String> children = client.getChildren("/",true,null);
// for(String c:children){
// System.out.println(c);
// }
Thread.sleep(Long.MAX_VALUE);
}
}
1.需求
某分佈式系統中,主節點可以有多臺,可以動態上下線,任意一臺用戶端都能實時感知到主節點伺服器的上下線。
2.需求分析
1)提供時間查詢服務,用戶端系統呼叫該時間查詢服務。
2)動態上線,下線該時間查詢服務的節點,讓用戶端實時感知伺服器列表變化,查詢時候存取最新的機器節點。
3)具體實現:
先在叢集上建立/servers節點
[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers
public class server {
public static void main(String[] args) throws Exception {
//開啓zookeeper用戶端
ZooKeeper client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, null);
//向zookeeper建立/servers的子節點,//127.0.0.1 6666 建立臨時有序節點,可以監控伺服器上線下線
String node = client.create("/servers/server", (args[0] + ":" + args[1]).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println("伺服器上線了,地址爲:"+args[0]+":"+args[1]+"---"+node);
//開啓授時服務,發送時間給用戶端
ServerSocket serverSocket = new ServerSocket(Integer.parseInt(args[1]));//埠號
while (true){
Socket accept = serverSocket.accept();//程式卡在此處監聽,有請求才往下執行
OutputStream outputStream = accept.getOutputStream();
outputStream.write(new Date().toString().getBytes());//將時間寫回給用戶端
System.out.println("本次服務結束");
}
}
}
public class client {
List<String> children=null;
ZooKeeper client=null;
List<String> serverList=null;
public static void main(String[] args) throws Exception {
com.bigdata.test.client c= new client();
//開啓zookeeper用戶端 開啓監聽,每當servers下的節點數發生變化,以爲着伺服器上線下線,需要重新獲取節點列表,更新線上的伺服器列表
c.getClient();
//獲取/servers子節點,監聽節點
c.getNodeList();
//向伺服器發送請求,獲取時間
c.getTime();//每隔5秒就發送請求,獲取一次時間
}
public void getClient() throws Exception {
client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, new Watcher() {
public void process(WatchedEvent watchedEvent) {
try {
getNodeList();
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
public void getNodeList() throws KeeperException, InterruptedException {
List<String>idAndPost=new ArrayList<String>();
children = client.getChildren("/servers", true, null);
for(String node:children){
byte[] data = client.getData("/servers/" + node, false, null);
idAndPost.add(new String(data));
}
serverList=idAndPost;//儲存每個節點的ip和port,相當於是伺服器地址列表
}
public void getTime() throws IOException, InterruptedException {
while (true){
Random random = new Random();
int i = random.nextInt(serverList.size());
//127.0.0.1 6666
String ipPort = serverList.get(i);
String ip =ipPort.split(":")[0];
String port = ipPort.split(":")[1];
//發送請求
Socket socket = new Socket(ip,Integer.parseInt(port));
OutputStream outputStream = socket.getOutputStream();
outputStream.write("hello".getBytes());
byte[] b=new byte[1024];
InputStream inputStream = socket.getInputStream();
inputStream.read(b);
String time = new String(b);
System.out.println("存取"+ipPort+"獲取到的時間是:"+time);
outputStream.close();
inputStream.close();
socket.close();
//每個5秒發送一次請求,獲取一次時間
Thread.sleep(5000);
}
}
}