RabbitMQ(五):主题(Topics)

前言

先决条件
✔️ 正确安装 RabbitMQ 并将其运行在 localhost:5672 上
✔️ 已经了解了 RabbitMQ 中的一些基础概念

上一个教程中,我们改进了我们的日志系统。我们使用direct exchange替代了fanout exchange,从只能盲目的广播消息改进为有可能选择性的接收日志。

尽管direct exchange能够改善我们的系统,但是它也有它的限制——没办法基于多个标准执行路由操作。

在我们的日志系统中,我们不只希望订阅基于严重程度的日志,同时还希望订阅基于发送来源的日志。Unix 工具 syslog 就是同时基于严重程度 -severity (info/warn/crit…) 和 设备 -facility (auth/cron/kern…) 来路由日志的。

如果这样的话,将会给予我们非常大的灵活性,我们既可以监听来源于”cron”的严重程度为”critical errors”的日志,也可以监听来源于”kern”的所有日志。

为了实现这个目的,接下来我们学习如何使用另一种更复杂的交换器——topic exchange。

Topic exchange

topic exchange与direct exchange类似,也是将消息路由到 binding key 与 routing key 相匹配的 Queue 中,但这里的匹配规则有些不同,它约定:

  • routing key 为一个句点号.分隔的字符串(我们将被句点号.分隔开的每一段独立的字符串称为一个单词),如 “stock.usd.nyse”、”nyse.vmw”、”quick.orange.rabbit”
  • binding key 与 routing key 一样也是句点号.分隔的字符串
  • binding key 中可以存在两种特殊字符*#,用于做模糊匹配,其中*用于匹配一个单词,#用于匹配多个单词(可以是零个)

这个例子里,我们发送的所有消息都是用来描述小动物的。发送的消息所携带的路由键是由三个单词所组成的,这三个单词被两个.分割开。路由键里的第一个单词描述的是动物的速度,第二个单词是动物的颜色,第三个是动物的种类。所以它看起来是这样的:”速度.颜色.种类”。

我们创建了三个绑定:Q1的binding key为”.orange.“,Q2的binding key为”..rabbit”和”lazy.#”。

这三个binding key被可以总结为:

  • Q1 对所有的桔黄色动物都感兴趣。
  • Q2 则是对所有的兔子和所有懒惰的动物感兴趣。

以上图中的配置为例:
routingKey=”quick.orange.rabbit” 的消息会同时路由到 Q1 与 Q2
routingKey=”lazy.orange.fox” 的消息会路由到 Q1 与 Q2
routingKey=”lazy.brown.fox” 的消息会路由到 Q2
routingKey=”lazy.pink.rabbit” 的消息会路由到 Q2(只会投递给 Q2 一次,虽然这个 routingKey 与 Q2 的两个 bindingKey 都匹配)
routingKey=”quick.brown.fox”、routingKey=”orange”、routingKey=”quick.orange.male.rabbit” 的消息将会被丢弃,因为它们没有匹配任何 bindingKey

如果我们违反约定,发送了一个routing key为一个单词或者四个单词(”orange” or “quick.orange.male.rabbit”)的消息时,该消息不会投递给任何一个队列,而且会丢失掉。

但是,即使”lazy.orange.male.rabbit”有四个单词,他还是会匹配最后一个绑定,并且被投递到第二个队列中。

topic exchange
topic exchange 是强大的,它可以表现出跟其他 exchange 类似的行为。
当一个队列的 binding key 为 “#”(井号) 的时候,它会接收所有消息,而不考虑 routing key,就像 fanout exchange。
当 * (星号) 和 # (井号) 这两个特殊字符都未在绑定键中出现的时候,此时 topic exchange 会表现得像 direct exchange 一样。

