ZooKeeper分佈式協調服務元件 05

2020-08-11 23:57:52

一、 Zookeeper概述

1、 概述

Zookeeper是一個開源的分佈式的,爲分佈式應用提供協調服務的Apache專案。
在學習zookeeper之前,我們可以想下微信公衆號推播文章,微信伺服器有兩個功能。當公衆號作者寫文章後,點擊發布,這個時候微信伺服器做了兩件事情,第一件事:將作者的文章儲存下來,即實現了檔案系統的功能,儲存檔案。第二件事:給每個訂閱了該公衆號的微信使用者發送通知,而不給那些未訂閱的使用者推播,即實現了通知功能。
而zookeeper也有這兩大功能,即Zookeeper=檔案系統(可以在zk上儲存數據)+通知機制 機製。我們也可以在zookeeper上儲存數據,也可以連線到zookeeper伺服器事先訂閱的某個數據,然後zookeeper發現數據變化後,會通知用戶端。

2、 特點

1)Zookeeper:一個領導者(leader),多個跟隨者(follower)組成的叢集。
2)Leader負責進行投票的發起和決議,更新系統狀態
3)Follower用於接收客戶請求並向用戶端返回結果,在選舉Leader過程中參與投票
4)叢集中只要有半數以上節點存活,Zookeeper叢集就能正常服務。
5)全域性數據一致:每個server儲存一份相同的數據副本,client無論連線到哪個server,數據都是一致的。
6)更新請求順序進行,來自同一個client的更新請求按其發送順序依次執行。
7)數據更新原子性,一次數據更新要麼成功,要麼失敗。
8)實時性,在一定時間範圍內,client能讀到最新數據。

3、 數據結構

ZooKeeper數據模型的結構與Unix檔案系統很類似,整體上可以看作是一棵樹,每個節點稱做一個Znode,zookeeper把數據儲存在節點上。
很顯然zookeeper叢集自身維護了一套數據結構。這個儲存結構是一個樹形結構,其上的每一個節點,我們稱之爲"znode",每一個znode預設能夠儲存1MB的數據,每個ZNode都可以通過其路徑唯一標識,如圖1所示
在这里插入图片描述

4、 下載地址

1.官網首頁:
https://zookeeper.apache.org/
2.下載截圖,如圖5,6,7所示
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

二、 Zookeeper安裝

1、 分佈式安裝部署

1.叢集規劃
在hadoop003、hadoop004和hadoop005三個節點上部署Zookeeper。
在一個節點上設定好,再拷貝分發到其他機器上。
2.解壓安裝
1)解壓zookeeper安裝包到/opt/module/目錄下

[root@hadoop003 software]$ tar -zxvf zookeeper-3.4.10.tar.gz -C /opt/module/

(2)在/opt/module/zookeeper-3.4.10/這個目錄下建立zkData

mkdir -p zkData

(3)重新命名/opt/module/zookeeper-3.4.10/conf這個目錄下的zoo_sample.cfg爲zoo.cfg

cp zoo_sample.cfg zoo.cfg

3.設定zoo.cfg檔案
(1)具體設定

dataDir=/opt/module/zookeeper-3.4.10/zkData

增加如下設定

#######################cluster##########################
server.1=hadoop003:2888:3888
server.2=hadoop004:2888:3888
server.3=hadoop005:2888:3888

(2)設定參數解讀
server.A=B:C:D。
A是一個數字,表示這個是第幾號伺服器;
B是這個伺服器的ip地址;
C是這個伺服器與叢集中的Leader伺服器交換資訊的埠;
D是萬一叢集中的Leader伺服器掛了,需要一個埠來重新進行選舉,選出一個新的Leader,而這個埠就是用來執行選舉時伺服器相互通訊的埠。
叢集模式下設定一個檔案myid,這個檔案在dataDir目錄下,這個檔案裏面有一個數據就是A的值,Zookeeper啓動時讀取此檔案,拿到裏面的數據與zoo.cfg裏面的設定資訊比較從而判斷到底是哪個server。
4.叢集操作
(1)在/opt/module/zookeeper-3.4.10/zkData目錄下建立一個myid的檔案

touch myid

新增myid檔案,注意一定要在linux裏面建立,在notepad++裏面很可能亂碼
(2)編輯myid檔案

vim myid

在檔案中新增與server對應的編號:如1
(3)拷貝設定好的zookeeper分發到其他機器上

