Skip to content

RabbitMQ基础入门:消息队列的奇妙世界

欢迎来到RabbitMQ的世界!如果你觉得系统间直接调用像手拉手过马路一样危险,那么消息队列就是那条安全的地下通道——让你的系统组件可以安全、异步地通信。

1. 消息队列是什么鬼?

想象一下邮局的工作方式:

  • 你写信(发送消息)
  • 邮局接收并保存(消息队列)
  • 邮递员投递(消费者处理)

消息队列(Message Queue)就是这样一种"邮局",在分布式系统中起到"中间人"的作用。

RabbitMQ的核心特点

  • 基于AMQP协议:Advanced Message Queuing Protocol,业界标准
  • 多种消息模式:简单队列、工作队列、发布订阅等
  • 高可靠性:消息持久化、确认机制
  • 灵活路由:支持多种交换机类型

应用场景

  • 异步任务处理:用户注册后发送欢迎邮件
  • 系统解耦:订单系统和库存系统不直接调用
  • 削峰填谷:秒杀活动时缓冲大量请求
  • 日志收集:统一收集各服务日志

2. 环境搭建:让RabbitMQ跑起来

安装RabbitMQ

RabbitMQ是用Erlang语言开发的,所以需要先安装Erlang环境。

Docker方式(推荐):

bash
# 运行RabbitMQ容器(带管理界面)
docker run -d --name rabbitmq \
  -p 5672:5672 -p 15672:15672 \
  rabbitmq:3-management

# 访问管理界面:http://localhost:15672
# 默认账号密码:guest/guest

Linux安装:

bash
# Ubuntu/Debian
sudo apt update
sudo apt install erlang
sudo apt install rabbitmq-server

# 启动服务
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

管理界面

RabbitMQ自带一个炫酷的Web管理界面:

bash
# 启用管理插件(如果没启用)
rabbitmq-plugins enable rabbitmq_management

# 创建管理员用户
rabbitmqctl add_user admin your_password
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"

访问地址:http://localhost:15672

3. 核心概念:RabbitMQ的五虎上将

核心组件

  1. 生产者(Producer):发送消息的应用
  2. 消费者(Consumer):接收消息的应用
  3. 交换机(Exchange):接收生产者消息,路由到队列
  4. 队列(Queue):存储消息,供消费者获取
  5. 绑定(Binding):交换机与队列的关联关系

消息流转流程

生产者 → 交换机 → 队列 → 消费者

4. 交换机类型:消息的导航系统

直连交换机(Direct Exchange)

就像快递分拣中心,根据地址(路由键)精确投递:

java
// Java示例
// 声明交换机
channel.exchangeDeclare("direct_logs", "direct");

// 声明队列
String queueName = channel.queueDeclare().getQueue();

// 绑定队列到交换机(指定路由键)
channel.queueBind(queueName, "direct_logs", "error");
channel.queueBind(queueName, "direct_logs", "warning");

// 发送消息
channel.basicPublish("direct_logs", "error", null, "错误日志内容".getBytes());

扇形交换机(Fanout Exchange)

就像广播电台,消息发送给所有听众:

java
// 声明扇形交换机
channel.exchangeDeclare("logs", "fanout");

// 发送消息(忽略路由键)
channel.basicPublish("logs", "", null, "广播消息".getBytes());

主题交换机(Topic Exchange)

就像邮件分类系统,支持通配符匹配:

java
// 声明主题交换机
channel.exchangeDeclare("topic_logs", "topic");

// 绑定队列(使用模式匹配)
channel.queueBind(queueName, "topic_logs", "*.error.*");  // 匹配 xxx.error.xxx
channel.queueBind(queueName, "topic_logs", "order.#");    // 匹配 order.开头的所有路由键

// 发送消息
channel.basicPublish("topic_logs", "user.login.error", null, "用户登录错误".getBytes());
channel.basicPublish("topic_logs", "order.create.success", null, "订单创建成功".getBytes());

通配符规则:

  • *:匹配一个单词(如 a.*.c 匹配 a.b.c
  • #:匹配零个或多个单词(如 a.# 匹配 a.b.c.d

5. 基本消息模式:五种经典套路

简单模式(Hello World)

一个生产者,一个消费者,一个队列:

java
// 生产者
public class Send {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {
            
            // 声明队列
            channel.queueDeclare("hello", false, false, false, null);
            
            // 发送消息
            String message = "Hello RabbitMQ!";
            channel.basicPublish("", "hello", null, message.getBytes());
            System.out.println("发送消息: " + message);
        }
    }
}

