RabbitMQ基础入门:消息队列的奇妙世界
欢迎来到RabbitMQ的世界!如果你觉得系统间直接调用像手拉手过马路一样危险,那么消息队列就是那条安全的地下通道——让你的系统组件可以安全、异步地通信。
1. 消息队列是什么鬼?
想象一下邮局的工作方式:
- 你写信(发送消息)
- 邮局接收并保存(消息队列)
- 邮递员投递(消费者处理)
消息队列(Message Queue)就是这样一种"邮局",在分布式系统中起到"中间人"的作用。
RabbitMQ的核心特点
- 基于AMQP协议:Advanced Message Queuing Protocol,业界标准
- 多种消息模式:简单队列、工作队列、发布订阅等
- 高可靠性:消息持久化、确认机制
- 灵活路由:支持多种交换机类型
应用场景
- 异步任务处理:用户注册后发送欢迎邮件
- 系统解耦:订单系统和库存系统不直接调用
- 削峰填谷:秒杀活动时缓冲大量请求
- 日志收集:统一收集各服务日志
2. 环境搭建:让RabbitMQ跑起来
安装RabbitMQ
RabbitMQ是用Erlang语言开发的,所以需要先安装Erlang环境。
Docker方式(推荐):
bash
# 运行RabbitMQ容器(带管理界面)
docker run -d --name rabbitmq \
-p 5672:5672 -p 15672:15672 \
rabbitmq:3-management
# 访问管理界面:http://localhost:15672
# 默认账号密码:guest/guestLinux安装:
bash
# Ubuntu/Debian
sudo apt update
sudo apt install erlang
sudo apt install rabbitmq-server
# 启动服务
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server管理界面
RabbitMQ自带一个炫酷的Web管理界面:
bash
# 启用管理插件(如果没启用)
rabbitmq-plugins enable rabbitmq_management
# 创建管理员用户
rabbitmqctl add_user admin your_password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"访问地址:http://localhost:15672
3. 核心概念:RabbitMQ的五虎上将
核心组件
- 生产者(Producer):发送消息的应用
- 消费者(Consumer):接收消息的应用
- 交换机(Exchange):接收生产者消息,路由到队列
- 队列(Queue):存储消息,供消费者获取
- 绑定(Binding):交换机与队列的关联关系
消息流转流程
生产者 → 交换机 → 队列 → 消费者4. 交换机类型:消息的导航系统
直连交换机(Direct Exchange)
就像快递分拣中心,根据地址(路由键)精确投递:
java
// Java示例
// 声明交换机
channel.exchangeDeclare("direct_logs", "direct");
// 声明队列
String queueName = channel.queueDeclare().getQueue();
// 绑定队列到交换机(指定路由键)
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");
// 发送消息
channel.basicPublish("direct_logs", "error", null, "错误日志内容".getBytes());扇形交换机(Fanout Exchange)
就像广播电台,消息发送给所有听众:
java
// 声明扇形交换机
channel.exchangeDeclare("logs", "fanout");
// 发送消息(忽略路由键)
channel.basicPublish("logs", "", null, "广播消息".getBytes());主题交换机(Topic Exchange)
就像邮件分类系统,支持通配符匹配:
java
// 声明主题交换机
channel.exchangeDeclare("topic_logs", "topic");
// 绑定队列(使用模式匹配)
channel.queueBind(queueName, "topic_logs", "*.error.*"); // 匹配 xxx.error.xxx
channel.queueBind(queueName, "topic_logs", "order.#"); // 匹配 order.开头的所有路由键
// 发送消息
channel.basicPublish("topic_logs", "user.login.error", null, "用户登录错误".getBytes());
channel.basicPublish("topic_logs", "order.create.success", null, "订单创建成功".getBytes());通配符规则:
*:匹配一个单词(如a.*.c匹配a.b.c)#:匹配零个或多个单词(如a.#匹配a.b.c.d)
5. 基本消息模式:五种经典套路
简单模式(Hello World)
一个生产者,一个消费者,一个队列:
java
// 生产者
public class Send {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 发送消息
String message = "Hello RabbitMQ!";
channel.basicPublish("", "hello", null, message.getBytes());
System.out.println("发送消息: " + message);
}
}
}
// 消费者
public class Recv {
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// 声明队列
channel.queueDeclare("hello", false, false, false, null);
// 消费消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("接收到消息: " + message);
};
channel.basicConsume("hello", true, deliverCallback, consumerTag -> { });
}
}工作队列模式(Work Queues)
多个消费者共享一个队列,分担任务:
java
// 生产者发送任务
for (int i = 0; i < 10; i++) {
String message = "Task " + i;
channel.basicPublish("", "task_queue",
MessageProperties.PERSISTENT_TEXT_PLAIN,
message.getBytes("UTF-8"));
}
// 消费者(设置公平分发)
channel.basicQos(1); // 每次只处理一条消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("处理任务: " + message);
try {
// 模拟耗时任务
Thread.sleep(1000);
} finally {
// 手动确认消息
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
};
// 关闭自动确认
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> { });发布订阅模式(Publish/Subscribe)
消息广播给所有订阅者:
java
// 生产者
channel.exchangeDeclare("logs", "fanout");
channel.basicPublish("logs", "", null, "广播消息".getBytes());
// 消费者
channel.exchangeDeclare("logs", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });路由模式(Routing)
根据路由键精确路由消息:
java
// 生产者
channel.exchangeDeclare("direct_logs", "direct");
channel.basicPublish("direct_logs", "error", null, "错误日志".getBytes());
// 消费者(只接收error级别日志)
channel.queueBind(queueName, "direct_logs", "error");主题模式(Topics)
最灵活的路由模式:
java
// 生产者
channel.exchangeDeclare("topic_logs", "topic");
channel.basicPublish("topic_logs", "order.create", null, "创建订单".getBytes());
// 消费者(接收所有订单相关消息)
channel.queueBind(queueName, "topic_logs", "order.*");6. 实践练习:小试牛刀
练习1:简单消息发送接收
bash
# 1. 启动RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management
# 2. 编写Java程序发送"Hello RabbitMQ"消息
# 3. 编写消费者程序接收并打印消息练习2:工作队列处理日志
java
// 模拟日志处理任务
public class LogProducer {
public static void main(String[] args) throws Exception {
// 发送不同类型日志任务
String[] logLevels = {"info", "warning", "error"};
for (int i = 0; i < 20; i++) {
String level = logLevels[i % 3];
String message = "[" + level + "] Log message " + i;
channel.basicPublish("", "log_queue", null, message.getBytes());
}
}
}
// 多个消费者处理日志
public class LogConsumer {
public static void main(String[] args) throws Exception {
// 设置不同的处理时间模拟不同复杂度
// Consumer1: 快速处理
// Consumer2: 慢速处理
}
}练习3:主题模式消息分类
java
// 发送不同类型的消息
channel.basicPublish("topic_exchange", "order.create", null, "创建订单".getBytes());
channel.basicPublish("topic_exchange", "user.login", null, "用户登录".getBytes());
channel.basicPublish("topic_exchange", "payment.success", null, "支付成功".getBytes());
// 消费者1:处理订单相关消息
channel.queueBind(queue1, "topic_exchange", "order.*");
// 消费者2:处理用户相关消息
channel.queueBind(queue2, "topic_exchange", "user.*");
// 消费者3:处理所有消息
channel.queueBind(queue3, "topic_exchange", "#");总结
恭喜你完成了RabbitMQ基础入门!现在你已经掌握了:
- 消息队列的基本概念和应用场景
- RabbitMQ的安装和基本操作
- 四种核心交换机类型
- 五种基本消息模式
- 简单的生产者和消费者实现
RabbitMQ就像一个智能的邮局系统,能够根据不同的"地址"(路由键)将"信件"(消息)准确投递到相应的"收件箱"(队列)中。在下一章节中,我们将学习RabbitMQ的进阶特性,包括消息可靠性保障、死信队列、延迟队列等更高级的功能。
记住,学习消息队列最重要的是理解不同模式的适用场景,多动手实践各种模式,这样才能在实际项目中灵活运用。赶紧打开你的开发环境,开始你的RabbitMQ之旅吧!