做彩票網(wǎng)站程序違法嗎寧波seo超級外鏈工具
RabbitMq
由于RabbitMQ
采用了AMQP協(xié)議,因此它具備跨語言的特性。任何語言只要遵循AMQP協(xié)議收發(fā)消息,都可以與RabbitMQ
交互。并且RabbitMQ
官方也提供了各種不同語言的客戶端。
但是,RabbitMQ官方提供的Java客戶端編碼相對復(fù)雜,一般生產(chǎn)環(huán)境下我們更多會結(jié)合Spring來使用。而Spring的官方剛好基于RabbitMQ提供了這樣一套消息收發(fā)的模板工具:SpringAMQP。并且還基于SpringBoot對其實現(xiàn)了自動裝配,使用起來非常方便。
SpringAMQP提供了三個功能:
- 自動聲明隊列、交換機及其綁定關(guān)系
- 基于注解的監(jiān)聽器模式,異步接收消息
- 封裝了RabbitTemplate工具,用于發(fā)送消息
概念
**publisher**
:生產(chǎn)者,也就是發(fā)送消息的一方**consumer**
:消費者,也就是消費消息的一方**queue**
:隊列,存儲消息。生產(chǎn)者投遞的消息會暫存在消息隊列中,等待消費者處理**exchange**
:交換機,負責(zé)消息路由。生產(chǎn)者發(fā)送的消息由交換機決定投遞到哪個隊列。**virtual host**
:虛擬主機,起到數(shù)據(jù)隔離的作用。每個虛擬主機相互獨立,有各自的exchange、queue
交換機
我們打開Exchanges選項卡,可以看到已經(jīng)存在很多交換機:
我們點擊任意交換機,即可進入交換機詳情頁面。仍然會利用控制臺中的publish message 發(fā)送一條消息:
這里是由控制臺模擬了生產(chǎn)者發(fā)送的消息。由于沒有消費者存在,最終消息丟失了,這樣說明交換機沒有存儲消息的能力。
隊列
我們打開Queues
選項卡,新建一個隊列:
命名為hello.queue1
:
再以相同的方式,創(chuàng)建一個隊列,密碼為hello.queue2
,最終隊列列表如下:
此時,我們再次向amq.fanout
交換機發(fā)送一條消息。會發(fā)現(xiàn)消息依然沒有到達隊列!!
發(fā)送到交換機的消息,只會路由到與其綁定的隊列,因此僅僅創(chuàng)建隊列是不夠的,我們還需要將其與交換機綁定。
綁定關(guān)系
點擊Exchanges
選項卡,點擊amq.fanout
交換機,進入交換機詳情頁,然后點擊Bindings
菜單,在表單中填寫要綁定的隊列名稱:
相同的方式,將hello.queue2也綁定到改交換機。
最終,綁定結(jié)果如下:
發(fā)送消息
再次回到exchange頁面,找到剛剛綁定的amq.fanout
,點擊進入詳情頁,再次發(fā)送一條消息:
回到Queues
頁面,可以發(fā)現(xiàn)hello.queue
中已經(jīng)有一條消息了:
點擊隊列名稱,進入詳情頁,查看隊列詳情,這次我們點擊get message:
可以看到消息到達隊列了:
這個時候如果有消費者監(jiān)聽了MQ的hello.queue1
或hello.queue2
隊列,自然就能接收到消息了。
用戶管理
點擊Admin
選項卡,首先會看到RabbitMQ控制臺的用戶管理界面:
這里的用戶都是RabbitMQ的管理或運維人員。目前只有安裝RabbitMQ時添加的itheima
這個用戶。仔細觀察用戶表格中的字段,如下:
Name
:admin
,也就是用戶名Tags
:administrator
,說明itheima
用戶是超級管理員,擁有所有權(quán)限Can access virtual host
:/
,可以訪問的virtual host
,這里的/
是默認的virtual host
對于小型企業(yè)而言,出于成本考慮,我們通常只會搭建一套MQ集群,公司內(nèi)的多個不同項目同時使用。這個時候為了避免互相干擾, 我們會利用virtual host
的隔離特性,將不同項目隔離。一般會做兩件事情:
- 給每個項目創(chuàng)建獨立的運維賬號,將管理權(quán)限分離。
- 給每個項目創(chuàng)建不同的
virtual host
,將每個項目的數(shù)據(jù)隔離。
交換機
。而一旦引入交換機,消息發(fā)送的模式會有很大變化:
可以看到,在訂閱模型中,多了一個exchange角色,而且過程略有變化:
- Publisher:生產(chǎn)者,不再發(fā)送消息到隊列中,而是發(fā)給交換機
- Exchange:交換機,一方面,接收生產(chǎn)者發(fā)送的消息。另一方面,知道如何處理消息,例如遞交給某個特別隊列、遞交給所有隊列、或是將消息丟棄。到底如何操作,取決于Exchange的類型。
- Queue:消息隊列也與以前一樣,接收消息、緩存消息。不過隊列一定要與交換機綁定。
- Consumer:消費者,與以前一樣,訂閱隊列,沒有變化
- Exchange(交換機)只負責(zé)轉(zhuǎn)發(fā)消息,不具備存儲消息的能力,因此如果沒有任何隊列與Exchange綁定,或者沒有符合路由規(guī)則的隊列,那么消息會丟失!
交換機的類型有四種:
- Fanout:廣播,將消息交給所有綁定到交換機的隊列。我們最早在控制臺使用的正是Fanout交換機
- Direct:訂閱,基于RoutingKey(路由key)發(fā)送給訂閱了消息的隊列
- Topic:通配符訂閱,與Direct類似,只不過RoutingKey可以使用通配符
- Headers:頭匹配,基于MQ的消息頭匹配,用的較少。
Fanout交換機
Fanout,英文翻譯是扇出,我覺得在MQ中叫廣播更合適。
在廣播模式下,消息發(fā)送流程是這樣的:
- 1) 可以有多個隊列
- 2) 每個隊列都要綁定到Exchange(交換機)
- 3) 生產(chǎn)者發(fā)送的消息,只能發(fā)送到交換機
- 4) 交換機把消息發(fā)送給綁定過的所有隊列
- 5) 訂閱隊列的消費者都能拿到消息
我們的計劃是這樣的:
- 創(chuàng)建一個名為
hmall.fanout
的交換機,類型是Fanout
- 創(chuàng)建兩個隊列
fanout.queue1
和fanout.queue2
,綁定到交換機hmall.fanout
消息發(fā)送
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
@Test
public void testFanoutExchange() {// 交換機名稱String exchangeName = "hmall.fanout";// 消息String message = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, "", message);
}
消息接收
在consumer服務(wù)的SpringRabbitListener中添加兩個方法,作為消費者:
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {System.out.println("消費者1接收到Fanout消息:【" + msg + "】");
}@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {System.out.println("消費者2接收到Fanout消息:【" + msg + "】");
}
Direct交換機
在Fanout模式中,一條消息,會被所有訂閱的隊列都消費。但是,在某些場景下,我們希望不同的消息被不同的隊列消費。這時就要用到Direct類型的Exchange。
在Direct模型下:
- 隊列與交換機的綁定,不能是任意綁定了,而是要指定一個
RoutingKey
(路由key) - 消息的發(fā)送方在 向 Exchange發(fā)送消息時,也必須指定消息的
RoutingKey
。 - Exchange不再把消息交給每一個綁定的隊列,而是根據(jù)消息的
Routing Key
進行判斷,只有隊列的Routingkey
與消息的Routing key
完全一致,才會接收到消息
案例需求如圖:
- 聲明一個名為
hmall.direct
的交換機 - 聲明隊列
direct.queue1
,綁定hmall.direct
,bindingKey
為blud
和red
- 聲明隊列
direct.queue2
,綁定hmall.direct
,bindingKey
為yellow
和red
- 在
consumer
服務(wù)中,編寫兩個消費者方法,分別監(jiān)聽direct.queue1和direct.queue2 - 在publisher中編寫測試方法,向
hmall.direct
發(fā)送消息
聲明隊列和交換機
首先在控制臺聲明兩個隊列direct.queue1
和direct.queue2
,然后聲明一個direct類型的交換機,命名為hmall.direct
:
然后使用red
和blue
作為key,綁定direct.queue1
到hmall.direct
同理,使用red
和yellow
作為key,綁定direct.queue2
到hmall.direct
,步驟略,最終結(jié)果:
消息接收
在consumer服務(wù)的SpringRabbitListener中添加方法:
@RabbitListener(queues = "direct.queue1")
public void listenDirectQueue1(String msg) {System.out.println("消費者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2")
public void listenDirectQueue2(String msg) {System.out.println("消費者2接收到direct.queue2的消息:【" + msg + "】");
}
消息發(fā)送
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
@Test
public void testSendDirectExchange() {// 交換機名稱String exchangeName = "hmall.direct";// 消息String message = "紅色警報!日本亂排核廢水,導(dǎo)致海洋生物變異,驚現(xiàn)哥斯拉!";// 發(fā)送消息rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
由于使用的red這個key,所以兩個消費者都收到了消息:
我們再切換為blue這個key:
@Test
public void testSendDirectExchange() {// 交換機名稱String exchangeName = "hmall.direct";// 消息String message = "最新報道,哥斯拉是居民自治巨型氣球,虛驚一場!";// 發(fā)送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);
}
你會發(fā)現(xiàn),只有消費者1收到了消息:
總結(jié)
描述下Direct交換機與Fanout交換機的差異?
- Fanout交換機將消息路由給每一個與之綁定的隊列
- Direct交換機根據(jù)RoutingKey判斷路由給哪個隊列
- 如果多個隊列具有相同的RoutingKey,則與Fanout功能類似
交換機
說明
Topic
類型的Exchange
與Direct
相比,都是可以根據(jù)RoutingKey
把消息路由到不同的隊列。
只不過Topic
類型Exchange
可以讓隊列在綁定BindingKey
的時候使用通配符!
BindingKey
一般都是有一個或多個單詞組成,多個單詞之間以.
分割,例如: item.insert
通配符規(guī)則:
#
:匹配一個或多個詞*
:匹配不多不少恰好1個詞
舉例:
item.#
:能夠匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
圖示:
假如此時publisher發(fā)送的消息使用的RoutingKey
共有四種:
china.news
代表有中國的新聞消息;china.weather
代表中國的天氣消息;japan.news
則代表日本新聞japan.weather
代表日本的天氣消息;
解釋:
topic.queue1
:綁定的是china.#
,凡是以china.
開頭的routing key
都會被匹配到,包括:china.news
china.weather
topic.queue2
:綁定的是#.news
,凡是以.news
結(jié)尾的routing key
都會被匹配。包括:china.news
japan.news
接下來,我們就按照上圖所示,來演示一下Topic交換機的用法。
首先,在控制臺按照圖示例子創(chuàng)建隊列、交換機,并利用通配符綁定隊列和交換機。此處步驟略。最終結(jié)果如下:
消息發(fā)送
在publisher服務(wù)的SpringAmqpTest類中添加測試方法:
/*** topicExchange*/
@Test
public void testSendTopicExchange() {// 交換機名稱String exchangeName = "hmall.topic";// 消息String message = "喜報!孫悟空大戰(zhàn)哥斯拉,勝!";// 發(fā)送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
}
消息接收
在consumer服務(wù)的SpringRabbitListener中添加方法:
@RabbitListener(queues = "topic.queue1")
public void listenTopicQueue1(String msg){System.out.println("消費者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "topic.queue2")
public void listenTopicQueue2(String msg){System.out.println("消費者2接收到topic.queue2的消息:【" + msg + "】");
}
聲明隊列和交換機
在之前我們都是基于RabbitMQ控制臺來創(chuàng)建隊列、交換機。但是在實際開發(fā)時,隊列和交換機是程序員定義的,將來項目上線,又要交給運維去創(chuàng)建。那么程序員就需要把程序中運行的所有隊列和交換機都寫下來,交給運維。在這個過程中是很容易出現(xiàn)錯誤的。
因此推薦的做法是由程序啟動時檢查隊列和交換機是否存在,如果不存在自動創(chuàng)建。
fanout示例
在consumer中創(chuàng)建一個類,聲明隊列和交換機:
package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfig {/*** 聲明交換機* @return Fanout類型交換機*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}/*** 第1個隊列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 第2個隊列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}
direct示例
direct模式由于要綁定多個KEY,會非常麻煩,每一個Key都要編寫一個binding:
package com.itheima.consumer.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class DirectConfig {/*** 聲明交換機* @return Direct類型交換機*/@Beanpublic DirectExchange directExchange(){return ExchangeBuilder.directExchange("hmall.direct").build();}/*** 第1個隊列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue1WithRed(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue1WithBlue(Queue directQueue1, DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 第2個隊列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue2WithRed(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 綁定隊列和交換機*/@Beanpublic Binding bindingQueue2WithYellow(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
}
基于注解聲明
基于@Bean的方式聲明隊列和交換機比較麻煩,Spring還提供了基于注解方式來聲明。
例如,我們同樣聲明Direct模式的交換機和隊列:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消費者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消費者2接收到direct.queue2的消息:【" + msg + "】");
}
是不是簡單多了。
再試試Topic模式:
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "china.#"
))
public void listenTopicQueue1(String msg){System.out.println("消費者1接收到topic.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "hmall.topic", type = ExchangeTypes.TOPIC),key = "#.news"
))
public void listenTopicQueue2(String msg){System.out.println("消費者2接收到topic.queue2的消息:【" + msg + "】");
}
消息轉(zhuǎn)換器
Spring的消息發(fā)送代碼接收的消息體是一個Object:
而在數(shù)據(jù)傳輸時,它會把你發(fā)送的消息序列化為字節(jié)發(fā)送給MQ,接收消息的時候,還會把字節(jié)反序列化為Java對象。
只不過,默認情況下Spring采用的序列化方式是JDK序列化。眾所周知,JDK序列化存在下列問題:
- 數(shù)據(jù)體積過大
- 有安全漏洞
- 可讀性差
配置JSON轉(zhuǎn)換器
顯然,JDK序列化方式并不合適。我們希望消息體的體積更小、可讀性更高,因此可以使用JSON方式來做序列化和反序列化。
在publisher
和consumer
兩個服務(wù)中都引入依賴:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
注意,如果項目中引入了spring-boot-starter-web
依賴,則無需再次引入Jackson
依賴。
配置消息轉(zhuǎn)換器,在publisher
和consumer
兩個服務(wù)的啟動類中添加一個Bean即可:
@Bean
public MessageConverter messageConverter(){// 1.定義消息轉(zhuǎn)換器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自動創(chuàng)建消息id,用于識別不同消息,也可以在業(yè)務(wù)中基于ID判斷是否是重復(fù)消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}
消息轉(zhuǎn)換器中添加的messageId可以便于我們將來做冪等性判斷。
總結(jié)
以上的代碼已上傳到Github
https://github.com/onenewcode/mq-demo