網(wǎng)站 默認(rèn)首頁(yè)濟(jì)南seo的排名優(yōu)化
1、 訂閱模型-Direct
? 有選擇性的接收消息
? 在訂閱模式中,生產(chǎn)者發(fā)布消息,所有消費(fèi)者都可以獲取所有消息。
? 在路由模式中,我們將添加一個(gè)功能 - 我們將只能訂閱一部分消息。
例如,我們只能將重要的錯(cuò)誤消息引導(dǎo)到日志文件(以節(jié)省磁盤(pán)空間),同時(shí)仍然能夠在控制臺(tái)上打印所有日志消息。
? 但是,在某些場(chǎng)景下,我們希望不同的消息被不同的隊(duì)列消費(fèi)。這時(shí)就要用到Direct類(lèi)型的Exchange。
? 在Direct模型下,隊(duì)列與交換機(jī)的綁定,不能是任意綁定了,而是要指定一個(gè)RoutingKey(路由key)
? 消息的發(fā)送方在向Exchange發(fā)送消息時(shí),也必須指定消息的routing key。
? P:生產(chǎn)者,向Exchange發(fā)送消息,發(fā)送消息時(shí),會(huì)指定一個(gè)routing key。
? X:Exchange(交換機(jī)),接收生產(chǎn)者的消息,然后把消息遞交給 與routing key完全匹配的隊(duì)列
? C1:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 error 的消息
? C2:消費(fèi)者,其所在隊(duì)列指定了需要routing key 為 info、error、warning 的消息
1.1、生產(chǎn)者
此處我們模擬商品的增刪改,發(fā)送消息的RoutingKey分別是:insert、update、delete
public class Send {private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明exchange,指定類(lèi)型為directchannel.exchangeDeclare(EXCHANGE_NAME, "direct");// 消息內(nèi)容String message = "商品新增了, id = 1001";// 發(fā)送消息,并且指定routing key 為:insert ,代表新增商品channel.basicPublish(EXCHANGE_NAME, "insert", null, message.getBytes());System.out.println(" [商品服務(wù):] Sent '" + message + "'");channel.close();connection.close();}
}
1.2、消費(fèi)者1
我們此處假設(shè)消費(fèi)者1只接收兩種類(lèi)型的消息:更新商品和刪除商品。
public class Recv {private final static String QUEUE_NAME = "direct_exchange_queue_1";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。假設(shè)此處需要update和delete消息channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定義隊(duì)列的消費(fèi)者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個(gè)方法類(lèi)似事件監(jiān)聽(tīng),如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費(fèi)者1] received : " + msg + "!");}};// 監(jiān)聽(tīng)隊(duì)列,自動(dòng)ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
1.3、 消費(fèi)者2
我們此處假設(shè)消費(fèi)者2接收所有類(lèi)型的消息:新增商品,更新商品和刪除商品。
public class Recv2 {private final static String QUEUE_NAME = "direct_exchange_queue_2";private final static String EXCHANGE_NAME = "direct_exchange_test";public static void main(String[] argv) throws Exception {// 獲取到連接Connection connection = ConnectionUtil.getConnection();// 獲取通道Channel channel = connection.createChannel();// 聲明隊(duì)列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 綁定隊(duì)列到交換機(jī),同時(shí)指定需要訂閱的routing key。訂閱 insert、update、deletechannel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "insert");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "update");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "delete");// 定義隊(duì)列的消費(fèi)者DefaultConsumer consumer = new DefaultConsumer(channel) {// 獲取消息,并且處理,這個(gè)方法類(lèi)似事件監(jiān)聽(tīng),如果有消息的時(shí)候,會(huì)被自動(dòng)調(diào)用@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties,byte[] body) throws IOException {// body 即消息體String msg = new String(body);System.out.println(" [消費(fèi)者2] received : " + msg + "!");}};// 監(jiān)聽(tīng)隊(duì)列,自動(dòng)ACKchannel.basicConsume(QUEUE_NAME, true, consumer);}
}
1.4、測(cè)試
我們分別發(fā)送增、刪、改的RoutingKey,發(fā)現(xiàn)結(jié)果: