深入學習 RabbitMQ 訊息中介軟體

2020-08-14 21:08:17
  • 什麼是訊息佇列、爲什麼要使用訊息佇列以及常見的產品?

訊息佇列是應用程式之間通訊方法,無需即時返回且耗時的操作進行非同步處理從而提高系統的吞吐量,可以實現程式之間的解耦合,常見的產品有 ActiveMQ、ZeroMQ、RabbitMQ、RocketMQ和Kafka。

  • 安裝設定RabbitMQ

安裝rabbitmq前要先安裝erlang:百度雲下載erlang20.3和rabbitmq3.7.14 提取碼:05z6

預設專案下一步就好,安裝完erlang新增bin路徑到path,例:%ERLANG_HOME%\bin

安裝完rabbitmq後,開啓cmd cd 安裝路徑/sbin(D:\DevInstall\rabbitmq3.7.14\rabbitmq_server-3.7.14\sbin) 

啓動rabbitmq輸入:rabbitmq-plugins.bat enable rabbitmq_management,不出意外的話會出現下面 下麪的情況

D:\DevInstall\rabbitmq3.7.14\rabbitmq_server-3.7.14\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@LAPTOP-IAO5L5AN:
rabbitmq_management
The following plugins have been configured:
  rabbitmq_management
  rabbitmq_management_agent
  rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LAPTOP-IAO5L5AN...
Plugin configuration unchanged.

這時存取rabbitmq管理頁面這存取不到的(http://localhost:15672/),這時候輸入:rabbitmq-server start 時可以存取

D:\DevInstall\rabbitmq3.7.14\rabbitmq_server-3.7.14\sbin>rabbitmq-server start
"WARNING: Using RABBITMQ_ADVANCED_CONFIG_FILE: C:\Users\GONGBIN\AppData\Roaming\RabbitMQ\advanced.config"

  ##  ##
  ##  ##      RabbitMQ 3.7.14. Copyright (C) 2007-2019 Pivotal Software, Inc.
  ##########  Licensed under the MPL.  See https://www.rabbitmq.com/
  ######  ##
  ##########  Logs: C:/Users/GONGBIN/AppData/Roaming/RabbitMQ/log/RABBIT~1.LOG
                    C:/Users/GONGBIN/AppData/Roaming/RabbitMQ/log/rabbit@LAPTOP-IAO5L5AN_upgrade.log

              Starting broker...
 completed with 3 plugins.

進入之後簡單的設定新增一個使用者,以及管理 Virtual Hosts 虛擬主機

到此建立了一個使用者,點選 Virtual Hosts 建立虛擬主機

Users 預設是 guest 更改成剛建立的使用者

點選 Set Permission 到此完成

  • 搭建 RabbitMQ 入門工程,編寫簡單的生產者與消費者
        <dependency>
            <groupId>com.rabbitmq</groupId>
            <artifactId>amqp-client</artifactId>
        </dependency>

實現一個生產者和消費者都需要6步,其中公共的4個步驟爲:1. 建立連線工廠設定服務參數、2. 建立連線、3. 建立頻道、4. 宣告佇列,生產者的5、6步爲:5. 發送訊息、6.關閉資源,訊息者爲:5. 建立消費者、6.監聽佇列,訊息者需要監聽訊息佇列不需要關閉資源。

定義公共工具

public class ConnectionUtil {

    public static Connection getConnection() throws IOException, TimeoutException {
        // 1. 建立連線工廠設定服務參數
        ConnectionFactory connectionFactory = new ConnectionFactory();
        connectionFactory.setHost("127.0.0.1");
        connectionFactory.setVirtualHost("itea");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("dxayga");
        connectionFactory.setPassword("dxayga");
        // 2. 建立連線
        Connection connection = connectionFactory.newConnection();
        return connection;
    }

}

生產者程式碼

public class Producer {

    final static String QUEUE_NAME = "simple_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立連線工廠設定服務參數
        // 2. 建立連線
        Connection connection = ConnectionUtil.getConnection();
        // 3. 建立頻道
        Channel channel = connection.createChannel();
        // 4. 宣告佇列
        /**
         * 參數1:佇列名
         * 參數2:是否持久化(會一直儲存在伺服器上)
         * 參數3:是否獨佔本連線
         * 參數4:是否在不使用時刪除
         * 參數5:其他參數
         */
        channel.queueDeclare(QUEUE_NAME,true,false,true,null);
        // 5. 發送訊息
        /**
         * 參數1:交換機名,沒有指定爲爲預設交換機
         * 參數2:路由的key(簡單模式下可以使用佇列名)
         * 參數3:訊息的其他屬性
         * 參數4:訊息內容
         */
        channel.basicPublish("",QUEUE_NAME,null,"Hello World!".getBytes());
        // 6.關閉資源
        channel.close();
        connection.close();
    }

}

消費者程式碼

public class Consumer {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. 建立連線工廠設定服務參數
        // 2. 建立連線
        Connection connection = ConnectionUtil.getConnection();
        // 3. 建立頻道
        Channel channel = connection.createChannel();
        // 4. 宣告佇列
        /**
         * 參數1:佇列名
         * 參數2:是否持久化(會一直儲存在伺服器上)
         * 參數3:是否獨佔本連線
         * 參數4:是否在不使用時刪除
         * 參數5:其他參數
         */
        channel.queueDeclare(Producer.QUEUE_NAME,true,false,true,null);
        // 5. 建立消費者
        DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("路由的key:"+envelope.getRoutingKey());
                System.out.println("交換機爲:"+envelope.getExchange());
                System.out.println("訊息id爲:"+envelope.getDeliveryTag());
                System.out.println("接收的訊息爲:"+new String(body,"utf-8"));
            }
        };
        // 6.監聽佇列
        /**
         * 參數1:佇列名
         * 參數2:是否自動確認;true爲收到訊息自動從佇列刪除,false爲手動確認,
         * 參數3:訊息者
         */
        channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);

    }

}

啓動生產者和消費者,成功的話返回:

路由的key:simple_queue
交換機爲:
訊息id爲:1
接收的訊息爲:Hello World!

而沒有消費的的資訊也可以在伺服器端看到

  • 工作佇列模式 

點選檢視這篇文章 

消費者與消費者是競爭的關係,一條訊息只能被一個消費者消費,不存在一條訊息被兩個消費者消費。