// 消费者
public class Recv {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        
        // 声明队列
        channel.queueDeclare("hello", false, false, false, null);
        
        // 消费消息
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println("接收到消息: " + message);
        };
        
        channel.basicConsume("hello", true, deliverCallback, consumerTag -> { });
    }
}

工作队列模式(Work Queues)

多个消费者共享一个队列,分担任务:

java
// 生产者发送任务
for (int i = 0; i < 10; i++) {
    String message = "Task " + i;
    channel.basicPublish("", "task_queue", 
        MessageProperties.PERSISTENT_TEXT_PLAIN,
        message.getBytes("UTF-8"));
}

// 消费者(设置公平分发)
channel.basicQos(1); // 每次只处理一条消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
    String message = new String(delivery.getBody(), "UTF-8");
    System.out.println("处理任务: " + message);
    
    try {
        // 模拟耗时任务
        Thread.sleep(1000);
    } finally {
        // 手动确认消息
        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
    }
};

// 关闭自动确认
channel.basicConsume("task_queue", false, deliverCallback, consumerTag -> { });

发布订阅模式(Publish/Subscribe)

消息广播给所有订阅者:

java
// 生产者
channel.exchangeDeclare("logs", "fanout");
channel.basicPublish("logs", "", null, "广播消息".getBytes());

// 消费者
channel.exchangeDeclare("logs", "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, "logs", "");
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });

路由模式(Routing)

根据路由键精确路由消息:

java
// 生产者
channel.exchangeDeclare("direct_logs", "direct");
channel.basicPublish("direct_logs", "error", null, "错误日志".getBytes());

// 消费者(只接收error级别日志)
channel.queueBind(queueName, "direct_logs", "error");

主题模式(Topics)

最灵活的路由模式:

java
// 生产者
channel.exchangeDeclare("topic_logs", "topic");
channel.basicPublish("topic_logs", "order.create", null, "创建订单".getBytes());

// 消费者(接收所有订单相关消息)
channel.queueBind(queueName, "topic_logs", "order.*");

6. 实践练习:小试牛刀

练习1:简单消息发送接收

bash
# 1. 启动RabbitMQ
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3-management

# 2. 编写Java程序发送"Hello RabbitMQ"消息
# 3. 编写消费者程序接收并打印消息

练习2:工作队列处理日志

java
// 模拟日志处理任务
public class LogProducer {
    public static void main(String[] args) throws Exception {
        // 发送不同类型日志任务
        String[] logLevels = {"info", "warning", "error"};
        for (int i = 0; i < 20; i++) {
            String level = logLevels[i % 3];
            String message = "[" + level + "] Log message " + i;
            channel.basicPublish("", "log_queue", null, message.getBytes());
        }
    }
}

// 多个消费者处理日志
public class LogConsumer {
    public static void main(String[] args) throws Exception {
        // 设置不同的处理时间模拟不同复杂度
        // Consumer1: 快速处理
        // Consumer2: 慢速处理
    }
}

练习3:主题模式消息分类

java
// 发送不同类型的消息
channel.basicPublish("topic_exchange", "order.create", null, "创建订单".getBytes());
channel.basicPublish("topic_exchange", "user.login", null, "用户登录".getBytes());
channel.basicPublish("topic_exchange", "payment.success", null, "支付成功".getBytes());

// 消费者1:处理订单相关消息
channel.queueBind(queue1, "topic_exchange", "order.*");

// 消费者2:处理用户相关消息
channel.queueBind(queue2, "topic_exchange", "user.*");

// 消费者3:处理所有消息
channel.queueBind(queue3, "topic_exchange", "#");

总结

恭喜你完成了RabbitMQ基础入门!现在你已经掌握了:

  • 消息队列的基本概念和应用场景
  • RabbitMQ的安装和基本操作
  • 四种核心交换机类型
  • 五种基本消息模式
  • 简单的生产者和消费者实现

RabbitMQ就像一个智能的邮局系统,能够根据不同的"地址"(路由键)将"信件"(消息)准确投递到相应的"收件箱"(队列)中。在下一章节中,我们将学习RabbitMQ的进阶特性,包括消息可靠性保障、死信队列、延迟队列等更高级的功能。

记住,学习消息队列最重要的是理解不同模式的适用场景,多动手实践各种模式,这样才能在实际项目中灵活运用。赶紧打开你的开发环境,开始你的RabbitMQ之旅吧!