scp -r zookeeper-3.4.10/ hadoop004:/opt/module
scp -r zookeeper-3.4.10/ hadoop005:/opt/module
並分別修改myid檔案中內容爲2、3

(4)分別啓動zookeeper

[root@hadoop003 zookeeper-3.4.10]# bin/zkServer.sh start
[root@hadoop004 zookeeper-3.4.10]# bin/zkServer.sh start
[root@hadoop005 zookeeper-3.4.10]# bin/zkServer.sh start

(5)檢視狀態

jps
[root@hadoop003 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower
[root@hadoop004 zookeeper-3.4.10]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: leader
[root@hadoop005 zookeeper-3.4.5]# bin/zkServer.sh status
JMX enabled by default
Using config: /opt/module/zookeeper-3.4.10/bin/../conf/zoo.cfg
Mode: follower

2、 設定參數解讀

解讀zoo.cfg 檔案中參數含義

  • 1.tickTime:通訊心跳數,Zookeeper伺服器心跳時間,單位毫秒
    Zookeeper使用的基本時間,伺服器之間維持心跳的時間間隔,也就是每隔tickTime時間就會發送一個心跳,時間單位爲毫秒。
  • 2.initLimit:LF(leader、follower)初始通訊時限
    叢集中的follower跟隨者伺服器(F)與leader領導者伺服器(L)之間初始連線時能容忍的最多心跳數(tickTime的數量),用它來限定叢集中的Zookeeper伺服器連線到Leader的時限。
  • 3.syncLimit:LF同步通訊時限
    叢集中Leader與Follower之間的最大響應時間單位,假如響應超過syncLimit * tickTime,
    Leader認爲Follwer死掉,從伺服器列表中刪除Follwer。
    在執行過程中,Leader負責與ZK叢集中所有機器進行通訊,例如通過一些心跳檢測機制 機製,來檢測機器的存活狀態。
    如果L發出心跳包在syncLimit之後,還沒有從F那收到響應,那麼就認爲這個F已經不線上了。
  • 4.dataDir:數據檔案目錄+數據持久化路徑
    儲存記憶體數據庫快照資訊的位置,如果沒有其他說明,更新的事務日誌也儲存到數據庫。
  • 5.clientPort:用戶端連線埠
    監聽用戶端連線的埠

三、 Zookeeper內部原理

1、 節點型別

  • 1.Znode有兩種型別
    短暫(ephemeral):臨時的、用戶端和伺服器端斷開連線後,建立的節點自己刪除
    持久(persistent):永久的,用戶端和伺服器端斷開連線後,建立的節點不刪除
  • 2.Znode有四種形式的目錄節點(預設是persistent )
    (1)持久化目錄節點(PERSISTENT)
    用戶端與zookeeper斷開連線後,該節點依舊存在
    (2)持久化順序編號目錄節點(PERSISTENT_SEQUENTIAL)
    用戶端與zookeeper斷開連線後,該節點依舊存在,只是Zookeeper給該節點名稱進行順序編號
    (3)臨時目錄節點(EPHEMERAL)
    用戶端與zookeeper斷開連線後,該節點被刪除
    (4)臨時順序編號目錄節點(EPHEMERAL_SEQUENTIAL)
    用戶端與zookeeper斷開連線後,該節點被刪除,只是Zookeeper給該節點名稱進行順序編號,如圖9所示
    在这里插入图片描述
  • 3.建立znode時設定順序標識,znode名稱後會附加一個值,順序號是一個單調遞增的計數器,由父節點維護
  • 4.在分佈式系統中,順序號可以被用於爲所有的事件進行全域性排序,這樣用戶端可以通過順序號推斷事件的順序

2、 stat結構體

1)czxid- create 引起這個znode建立的zxid,建立節點的事務的zxid
每次修改ZooKeeper狀態都會收到一個zxid形式的時間戳,也就是ZooKeeper事務ID。
事務ID是ZooKeeper中所有修改總的次序。每個修改都有唯一的zxid,如果zxid1小於zxid2,那麼zxid1在zxid2之前發生。
2)ctime - create znode被建立的毫秒數(從1970年開始)
3)mzxid - modify znode最後更新的zxid
4)mtime - modify znode最後修改的毫秒數(從1970年開始)
5)pZxid-znode最後更新的子節點zxid
6)cversion - znode子節點變化號,znode子節點修改次數
7)dataversion - znode數據變化號
8)aclVersion - znode存取控制列表的變化號
9)ephemeralOwner- 如果是臨時節點,這個是znode擁有者的session id。如果不是臨時節點則是0。
10)dataLength- znode的數據長度
11)numChildren - znode子節點數量

