RabbitMQ(一):Hello World

前言

先决条件
✔️ 正确安装 RabbitMQ 并将其运行在 localhost:5672 上
✔️ 已经了解了 RabbitMQ 中的一些基础概念

在本文中,我们将通过在 Spring Boot 应用中整合 RabbitMQ,并实现一个简单的发送、接收消息的例子来对 RabbitMQ 有一个直观的感受和理解。

Spring Boot 整合

环境:

  • RabbitMQ:3.7.4
  • Spring Boot:2.0.1.RELEASE

因为有 Starter POMs,在 Spring Boot 中整合 RabbitMQ 是一件非常容易的事,其中的 AMQP 模块就可以很好的支持 RabbitMQ。
我们可以使用 Spring Intializr 或 https://start.spring.io/ 创建一个 Spring Boot 工程,并勾选 RabbitMQ。
或者手动在pom.xml文件中加入

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

在application.yml中配置关于 RabbitMQ 的连接和用户信息,如果没有改 RabbitMQ 的默认配置的话,这里零配置即可启动。这里我们还定义了一些额外的配置备用。

1
2
3
4
5
6
7
8
spring:
profiles:
active: usage_message
rabbitmq:
port: 5672
tutorial:
client:
duration: 10000

生产者

Spring AMQP 让我们用少量的代码就能轻松实现消息的发送和接收。通过注入AmqpTemplate接口的实例来实现消息的发送,AmqpTemplate接口定义了一套针对 AMQP 协议的基础操作。在 Spring Boot 中会根据配置来注入其具体实现(AmqpTemplate的默认实现就是RabbitTemplate)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class Tut1Sender {

@Autowired
private AmqpTemplate template;

@Autowired
private Queue queue;

/**
* 用定时任务来模拟生产者定时发送消息
*/
@Scheduled(fixedDelay = 1000, initialDelay = 500)
public void send() {
String message = "Hello World!" + new Date();
template.convertAndSend(queue.getName(), message);
System.out.println(" [x] Sent '" + message + "'");
}

}

在该生产者中,我们会产生一个字符串,并发送到名为”hello-world”的队列中。

消费者

创建消费者Receiver。通过@RabbitListener注解定义该类对”hello-world”队列的监听,并用@RabbitHandler注解来指定对消息的处理方法。所以,该消费者实现了对”hello-world”队列的消费,消费操作为输出消息的字符串内容。

1
2
3
4
5
6
7
8
9
@RabbitListener(queues = "hello-world")
public class Tut1Receiver {

@RabbitHandler
public void receive(String in) {
System.out.println(" [x] Received '" + in + "'");
}

}

配置类

创建一个新的 JavaConfig 文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Profile({"tut1", "hello-world"})
@Configuration
public class Tut1Config {

@Bean
public Queue queue() {
return new Queue("hello-world");
}

@Profile("receiver")
@Bean
public Tut1Receiver receiver() {
return new Tut1Receiver();
}

@Profile("sender")
@Bean
public Tut1Sender sender() {
return new Tut1Sender();
}

}

在上面的 JavaConfig 中,我们使用@Configuration让 Spring 知道这是一个 Java 配置,并定义了生产者、消费者和一个名为”hello-world”的队列。并且,我们使用 Spring Profiles 来控制它运行哪个示例,以及它是生产者还是消费者,这样我们就可以简单的通过启动参数传递我们的配置文件来正确的启动应用了。
至于具体的生产者(Tut1Sender)和消费者(Tut1Receiver),我们这里仅先定义出来,稍后再具体实现。

应用主类

再小小的改造一下生成的 RabbitmqTutorialApplication.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@SpringBootApplication
@EnableScheduling
public class RabbitmqTutorialApplication {

public static void main(String[] args) {
new SpringApplicationBuilder()
.sources(RabbitmqTutorialApplication.class)
// 设置成非 web 环境
.web(WebApplicationType.NONE)
.run(args);
}

@Profile("usage_message")
@Bean
public CommandLineRunner usage() {
return arg0 -> {
System.out.println("This app uses Spring Profiles to control its behavior.\n");
System.out.println("Sample usage: java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=hello-world,sender");
};
}

@Profile("!usage_message")
@Bean
public CommandLineRunner tutorial() {
return new RabbitTutorialRunner();
}

}

这里我将环境设置为了WebApplicationType.NONE,即非WEB环境,因为默认的话Netty会监听8080端口,同时运行的话就会接口冲突导致启动失败(当然,也可以直接在启动时用参数绑定不同的端口以避免冲突)。

其中的 RabbitTutorialRunner 如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class RabbitTutorialRunner implements CommandLineRunner {

@Value("${tutorial.client.duration:0}")
private int duration;

@Autowired
private ConfigurableApplicationContext ctx;

@Override
public void run(String... args) throws Exception {
System.out.println("Ready ... running for " + duration + "ms");
Thread.sleep(duration);
ctx.close();
}
}

这个Runner主要是为了阻止主线程退出。除了用Thread.sleep(millisecond),也可以用CountDownLatch来达到相同的目的。

运行

编译

1
mvn clean package -Dmaven.test.skip=true

运行

1
2
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut1,sender
java -jar target/rabbitmq-tutorial-0.0.1-SNAPSHOT.jar --spring.profiles.active=tut1,receiver

输出

1
2
3
4
5
6
7
8
9
10
11
12
13
// Sender
Ready ... running for 10000ms
[x] Sent 'Hello World!Thu Apr 12 16:56:01 CST 2018'
[x] Sent 'Hello World!Thu Apr 12 16:56:03 CST 2018'
[x] Sent 'Hello World!Thu Apr 12 16:56:04 CST 2018'
...

// Receiver
Ready ... running for 10000ms
[x] Received 'Hello World!Thu Apr 12 16:56:01 CST 2018'
[x] Received 'Hello World!Thu Apr 12 16:56:03 CST 2018'
[x] Received 'Hello World!Thu Apr 12 16:56:04 CST 2018'
...

代码地址:https://github.com/zhaoyibo/rabbitmq-tutorial
相关文章:

  1. RabbitMQ(零):基础概念
  2. RabbitMQ(一):Hello World
  3. RabbitMQ(二):工作队列(Work queues)
  4. RabbitMQ(三):发布订阅(Publish/Subscribe)
  5. RabbitMQ(四):路由(Routing)
  6. RabbitMQ(五):主题(Topics)
  7. RabbitMQ(六):远程过程调用(RPC)

参考

RabbitMQ Tutorial One