凌月风的个人博客

记录精彩人生

Open Source, Open Mind,
Open Sight, Open Future!
  menu

Java笔记系列——07-消息中间件(RabbitMQ)

0 浏览

RabbitMQ

  • 默认消息协议AMQP的实现(可以通过插件开启其他协议的实现),服务器端使用Erlang语言编写,支持多客户端
  • 用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性方面表现不俗
  • https://www.rabbitmq.com/configure.html#config-items)

使用实例

  • 安装依赖环境
    1. **在 **http://www.rabbitmq.com/which-erlang.html 页面查看安装rabbitmq需要安装erlang对应的 版本
    2. **在 **https://github.com/rabbitmq/erlang-rpm/releases 页面找到需要下载的erlang版本, erlang-*.centos.x86_64.rpm就是centos版本的。
    3. 复制下载地址后,使用wget命令下载
      wget -P /home/download https://github.com/rabbitmq/erlangrpm/releases/download/v21.2.3/erlang-21.2.3-1.el7.centos.x86_64.rpm
      
    4. 安装 Erlang
      sudo rpm -Uvh /home/download/erlang-21.2.3-1.el7.centos.x86_64.rpm
      
    5. **安装 socat **
      sudo yum install -y socat
      
  • 安装RabbitMQ
    1. 在官方下载页面找到Cent OS 7版本的下载链接,下载rpm安装包
      wget -P /home/download https://github.com/rabbitmq/rabbitmqserver/releases/download/v3.7.9/rabbitmq-server-3.7.9-1.el7.noarch.rpm
      
    2. 安装RabbitMQ
      sudo rpm -Uvh /home/download/rabbitmq-server-3.7.9-1.el7.noarch.rpm
      
  • 启动和关闭
    • 启动服务:sudo systemctl start rabbitmq-server
    • 查看状态:sudo systemctl status rabbitmq-server
    • 停止服务:sudo systemctl stop rabbitmq-server
    • 设置开机启动:sudo systemctl enable rabbitmq-server
  • 开启Web管理插件
    1. 开启插件

      rabbitmq-plugins enable rabbitmq_management
      
    2. 添加用户

      RabbitMQ有一个默认的guest用户,但只能通过本地local host访问,所以需要添加一个能够远程访问的用户。

      rabbitmqctl add_user admin admin
      
    3. 为用户分配操作权限

      rabbitmqctl set_user_tags admin administrator
      
    4. 为用户分配资源权限

      rabbitmqctl set_permissions -p / admin ".*" ".*" ".*"
      
  • 防火墙添加端口
    1. RabbitMQ 服务启动后,还不能进行外部通信,需要将端口添加都防火墙
      sudo firewall-cmd --zone=public --add-port=4369/tcp --permanent 
      sudo firewall-cmd --zone=public --add-port=5672/tcp --permanent 
      sudo firewall-cmd --zone=public --add-port=25672/tcp --permanent 
      sudo firewall-cmd --zone=public --add-port=15672/tcp --permanent
      
    2. 重启防火墙
      sudo firewall-cmd --reload
      

  • 单节点使用
    • 导入依赖
      <dependencies>
          <dependency>
              <groupId>com.rabbitmq</groupId>
              <artifactId>amqp-client</artifactId>
              <version>5.5.1</version>
          </dependency>
      </dependencies>
      
    • 创建生产者
      /**
       * Topic--生产者
       *
       * 生产者将消息发送到topic类型的交换器上,和routing的用法类似,都是通过routingKey路由,但topic类型交换器的routingKey支持通配符
       */
      public class Producer {
          public static void main(String[] args) {
              // 1、创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
              // 2、设置连接属性
              factory.setHost("192.168.100.242");
              factory.setPort(5672);
              factory.setUsername("admin");
              factory.setPassword("admin");
              // 声明连接
              Connection connection = null;
              Channel channel = null;
              try {
                  // 3、从连接工厂获取连接
                  connection = factory.newConnection("生产者");
                  // 4、从链接中创建通道
                  channel = connection.createChannel();
                  // 路由关系如下:com.# --> queue-1     *.order.* ---> queue-2
                  // 消息内容
                  String message = "Hello A";
                  // 发送消息到topic_test交换器上
                  channel.basicPublish("topic-exchange", "com.order.create", null, message.getBytes());
                  System.out.println("消息 " + message + " 已发送!");
                  // 消息内容
                  message = "Hello B";
                  // 发送消息到topic_test交换器上
                  channel.basicPublish("topic-exchange", "com.sms.create", null, message.getBytes());
                  System.out.println("消息 " + message + " 已发送!");
                  // 消息内容
                  message = "Hello C";
                  // 发送消息到topic_test交换器上
                  channel.basicPublish("topic-exchange", "cn.order.create", null, message.getBytes());
                  System.out.println("消息 " + message + " 已发送!");
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  // 7、关闭通道
                  if (channel != null && channel.isOpen()) {
                      try {
                          channel.close();
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }
                  // 8、关闭连接
                  if (connection != null && connection.isOpen()) {
                      try {
                          connection.close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      }
      
    • 创建消费者
      /**
       * 路由--消费者
       *
       * 消费者通过一个临时队列和交换器绑定,接收发送到交换器上的消息
       */
      public class Consumer {
          private static Runnable receive = new Runnable() {
              public void run() {
                  // 1、创建连接工厂
                  ConnectionFactory factory = new ConnectionFactory();
                  // 2、设置连接属性
                  factory.setHost("192.168.100.242");
                  factory.setPort(5672);
                  factory.setUsername("admin");
                  factory.setPassword("admin");
                  Connection connection = null;
                  Channel channel = null;
                  final String queueName = Thread.currentThread().getName();
                  try {
                      // 3、从连接工厂获取连接
                      connection = factory.newConnection("消费者");
                      // 4、从链接中创建通道
                      channel = connection.createChannel();
                      // 定义消息接收回调对象
                      DeliverCallback callback = new DeliverCallback() {
                          public void handle(String consumerTag, Delivery message) throws IOException {
                              System.out.println(queueName + " 收到消息:" + new String(message.getBody(), "UTF-8"));
                          }
                      };
                      // 监听队列
                      channel.basicConsume(queueName, true, callback, new CancelCallback() {
                          public void handle(String consumerTag) throws IOException {
                          }
                      });
                      System.out.println(queueName + " 开始接收消息");
                      System.in.read();
                  } catch (Exception e) { 
                      e.printStackTrace();
                  } finally {
                      // 8、关闭通道
                      if (channel != null && channel.isOpen()) {
                          try {
                              channel.close();
                          } catch (Exception e) {
                              e.printStackTrace();
                          }
                      }
                      // 9、关闭连接
                      if (connection != null && connection.isOpen()) {
                          try {
                              connection.close();
                          } catch (IOException e) {
                              e.printStackTrace();
                          }
                      }
                  }
              }
          };
          public static void main(String[] args) {
              new Thread(receive, "queue-1").start();
              new Thread(receive, "queue-2").start();
              new Thread(receive, "queue-3").start();
          }
      }
      

  • 多节点使用
    • 创建生产者
      /**
       * 客户端连接集群示例
       */
      public class Producer {
          public static void main(String[] args) {
              // 1、创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
              // 2、设置连接属性
              factory.setUsername("order-user");
              factory.setPassword("order-user");
              factory.setVirtualHost("v1");
              Connection connection = null;
              Channel channel = null;
              // 3、设置每个节点的链接地址和端口
              Address[] addresses = new Address[]{
                  new Address("192.168.100.242", 5672),
                  new Address("192.168.100.241", 5672)
              };
              try {
                  // 开启/关闭连接自动恢复,默认是开启状态
                  factory.setAutomaticRecoveryEnabled(true);
                  // 设置每100毫秒尝试恢复一次,默认是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
                  factory.setNetworkRecoveryInterval(100);
                  factory.setTopologyRecoveryEnabled(false);
                  // 4、使用连接集合里面的地址获取连接
                  connection = factory.newConnection(addresses, "生产者");
                  // 添加重连监听器
                  ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
                      /**
                       * 重连成功后的回调
                       * @param recoverable
                       */
                      public void handleRecovery(Recoverable recoverable) {
                          System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已重新建立连接!");
                      }
                      /**
                       * 开始重连时的回调
                       * @param recoverable
                       */
                      public void handleRecoveryStarted(Recoverable recoverable) {
                          System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 开始尝试重连!");
                      }
                  });
                  // 5、从链接中创建通道
                  channel = connection.createChannel();
                  /**
                   * 6、声明(创建)队列
                   * 如果队列不存在,才会创建
                   * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
                   *
                   * queueDeclare参数说明:
                   * @param queue 队列名称
                   * @param durable 队列是否持久化
                   * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制
                   * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
                   * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
                   */
                  channel.queueDeclare("queue1", true, false, false, null);
                  for (int i = 0; i < 100; i++) {
                      // 消息内容
                      String message = "Hello World " + i;
                      try {
                          // 7、发送消息
                          channel.basicPublish("", "queue1", null, message.getBytes());
                      } catch (AlreadyClosedException e) {
                          // 可能连接已关闭,等待重连
                          System.out.println("消息 " + message + " 发送失败!");
                          i--;
                          TimeUnit.SECONDS.sleep(2);
                          continue;
                      }
                      System.out.println("消息 " + i + " 已发送!");
                      TimeUnit.SECONDS.sleep(2);
                  }
              } catch (Exception e) {
                  e.printStackTrace();        
              } finally {
                  // 8、关闭通道
                  if (channel != null && channel.isOpen()) {
                      try {
                          channel.close();
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }
                  // 9、关闭连接
                  if (connection != null && connection.isOpen()) {
                      try {
                          connection.close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      }
      
    • 创建消费者
      /**
       * 客户端连接集群示例
       */
      public class Consumer {
          public static void main(String[] args) {
              // 1、创建连接工厂
              ConnectionFactory factory = new ConnectionFactory();
              // 2、设置连接属性
              factory.setUsername("order-user");
              factory.setPassword("order-user");
              factory.setVirtualHost("v1");
              Connection connection = null;
              Channel channel = null;
      
              // 3、设置每个节点的链接地址和端口
              Address[] addresses = new Address[]{
                  new Address("192.168.100.242", 5672),
                  new Address("192.168.100.241", 5672)
              };
              try {
                  // 开启/关闭连接自动恢复,默认是开启状态
                  factory.setAutomaticRecoveryEnabled(true);
                  // 设置每100毫秒尝试恢复一次,默认是5秒:com.rabbitmq.client.ConnectionFactory.DEFAULT_NETWORK_RECOVERY_INTERVAL
                  factory.setNetworkRecoveryInterval(100);
                  // 4、从连接工厂获取连接
                  connection = factory.newConnection(addresses, "消费者");
                  // 添加重连监听器
                  ((Recoverable) connection).addRecoveryListener(new RecoveryListener() {
                      /**
                       * 重连成功后的回调
                       * @param recoverable
                       */
                      public void handleRecovery(Recoverable recoverable) {
                          System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 已重新建立连接!");
                      }
                      /**
                       * 开始重连时的回调
                       * @param recoverable
                       */
                      public void handleRecoveryStarted(Recoverable recoverable) {
                          System.out.println(new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SS").format(new Date()) + " 开始尝试重连!");
                      }
                  });
                  // 5、从链接中创建通道
                  channel = connection.createChannel();
                  /**
                   * 6、声明(创建)队列
                   * 如果队列不存在,才会创建
                   * RabbitMQ 不允许声明两个队列名相同,属性不同的队列,否则会报错
                   *
                   * queueDeclare参数说明:
                   * @param queue 队列名称
                   * @param durable 队列是否持久化
                   * @param exclusive 是否排他,即是否为私有的,如果为true,会对当前队列加锁,其它通道不能访问,
                   *                  并且在连接关闭时会自动删除,不受持久化和自动删除的属性控制。
                   *                  一般在队列和交换器绑定时使用
                   * @param autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除
                   * @param arguments 队列参数,设置队列的有效期、消息最大长度、队列中所有消息的生命周期等等
                   */
                  channel.queueDeclare("queue1", true, false, false, null);
                  // 7、定义收到消息后的回调
                  final Channel finalChannel = channel;
                  DeliverCallback callback = new DeliverCallback() {
                      public void handle(String consumerTag, Delivery message) throws IOException {
                          System.out.println("收到消息:" + new String(message.getBody(), "UTF-8"));
                          finalChannel.basicAck(message.getEnvelope().getDeliveryTag(), false);
                      }
                  };
                  // 8、监听队列
                  channel.basicConsume("queue1", false, callback, new CancelCallback() {
                      public void handle(String consumerTag) throws IOException {
                      }
                  });
                  System.out.println("开始接收消息");
                  System.in.read();
      
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  // 9、关闭通道
                  if (channel != null && channel.isOpen()) {
                      try {
                          channel.close();
                      } catch (Exception e) {
                          e.printStackTrace();
                      }
                  }
                  // 10、关闭连接
                  if (connection != null && connection.isOpen()) {
                      try {
                          connection.close();
                      } catch (IOException e) {
                          e.printStackTrace();
                      }
                  }
              }
          }
      }
      

整体架构

  • RabbitMQ的整体结构 image-20220523154530416
  • Producer:生产者,就是投递消息的一方。生产者创建消息,然后发布到 RabbitMQ 中。

  • Connection:程序与RabbitMQ建立的连接

  • Channel:频道或信道,是建立在Connection连接之上的一种轻量级的连接
    • 大部分的操作是在Channel这个接口中完成的,包括定义队列的声明(queue Declare)、交换机的声明 (exchange Declare)、队列的绑定(queue Bind)、发布消息(basic Publish)、消费消息(basic Consume)等。
    • 如果把Connection比作一条光纤电缆的话,那么Channel信道就比作成光纤电缆中的其中一束光纤。一个 Connection上可以创建任意数量的Channel。

  • Broker:消息中间件的服务节点 。
    • 对于RabbitMQ来说, 一个RabbitMQ Broker可以简单地看作一个RabbitMQ服务节点,或者 RabbitMQ服务实例。也可以将一个RabbitMQ Broker看作一台 RabbitMQ服务器 。

  • Virtual Host:虚拟主机
    • 表示一批交换器、消息队列和相关对象。 虚拟主机是共享相同的身份认证和加密环境的独立服务器域。
    • 一个Broker可以创建多个Virtual Host,不同Virtual Host之间相互独立。
    • Virtual Host本质上就是一个mini版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。
    • Virtual Host是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 Virtual Host是 / 。

  • Exchange:交换器
    • 生产者将消息发送到 Exchange (交换器,通常也可以用大写的“X”来表示), 由交换器将消息路由到一个或者多个队列中。如果路由不到,可以返回给生产者,或直接丢弃。
  • Binding:绑定
    • RabbitMQ 中将交换器与队列关联起来,叫做绑定。
    • 在绑定的时候一般会指定一个绑定键 ( Binding Key ) ,这样 RabbitMQ 就知道如何正确地将消息路由到队列了
    • Routing Key:路由键。用来指 定这个消息的路由规则
    • Routing Key需要与交换器类型和绑定键 (Binding Key) 联合使用
    • 在交换器类型和绑定键 (Binding Key) 固定的情况下,生产者可以在发送消息给交换器时,通过 指定 Routing Key 来决定消息流向哪里。
  • 交换机类型
    • direct:直连交换机,它会把消息路由到那些 Binding Key 和 Routing Key完全匹配的队列中
    • fanout:扇型交换机,它会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中
    • topic:主题交换机,与direct类似,但它可以通过通配符进行模糊匹配
      • **使用 **# 匹配任意字符。如:com.#可以匹配到com.a、com.a.b.c
      • **使用 *** 匹配单个字符。如:*.com.*可以匹配到aaa.com.bbb无法匹配到a.b.com.a
    • headers:头交换机,
      • 根据发送的消息内容中的 headers 属性进行 匹配,而不是根据Routing Key匹配
      • headers 类型的交换器性能很差,而且也不实用
    • default exchange :由RabbitMQ预先定义好的空的交换机是direct类型,新建立的队列默认绑定到这个交换机上。

  • Queue:队列,是RabbitMQ的内部对象,用于存储消息。
    • 普通队列
    • 死信队列,监听普通队列,普通队列的消息超出一定时间没有被消费之后,会移动到死信队列,解决订单超时问题

  • Consumer:消费者,就是接收消息的一方。消费者连接到 RabbitMQ 服务器,并订阅到队列 上 。

  • Message:消息一般可以包含两个部分:消息体和附加信息。
    • 消息体(payload):消息体一般是一个带有业务逻辑结构的数据,比如一个JSON字符串
    • 附加信息:用来表述这条消息,比如目标交换器的名称、路由键和一些自定义属性 等等。
    • 生产者发送的信息会封装成固定格式

功能特性

  • RabbitMQ 的持久化分为队列持久化、消息持久化和交换器持久化。 不管是持久化的消息还是非持久化的消息都可以被写入到磁盘。
    • 队列持久化
      • 队列的持久化是在定义队列时的durable参数来实现的,durable为true时,队列才会持久化。
      • 开启后在持久化队列管理界面可以看到一个蓝色的“D”标识
        Connection connection = connectionFactory.newConnection(); 
        Channel channel = connection.createChannel(); 
        //第二个参数设置为true,即durable=true 
        channel.queueDeclare("queue1", true, false, false, null);
        
    • 消息持久化
      • 消息持久化通过消息的属性deliveryMode来设置是否持久化,发送时设置
      • 在发送消息时通过basicPublish的 参数传入,代码如下:
        // 通过传入MessageProperties.PERSISTENT_TEXT_PLAIN 就可以实现消息持久化 
        channel.basicPublish("", "queue1", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_t est_message".getBytes());
        
    • 交换器持久化
      • 同队列一样,交换器也需要在定义时设置持久化标识,否则在Broker重启后将丢失
        // durable 为true则开启持久化 
        Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IO Exception;
        

  • 内存控制
    • 当内存使用超过配置的阈值或者磁盘剩余空间低于配置的阈值时,RabbitMQ 会暂时阻塞客户端的连接, 并停止接收从客户端发来的消息,以此避免服务崩溃,客户端与服务端的心跳检测也会失效。
    • 当出现生产者生产大量消息,而接收者接收缓慢时容易出现 告警,可以通过调整参数解决
    • 通过管理命令调整,不需重启,RabbitMQ 提供relativeabsolute两种配置方式
      • rabbitmqctl set_vm_memory_high_watermark <fraction>,参数fraction为内存阈值,RabbitMQ 默认值为0.4,表示当RabbitMQ 使用的内存超过40%时, 就会产生告警并阻塞所有生产者连接。
      • rabbitmqctl set_vm_memory_high_watermark absolute <value>,参数value单位为KB、MB、GB,
    • 通过配置文件调整vim /etc/rabbitmq/rabbitmq.conf,需要重启
      #RabbitMQ 提供relative或absolute两种配置方式
      #relative 相对值,建议取值在0.4~0.66之间,不建议超过0.7 
      #absolute 绝对值,单位为KB、MB、GB,
      vm_memory_high_watermark.relative = 0.4 
      #或者
      vm_memory_high_watermark.absolute = 1GB
      

  • 内存换页
    • 在某个 Broker 节点触及内存并阻塞生产者之前,它会尝试将队列中的消息换页到磁盘以释放内存空间。 持久化和非持久化的消息都会被转储到磁盘中,其中持久化的消息本身就在磁盘中有一份副本,这里会 将持久化的消息从内存中清除掉。
    • 默认情况下,在内存到达内存告警阈值的 50%时会进行换页动作。 也就是说,在默认的内存告警阈值为 0.4 的情况下,当内存超过 0.4 x 0.5=0.2 时会进行换页动作
    • 可以通过在配置文件中配置vm_memory_high_watermark_paging_ratio项来修改此值
      #以下配置将会在RabbitMQ 内存使用率达到30%(0.4*0.75)时进行换页动作,并在40%时阻塞生产者。
      #当 vm_memory_high_watermark_paging_ratio的值大于1时,相当于禁用了换页功能。
      vm_memory_high_watermark.relative = 0.4 
      vm_memory_high_watermark_paging_ratio = 0.75
      

  • 磁盘控制

    • 当磁盘剩余空间低于确定的阈值时,RabbitMQ 同样会阻塞生产者,这样可以避免因非持久化的消息持续 换页而耗尽磁盘空间导致服务崩溃。
  • 默认情况下,磁盘阈值为50MB,表示当磁盘剩余空间低于50MB 时会阻塞生产者并停止内存 中消息的换页动作 。 这个阈值的设置可以减小,但不能完全消除因磁盘耗尽而导致崩溃的可能性。比如在两次磁盘 空间检测期间内,磁盘空间从大于50MB被耗尽到0MB 。

  • 一个相对谨慎的做法是将磁盘阈值设置为与操作系统所显示的内存大小一致。

  • 通过命令可以临时调整磁盘阈值

    • rabbitmqctl set_disk_free_limit <disk_limit>,参数disk_limit 为固定大小,单位为KB、MB、GB
    • rabbitmqctl set_disk_free_limit mem_relative <fraction>,参数 fraction 为相对比值,建议的取值为1.0~2.0之间
  • 修改配置文件中的设置

    #RabbitMQ 提供relative或absolute两种配置方式
    #relative 相对值,建议取值在1.0~2.0之间 
    #absolute 绝对值,单位为KB、MB、GB,
    disk_free_limit.relative = 2.0 
    #或者
    disk_free_limit.absolute = 50mb
    

  • 插件机制
    • 通过rabbitmq-plugins命令可以启用或禁用插件
      • 启用:rabbitmq-plugins enable plugin-name
      • 禁用:rabbitmq-plugins disable plugin-name
    • 常用插件
      • rabbitmq_auth_mechanism_ssl:身份验证机制插件,允许RabbitMQ 客户端使用 x509 证书和 TLS(PKI)证书进行身份验证
      • rabbitmq_event_exchange: 事件分发插件,使客户端可以接收到Broker的queue.deletedexchange.createdbinding.created等事件
      • rabbitmq_management:基于Web界面的管理/监控插件
      • rabbitmq_management_agent: 启用rabbitmq_management时,会自动启用此插件,用于在Web管理中查看集群节点
      • rabbitmq_mqtt:MQTT插件,使RabbitMQ 支持MQTT协议
      • rabbitmq_web_mqtt:使RabbitMQ 支持通过WebSocket订阅消息,基于MQTT协议传输

消息可靠性保证

  • 消息可靠性,是业务系统接入消息中间件时首要考虑的问题,RabbitMQ种要保证消息的可靠投递需要保证各链条的可靠 image-20220731161054470
  • 发送可靠性:确保消息成功发送到Broker
    • 通过AMQP事务来保证发送可靠性(不推荐)
    • 消息生产者通过publisher confirm机制,以确保消息可以可靠地传输到 RabbitMQ 中。
      • 生产者发送消息到Broker,broker在接收到消息之后会返回一个回执
      • 如果返回的回执代表为收到,则需要重试。可能会存在消息重复的情况,需要通过业务ID去自行判断 image-20220524151821386

  • 路由保证:还要保证发送到Broker的消息能到指定的队列中,而不是被丢弃
    • 使用回发监听器,如果没有找到路由队列,会触发回发监听器
    • 备份交换机,当一个交换机没找到对应队列时,尝试将消息发给备份交换机的队列

  • 存储可靠性:Broker对消息持久化,确保消息不丢失

  • 消费可靠性:确保消息被成功消费
    • 消费者在消费消息的同时,需要将auto Ack设置为false,然后通过手动确认的方式去确认 己经正确消费的消息,以免在消费端引起不必要的消息丢失。

  • 生产者如果想知道消息最终有没有消费成功,可以通过消费者回调:
    • 生产者提供一个接口,消费者消费成功之后回调以下消费者的API
    • 消费者回复一条消息
    • 补偿机制:定义超时时间,超过时间没收到消费者返回,判断消费失败。重发

高可用方案

  • Cluster 普通模式:
    • 通过元数据进行同步,但是数据不进行同步 image-20220523161912828
    • 元数据包含以下内容:
      • **队列元数据:队列的名称及属性 **
      • **交换器:交换器的名称及属性 **
      • **绑定关系元数据:交换器与队列或者交换器与交换器 **
      • Virtual Host元数据:为 Virtual Host 内的队列、交换器和绑定提供命名空间及安全属性之间的绑定关系
    • 优点:同步元数据数据量较小,效率高
    • 缺点:存在单点故障问题,不能保证消息的万无一失
    • 部署方式:多机多节点、单机多节点。

  • Cluster 镜像模式
    • 镜像模式的集群是在普通模式的基础上,通过policy来实现,使用镜像模式可以实现RabbitMQ的 高可用方案 image-20220523162807604
    • 缺点:存在数据冗余,占用过多内存
    • 镜像队列模式相比较普通模式,镜像模式会占用更多的带宽来进行同步,所以镜像队列的吞吐量会 低于普通模式
    • 但普通模式不能实现高可用,某个节点挂了后,这个节点上的消息将无法被消费,需要等待节点启 动后才能被消费。

心中无我,眼中无钱,念中无他,朝中无人,学无止境

纸上得来终觉浅,绝知此事要躬行

image/svg+xml