3、 監聽器原理

在这里插入图片描述

  • 1.監聽原理詳解,如圖10所示
    1)首先要有一個main()執行緒
    2)在main執行緒中建立Zookeeper用戶端,這時就會建立兩個執行緒,一個負責網路連線通訊(connect),一個負責監聽(listener)。
    3)通過connect執行緒將註冊的監聽事件發送給Zookeeper。
    4)在Zookeeper的註冊監聽器列表中將註冊的監聽事件新增到列表中。
    5)Zookeeper監聽到有數據或路徑變化,就會將這個訊息發送給listener執行緒。
    6)listener執行緒內部呼叫了process()方法。
  • 2.常見的監聽
    (1)監聽節點數據的變化:
    get path [watch]
    (2)監聽子節點增減的變化
    ls path [watch]

4、 寫數據流程(client向zookeeper寫入數據)

在这里插入图片描述
ZooKeeper 的寫數據流程主要分爲以下幾步,如圖11所示:
1)比如 Client 向 ZooKeeper 的 Server1 上寫數據,發送一個寫請求。
2)如果Server1不是Leader,那麼Server1 會把接受到的請求進一步轉發給Leader,因爲每個ZooKeeper的Server裏面有一個是Leader。這個Leader 會將寫請求廣播給各個Server,比如Server1和Server2, 各個Server寫成功後就會通知Leader。
3)當Leader收到大多數 Server 數據寫成功了,那麼就說明數據寫成功了。如果這裏三個節點的話,只要有兩個節點數據寫成功了,那麼就認爲數據寫成功了。寫成功之後,Leader會告訴Server1數據寫成功了。
4)Server1會進一步通知 Client 數據寫成功了,這時就認爲整個寫操作成功。ZooKeeper 整個寫數據流程就是這樣的。

四、 Zookeeper實戰

1、 用戶端命令列操作

在这里插入图片描述
1.啓動用戶端

[root@hadoop003 zookeeper-3.4.10]$ bin/zkCli.sh

2.顯示所有操作命令

[zk: localhost:2181(CONNECTED) 1] help

3.檢視當前znode中所包含的內容

[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]

4.檢視當前節點數據並能看到更新次數等數據

[zk: localhost:2181(CONNECTED) 1] ls2 /
[zookeeper]
cZxid = 0x0
ctime = Thu Jan 01 08:00:00 CST 1970
mZxid = 0x0
mtime = Thu Jan 01 08:00:00 CST 1970
pZxid = 0x0
cversion = -1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 0
numChildren = 1

5.建立普通節點

[zk: localhost:2181(CONNECTED) 2] create /app1 "hello app1"
Created /app1
[zk: localhost:2181(CONNECTED) 4] create /app1/server101 "192.168.1.101"
Created /app1/server101

6.獲得節點的值

[zk: localhost:2181(CONNECTED) 6] get /app1
hello app1
cZxid = 0x20000000a
ctime = Mon Jul 17 16:08:35 CST 2017
mZxid = 0x20000000a
mtime = Mon Jul 17 16:08:35 CST 2017
pZxid = 0x20000000b
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 10
numChildren = 1
[zk: localhost:2181(CONNECTED) 8] get /app1/server101
192.168.1.101
cZxid = 0x20000000b
ctime = Mon Jul 17 16:11:04 CST 2017
mZxid = 0x20000000b
mtime = Mon Jul 17 16:11:04 CST 2017
pZxid = 0x20000000b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 13
numChildren = 0

7.建立短暫節點

[zk: localhost:2181(CONNECTED) 9] create -e /app-emphemeral 8888

(1)在當前用戶端是能檢視到的

[zk: localhost:2181(CONNECTED) 10] ls /
[app1, app-emphemeral, zookeeper]

(2)退出當前用戶端然後再重新啓動用戶端

[zk: localhost:2181(CONNECTED) 12] quit
[bigdata@hadoop104 zookeeper-3.4.10]$ bin/zkCli.sh

(3)再次檢視根目錄下短暫節點已經刪除

