訊息佇列是應用程式之間通訊方法,無需即時返回且耗時的操作進行非同步處理從而提高系統的吞吐量,可以實現程式之間的解耦合,常見的產品有 ActiveMQ、ZeroMQ、RabbitMQ、RocketMQ和Kafka。
安裝rabbitmq前要先安裝erlang:百度雲下載erlang20.3和rabbitmq3.7.14 提取碼:05z6
預設專案下一步就好,安裝完erlang新增bin路徑到path,例:%ERLANG_HOME%\bin
安裝完rabbitmq後,開啓cmd cd 安裝路徑/sbin(D:\DevInstall\rabbitmq3.7.14\rabbitmq_server-3.7.14\sbin)
啓動rabbitmq輸入:rabbitmq-plugins.bat enable rabbitmq_management,不出意外的話會出現下面 下麪的情況
D:\DevInstall\rabbitmq3.7.14\rabbitmq_server-3.7.14\sbin>rabbitmq-plugins.bat enable rabbitmq_management
Enabling plugins on node rabbit@LAPTOP-IAO5L5AN:
rabbitmq_management
The following plugins have been configured:
rabbitmq_management
rabbitmq_management_agent
rabbitmq_web_dispatch
Applying plugin configuration to rabbit@LAPTOP-IAO5L5AN...
Plugin configuration unchanged.
這時存取rabbitmq管理頁面這存取不到的(http://localhost:15672/),這時候輸入:rabbitmq-server start 時可以存取
D:\DevInstall\rabbitmq3.7.14\rabbitmq_server-3.7.14\sbin>rabbitmq-server start
"WARNING: Using RABBITMQ_ADVANCED_CONFIG_FILE: C:\Users\GONGBIN\AppData\Roaming\RabbitMQ\advanced.config"
## ##
## ## RabbitMQ 3.7.14. Copyright (C) 2007-2019 Pivotal Software, Inc.
########## Licensed under the MPL. See https://www.rabbitmq.com/
###### ##
########## Logs: C:/Users/GONGBIN/AppData/Roaming/RabbitMQ/log/RABBIT~1.LOG
C:/Users/GONGBIN/AppData/Roaming/RabbitMQ/log/rabbit@LAPTOP-IAO5L5AN_upgrade.log
Starting broker...
completed with 3 plugins.
進入之後簡單的設定新增一個使用者,以及管理 Virtual Hosts 虛擬主機
到此建立了一個使用者,點選 Virtual Hosts 建立虛擬主機
Users 預設是 guest 更改成剛建立的使用者
點選 Set Permission 到此完成
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
實現一個生產者和消費者都需要6步,其中公共的4個步驟爲:1. 建立連線工廠設定服務參數、2. 建立連線、3. 建立頻道、4. 宣告佇列,生產者的5、6步爲:5. 發送訊息、6.關閉資源,訊息者爲:5. 建立消費者、6.監聽佇列,訊息者需要監聽訊息佇列不需要關閉資源。
定義公共工具
public class ConnectionUtil {
public static Connection getConnection() throws IOException, TimeoutException {
// 1. 建立連線工廠設定服務參數
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("127.0.0.1");
connectionFactory.setVirtualHost("itea");
connectionFactory.setPort(5672);
connectionFactory.setUsername("dxayga");
connectionFactory.setPassword("dxayga");
// 2. 建立連線
Connection connection = connectionFactory.newConnection();
return connection;
}
}
生產者程式碼
public class Producer {
final static String QUEUE_NAME = "simple_queue";
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立連線工廠設定服務參數
// 2. 建立連線
Connection connection = ConnectionUtil.getConnection();
// 3. 建立頻道
Channel channel = connection.createChannel();
// 4. 宣告佇列
/**
* 參數1:佇列名
* 參數2:是否持久化(會一直儲存在伺服器上)
* 參數3:是否獨佔本連線
* 參數4:是否在不使用時刪除
* 參數5:其他參數
*/
channel.queueDeclare(QUEUE_NAME,true,false,true,null);
// 5. 發送訊息
/**
* 參數1:交換機名,沒有指定爲爲預設交換機
* 參數2:路由的key(簡單模式下可以使用佇列名)
* 參數3:訊息的其他屬性
* 參數4:訊息內容
*/
channel.basicPublish("",QUEUE_NAME,null,"Hello World!".getBytes());
// 6.關閉資源
channel.close();
connection.close();
}
}
消費者程式碼
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
// 1. 建立連線工廠設定服務參數
// 2. 建立連線
Connection connection = ConnectionUtil.getConnection();
// 3. 建立頻道
Channel channel = connection.createChannel();
// 4. 宣告佇列
/**
* 參數1:佇列名
* 參數2:是否持久化(會一直儲存在伺服器上)
* 參數3:是否獨佔本連線
* 參數4:是否在不使用時刪除
* 參數5:其他參數
*/
channel.queueDeclare(Producer.QUEUE_NAME,true,false,true,null);
// 5. 建立消費者
DefaultConsumer defaultConsumer = new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("路由的key:"+envelope.getRoutingKey());
System.out.println("交換機爲:"+envelope.getExchange());
System.out.println("訊息id爲:"+envelope.getDeliveryTag());
System.out.println("接收的訊息爲:"+new String(body,"utf-8"));
}
};
// 6.監聽佇列
/**
* 參數1:佇列名
* 參數2:是否自動確認;true爲收到訊息自動從佇列刪除,false爲手動確認,
* 參數3:訊息者
*/
channel.basicConsume(Producer.QUEUE_NAME,true,defaultConsumer);
}
}
啓動生產者和消費者,成功的話返回:
路由的key:simple_queue
交換機爲:
訊息id爲:1
接收的訊息爲:Hello World!
而沒有消費的的資訊也可以在伺服器端看到
工作佇列模式
消費者與消費者是競爭的關係,一條訊息只能被一個消費者消費,不存在一條訊息被兩個消費者消費。