代码整合

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public class Tut5Sender {

@Autowired
private AmqpTemplate template;

@Autowired
private TopicExchange topic;

private int index;

private int count;

private final String[] keys = {"quick.orange.rabbit",
"lazy.orange.elephant", "quick.orange.fox",
"lazy.brown.fox", "lazy.pink.rabbit", "quick.brown.fox"};


@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
StringBuilder builder = new StringBuilder("Hello to ");
if (++this.index == keys.length) {
this.index = 0;
}
String key = keys[this.index];
builder.append(key).append(' ');
builder.append(Integer.toString(++this.count));
String message = builder.toString();
template.convertAndSend(topic.getName(), key, message);
System.out.println(" [x] Sent '" + message + "'");
}

}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class Tut5Receiver {

@RabbitListener(queues = "#{autoDeleteQueue1}")
public void receiver1(String in) throws InterruptedException {
receive(in, 1);
}

@RabbitListener(queues = "#{autoDeleteQueue2}")
public void receiver2(String in) throws InterruptedException {
receive(in, 2);
}

private void receive(String in, int instance) throws InterruptedException {
StopWatch watch = new StopWatch();
watch.start();
System.out.println("instance " + instance + " [x] Received '" + in + "'");
doWork(in);
watch.stop();
System.out.println("instance " + instance + " [x] Done in "
+ watch.getTotalTimeSeconds() + "s");
}

private void doWork(String in) throws InterruptedException {
for (char c : in.toCharArray()) {
if (c == '.') {
Thread.sleep(1000);
}
}
}

}

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Profile({"tut5", "topics"})
@Configuration
public class Tut5Config {

@Bean
public TopicExchange topic() {
return new TopicExchange("tut.topic");
}

@Profile("receiver")
private static class ReceiverConfig {

@Bean
public Queue autoDeleteQueue1() {
return new AnonymousQueue();
}

@Bean
public Queue autoDeleteQueue2() {
return new AnonymousQueue();
}

@Bean
public Binding binding1a(TopicExchange topic, Queue autoDeleteQueue1) {
return BindingBuilder.bind(autoDeleteQueue1)
.to(topic)
.with("*.orange.*");
}

@Bean
public Binding binding2a(TopicExchange topic, Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2)
.to(topic)
.with("*.*.rabbit");
}

@Bean
public Binding binding2b(TopicExchange topic, Queue autoDeleteQueue2) {
return BindingBuilder.bind(autoDeleteQueue2)
.to(topic)
.with("lazy.#");
}

@Bean
public Tut5Receiver receiver() {
return new Tut5Receiver();
}

}

@Profile("sender")
@Bean
public Tut5Sender sender() {
return new Tut5Sender();
}

}

运行

maven 编译

1
mvn clean package -Dmaven.test.skip=true

运行

1
2
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,receiver  --tutorial.client.duration=60000
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut5,sender --tutorial.client.duration=60000

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// Sender
Ready ... running for 60000ms
[x] Sent 'Hello to lazy.orange.elephant 1'
[x] Sent 'Hello to quick.orange.fox 2'
[x] Sent 'Hello to lazy.brown.fox 3'
[x] Sent 'Hello to lazy.pink.rabbit 4'
[x] Sent 'Hello to quick.brown.fox 5'
[x] Sent 'Hello to quick.orange.rabbit 6'
[x] Sent 'Hello to lazy.orange.elephant 7'
[x] Sent 'Hello to quick.orange.fox 8'
[x] Sent 'Hello to lazy.brown.fox 9'
[x] Sent 'Hello to lazy.pink.rabbit 10'

// Receiver
Ready ... running for 60000ms
instance 1 [x] Received 'Hello to lazy.orange.elephant 1'
instance 2 [x] Received 'Hello to lazy.orange.elephant 1'
instance 2 [x] Done in 2.004s
instance 1 [x] Done in 2.004s
instance 2 [x] Received 'Hello to lazy.brown.fox 3'
instance 1 [x] Received 'Hello to quick.orange.fox 2'
instance 1 [x] Done in 2.006s
instance 2 [x] Done in 2.006s
instance 2 [x] Received 'Hello to lazy.pink.rabbit 4'
instance 1 [x] Received 'Hello to quick.orange.rabbit 6'
instance 2 [x] Done in 2.006s
instance 2 [x] Received 'Hello to quick.orange.rabbit 6'
instance 1 [x] Done in 2.007s
instance 1 [x] Received 'Hello to lazy.orange.elephant 7'
instance 2 [x] Done in 2.006s
instance 2 [x] Received 'Hello to lazy.orange.elephant 7'
instance 1 [x] Done in 2.003s
instance 1 [x] Received 'Hello to quick.orange.fox 8'
instance 2 [x] Done in 2.005s
instance 2 [x] Received 'Hello to lazy.brown.fox 9'
instance 1 [x] Done in 2.005s
instance 2 [x] Done in 2.004s
instance 2 [x] Received 'Hello to lazy.pink.rabbit 10'

代码地址:https://github.com/zhaoyibo/rabbitmq-tutorial
相关文章:

  1. RabbitMQ(零):基础概念
  2. RabbitMQ(一):Hello World
  3. RabbitMQ(二):工作队列(Work queues)
  4. RabbitMQ(三):发布订阅(Publish/Subscribe)
  5. RabbitMQ(四):路由(Routing)
  6. RabbitMQ(五):主题(Topics)
  7. RabbitMQ(六):远程过程调用(RPC)

参考

RabbitMQ Tutorial Five