[zk: localhost:2181(CONNECTED) 0] ls /
[app1, zookeeper]

8.建立帶序號的節點
(1)先建立一個普通的根節點app2

[zk: localhost:2181(CONNECTED) 11] create /app2 "app2"

(2)建立帶序號的節點

[zk: localhost:2181(CONNECTED) 13] create -s /app2/aa 888
Created /app2/aa0000000000
[zk: localhost:2181(CONNECTED) 14] create -s /app2/bb 888
Created /app2/bb0000000001
[zk: localhost:2181(CONNECTED) 15] create -s /app2/cc 888
Created /app2/cc0000000002
如果原節點下有1個節點,則再排序時從1開始,以此類推。
[zk: localhost:2181(CONNECTED) 16] create -s /app1/aa 888
Created /app1/aa0000000001

9.修改節點數據值

[zk: localhost:2181(CONNECTED) 2] set /app1 999

10.節點的值變化監聽
(1)在hadoop005主機上註冊監聽/app1節點數據變化

[zk: localhost:2181(CONNECTED) 26] get /app1 watch

(2)在hadoop004主機上修改/app1節點的數據

[zk: localhost:2181(CONNECTED) 5] set /app1  777

(3)觀察hadoop005主機收到數據變化的監聽

WATCHER::
WatchedEvent state:SyncConnected type:NodeDataChanged path:/app1

11.節點的子節點變化監聽(路徑變化)
(1)在hadoop005主機上註冊監聽/app1節點的子節點變化

[zk: localhost:2181(CONNECTED) 1] ls /app1 watch
[aa0000000001, server101]

(2)在hadoop004主機/app1節點上建立子節點

[zk: localhost:2181(CONNECTED) 6] create /app1/bb 666
Created /app1/bb

(3)觀察hadoop005主機收到子節點變化的監聽

WATCHER::
WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/app1

12.刪除節點

[zk: localhost:2181(CONNECTED) 4] delete /app1/bb

13.遞回刪除節點

[zk: localhost:2181(CONNECTED) 7] rmr /app2

14.檢視節點狀態

[zk: localhost:2181(CONNECTED) 12] stat /app1
cZxid = 0x20000000a
ctime = Mon Jul 17 16:08:35 CST 2017
mZxid = 0x200000018
mtime = Mon Jul 17 16:54:38 CST 2017
pZxid = 0x20000001c
cversion = 4
dataVersion = 2
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 3
numChildren = 2

注意:監聽只管用一次,要想再次監聽,只能再次執行監控命令。

2、 API應用

1) Idea環境搭建

1.建立一個Maven工程
2.新增pom檔案

<dependencies>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>RELEASE</version>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-core</artifactId>
			<version>2.8.2</version>
		</dependency>
		<!-- https://mvnrepository.com/artifact/org.apache.zookeeper/zookeeper -->
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.4.10</version>
		</dependency>
	</dependencies>

2) Java用戶端操作(不帶監聽)

public class WithoutWatch {
    ZooKeeper client=null;
    @Before//建立用戶端
    public void getClient() throws IOException {
        client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, null);

    }
    @Test//建立節點   報NullPointerExceptions=是因爲沒帶監聽,監聽設爲null
    //參數一:節點名
    //參數二:參數值
    //參數三:存取許可權,所有用戶端都能存取
    //參數四:永久節點
    public void createNode() throws KeeperException, InterruptedException {
        String node = client.create("/idea", "hello".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println(node);
    }
    @Test//獲取節點
    public void getChildrenNode() throws KeeperException, InterruptedException {
        List<String> children = client.getChildren("/",null,null);
        for(String c:children){
            System.out.println(c);
        }
    }
    @Test//獲取節點值
    //參數三:獲取該節點最新的數據
    public void getNodeData() throws KeeperException, InterruptedException {
        byte[] data = client.getData("/idea", false, null);
        System.out.println(new String(data));
    }
    @Test//修改節點數據
    //參數三:-1,表示修改最新版本號的數據
    public void modifyNodeData() throws KeeperException, InterruptedException {
        Stat stat = client.setData("/idea", "666".getBytes(), -1);
        if(stat==null){
            System.out.println("修改失敗");
        }else{
            System.out.println("修改成功");
        }
    }
    @Test//刪除節點
    public void deleteNode() throws KeeperException, InterruptedException {
        client.delete("/idea",-1);
        System.out.println("刪除成功");
    }
}

3) Java用戶端操作(帶監聽)

public class WithWatch {
    ZooKeeper client=null;
    @Before//建立用戶端
    public void getClient() throws IOException {
        client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                System.out.println(watchedEvent.getType()+"-"+watchedEvent.getPath());
                try {
                    client.getChildren("/",true,null);

                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });

    }
    @Test//獲取節點
    public void getChildrenNode() throws KeeperException, InterruptedException {
//        List<String> children = client.getChildren("/",true,null);
//        for(String c:children){
//            System.out.println(c);
//        }
        Thread.sleep(Long.MAX_VALUE);
    }
}

3、 監聽伺服器動態上下線案例

1.需求
某分佈式系統中,主節點可以有多臺,可以動態上下線,任意一臺用戶端都能實時感知到主節點伺服器的上下線。
2.需求分析
1)提供時間查詢服務,用戶端系統呼叫該時間查詢服務。
2)動態上線,下線該時間查詢服務的節點,讓用戶端實時感知伺服器列表變化,查詢時候存取最新的機器節點。
在这里插入图片描述

3)具體實現:
先在叢集上建立/servers節點

[zk: localhost:2181(CONNECTED) 10] create /servers "servers"
Created /servers
public class server {
    public static void main(String[] args) throws Exception {
        //開啓zookeeper用戶端
        ZooKeeper client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, null);
        //向zookeeper建立/servers的子節點,//127.0.0.1 6666   建立臨時有序節點,可以監控伺服器上線下線
        String node = client.create("/servers/server", (args[0] + ":" + args[1]).getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        System.out.println("伺服器上線了,地址爲:"+args[0]+":"+args[1]+"---"+node);
        //開啓授時服務,發送時間給用戶端
        ServerSocket serverSocket = new ServerSocket(Integer.parseInt(args[1]));//埠號
        while (true){
            Socket accept = serverSocket.accept();//程式卡在此處監聽,有請求才往下執行
            OutputStream outputStream = accept.getOutputStream();
            outputStream.write(new Date().toString().getBytes());//將時間寫回給用戶端
            System.out.println("本次服務結束");
        }

    }

}
public class client {
    List<String> children=null;
    ZooKeeper client=null;
    List<String> serverList=null;
    public static void main(String[] args) throws Exception {
        com.bigdata.test.client c= new client();
        //開啓zookeeper用戶端  開啓監聽,每當servers下的節點數發生變化,以爲着伺服器上線下線,需要重新獲取節點列表,更新線上的伺服器列表
        c.getClient();
        //獲取/servers子節點,監聽節點
        c.getNodeList();




        //向伺服器發送請求,獲取時間
        c.getTime();//每隔5秒就發送請求,獲取一次時間
    }

    public void getClient() throws Exception {
        client = new ZooKeeper("hadoop003:2181,hadoop004:2181,hadoop005:2181", 2000, new Watcher() {
            public void process(WatchedEvent watchedEvent) {
                try {
                    getNodeList();
                } catch (KeeperException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
    }

    public void getNodeList() throws KeeperException, InterruptedException {
        List<String>idAndPost=new ArrayList<String>();
        children = client.getChildren("/servers", true, null);
        for(String node:children){
            byte[] data = client.getData("/servers/" + node, false, null);
            idAndPost.add(new String(data));
        }
        serverList=idAndPost;//儲存每個節點的ip和port,相當於是伺服器地址列表
    }
    public void getTime() throws IOException, InterruptedException {
        while (true){
            Random random = new Random();
            int i = random.nextInt(serverList.size());
            //127.0.0.1 6666
            String ipPort = serverList.get(i);
            String ip =ipPort.split(":")[0];
            String port = ipPort.split(":")[1];
            //發送請求
            Socket socket = new Socket(ip,Integer.parseInt(port));
            OutputStream outputStream = socket.getOutputStream();
            outputStream.write("hello".getBytes());
            byte[] b=new byte[1024];
            InputStream inputStream = socket.getInputStream();
            inputStream.read(b);
            String time = new String(b);
            System.out.println("存取"+ipPort+"獲取到的時間是:"+time);
            outputStream.close();
            inputStream.close();
            socket.close();
            //每個5秒發送一次請求,獲取一次時間
            Thread.sleep(5000);
        }

    }
}