故梦丶
2025-12-20
点 赞
0
热 度
2
评 论
0

springboot集成RabbitMQ

  1. 首页
  2. 技术
  3. springboot集成RabbitMQ

文章摘要

智阅GPT

一、RabbitMQ介绍

RabbitMQ是一个开源的消息代理软件,最初由LShift开发,后来由VMware收购。它实现了高级消息队列协议(AMQP)的标准,提供可靠的消息传递、消息队列、发布/订阅和其他消息模式的功能。

二、RabbitMQ的优缺点

优点:

  1. 可靠性:RabbitMQ 提供了多种机制来确保消息传递的可靠性,包括消息持久化、确认机制等,使得消息的传递更加可靠。

  2. 灵活的路由:通过交换机和路由键,RabbitMQ提供了灵活的消息路由机制,可以根据消息的特定属性将消息路由到不同的队列,满足各种复杂的消息路由需求。

  3. 高可用性:RabbitMQ支持集群模式,可以部署多个节点来实现高可用性和水平扩展。

  4. 丰富的功能:RabbitMQ提供了丰富的功能,包括发布/订阅模式、点对点模式、RPC模式等,能够满足不同场景下的消息传递需求。

  5. 跨语言支持:RabbitMQ提供了多种语言的客户端库,支持多种编程语言,使得开发者可以使用自己熟悉的语言进行开发。

  6. 可扩展性:RabbitMQ采用插件式架构,提供了丰富的插件系统,使得可以扩展和定制其功能,满足不同场景下的需求。

  7. 监控和管理:RabbitMQ提供了一个易于使用的 Web 管理界面,可以通过浏览器轻松地监控和管理消息代理的各种配置和指标。

缺点:

  1. 复杂性:RabbitMQ相对来说比较复杂,配置和管理需要一定的学习成本,尤其是在处理复杂的消息路由和交换机配置时。

  2. 性能开销:与一些轻量级的消息代理相比,RabbitMQ的性能开销可能会更高一些,尤其是在高吞吐量的情况下。

  3. 消息堆积:如果不正确地配置或者消费者处理不及时,可能会导致消息堆积的问题,影响系统的性能和可用性。

  4. 部署复杂性:建立一个RabbitMQ集群需要一定的技术知识和经验,需要考虑到节点之间的通信、数据同步等问题,部署和维护成本较高。

三、RabbitMQ五种模式

1、简单模式

fig:

  1. 消息产生着§将消息放入队列

  2. 消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)

2、工作模式

fig:

  1. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关(syncronize,与同步锁的性能不一样) 保证一条消息只能被一个消费者使用)

  2. 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

3、发布订阅模式

fig:

  1. X代表交换机rabbitMQ内部组件,erlang 消息产生者是代码完成,代码的执行效率不高,消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

  2. 相关场景:邮件群发,群聊天,广播(广告)

4、路由模式

fig:

  1. 消息生产者将消息发送给交换机按照路由判断,路由是字符串(info) 当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

  2. 根据业务功能定义路由字符串

  3. 从系统的代码逻辑中获取对应的功能字符串,将消息任务扔到对应的队列中业务场景:error 通知;EXCEPTION;错误通知的功能;传统意义的错误通知;客户通知;利用key路由,可以将程序中的错误封装成消息传入到消息队列中,开发者可以自定义消费者,实时接收错误;

5、Topic模式

fig:

  1. 星号井号代表通配符

  2. 星号匹配不多不少恰好1个词),#匹配一个或多个词

  3. 路由功能添加模糊匹配

  4. 消息产生者产生消息,把消息交给交换机

  5. 交换机根据key的规则模糊匹配到对应的队列,由队列的监听消费者接收消息消费

四、SpringBoot配置RabbitMQ

  1. 引入Maven包

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-amqp</artifactId>
 </dependency>
  1. 配置Yml文件

spring:  
  rabbitmq:
    host: 192.168.0.245  # ip号
    port: 5672 # 端口 一般是5672
    virtual-host: /ra # 虚拟机路径
    username: swt # 账号
    password: swt # 密码
  1. 添加rabbitMQ配置(这里用路由模式举例)

/**
 * 消息队列相关配置
 */
@Configuration
public class RabbitMqConfig {

    //报文提取消费

    /**
     * 情报消息实际消费队列所绑定的交换机
     */
    @Bean
    DirectExchange qbDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_QB_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 情报实际消费队列
     */
    @Bean
    public Queue qbQueue() {
        return new Queue(QueueEnum.QUEUE_QB_CANCEL.getName());
    }



    /**
     * 将情报队列绑定到交换机
     */
    @Bean
    Binding qbBinding(DirectExchange qbDirect,Queue qbQueue){
        return BindingBuilder
                .bind(qbQueue)
                .to(qbDirect)
                .with(QueueEnum.QUEUE_QB_CANCEL.getRouteKey());
    }
}
  1. 发送与接收

### 发送RabbitMQ
@Component
public class ExtractQBSender {
    private static Logger LOGGER =LoggerFactory.getLogger(ExtractQBSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(List<Integer> ids){
        //给队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_QB_CANCEL.getExchange(), QueueEnum.QUEUE_QB_CANCEL.getRouteKey(), StrUtil.join(",",ids));
        LOGGER.info("send qbId:{}",ids);
    }
}


### 接收rabbitMQ
@Component
@AllArgsConstructor
@RabbitListener(queues = QueueEnum.QUEUE_QB_CANCEL.getName())
public class ExtractQBReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(ExtractQBReceiver.class);

    @RabbitHandler
    public void handle(String str){
        LOGGER.info("接收数据为:{}",str);

    }
}

五、如何实现重试机制

只需要设置yml文件就行

spring:  
  rabbitmq:
    host: 192.168.0.245
    port: 5672
    virtual-host: /ra
    username: swt
    password: swt
    listener:
      simple:
        # 重试机制
        retry:
          enabled: true #是否开启消费者重试
          max-attempts: 3 #最大重试次数
          initial-interval: 5000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间

六、死信队列

1. 什么是死信队列

死信队列(Dead Letter Queue,DLQ)是消息队列中的一种特殊队列,用于存储无法被消费者正确处理的消息。当消息被标记为“死信”时,它们将被发送到死信队列中。

以下是一些导致消息成为死信的常见情况:

  1. 消息过期:消息在队列中停留的时间超过了预设的过期时间。

  2. 消息被拒绝:消费者拒绝接收消息,或者消费者无法处理消息,可能是因为消息格式不正确或者消息内容无法处理。

  3. 消息达到最大重试次数:消息在一定次数的重试后仍然无法被成功处理。

  4. 队列达到最大长度:队列中的消息数量达到了预先设置的最大限制,导致新消息无法入队。

死信队列的存在有助于系统的健壮性和错误处理机制,使得开发者能够更好地监控和处理消息处理过程中出现的异常情况。一旦消息被发送到死信队列,系统可以进一步对这些消息进行审查、日志记录或者重试等操作,以便进一步分析问题的根源并采取适当的措施。

在 RabbitMQ 中,死信队列通常是通过设置队列的参数和交换机的绑定来实现的。开发者可以根据自己的需求和业务逻辑来配置死信队列,以便更好地处理异常情况和消息处理失败的情况。

2. 如何实现死信队列
/**
 * 消息队列相关配置
 */
@Configuration
public class RabbitMqConfig {

    //报文提取消费

    /**
     * 情报消息实际消费队列所绑定的交换机
     */
    @Bean
    DirectExchange qbDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_QB_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 情报实际消费队列
     */
    @Bean
    public Queue qbQueue() {
        // 绑定死信队列
         return QueueBuilder.durable(QueueEnum.QUEUE_QB_CANCEL.getName())
                .deadLetterExchange(QueueEnum.QUEUE_DEAL_CANCEL.getExchange())
                .deadLetterRoutingKey(QueueEnum.QUEUE_DEAL_CANCEL.getRouteKey())
                .build();
    }



    /**
     * 将情报队列绑定到交换机
     */
    @Bean
    Binding qbBinding(DirectExchange qbDirect,Queue qbQueue){
        return BindingBuilder
                .bind(qbQueue)
                .to(qbDirect)
                .with(QueueEnum.QUEUE_QB_CANCEL.getRouteKey());
    }
    
    /**
     * 设置通用死信队列
     */
    @Bean
    DirectExchange dealDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_DEAL_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    @Bean
    public Queue dealQueue() {
        return new Queue(QueueEnum.QUEUE_DEAL_CANCEL.getName());
    }


    @Bean
    Binding dealBinding(DirectExchange dealDirect,Queue dealQueue){
        return BindingBuilder
                .bind(dealQueue)
                .to(dealDirect)
                .with(QueueEnum.QUEUE_DEAL_CANCEL.getRouteKey());
    }
}

切记,如果在绑定死信队列之前,服务中已存在QUEUE_QB_CANCEL队列。需要先删除QUEUE_QB_CANCEL队列再进行启动否则会报错。

七、通过TTL加死信队列实现延迟队列

在许多业务情况下是需要实现延迟队列的,比如一个订单过来后并不想直接处理掉,需要过段时间再处理这时就需要使用到延迟队列。但是在RabbitMQ中没有具体的方法来实现延迟队列。于是我们可以通过TTL加死信队列来实现。

大致原理是通过TTL设置超时队列,等队列超时之后会进入死信队列,最后通过死信队列来处理该数据。则可以实现延迟队列,其中超时时间就是延迟时间。

RabbitMQ有两种方式来设置超时

  1. 通过队列属性设置:队列中所有消息都有相同的过期时间

  2. 对消息进行单独设置:每条消息TTL可以不同

需要注意的是,在实际应用中,如果同时在队列上设置了超时时间并且在消息发送时也设置了超时时间,那么两者的行为可能会相互影响。例如,如果消息发送时设置的超时时间短于队列的过期时间,则消息可能会在队列中超时而被删除,而不会等到消息的超时时间到达。因此,在设置超时时间时需要确保两者之间的协调一致,以避免意外情况的发生。

### 配置延迟队列
/**
 * 消息队列相关配置
 */
@Configuration
public class RabbitMqConfig {

    //报文提取消费

    /**
     * 情报消息实际消费队列所绑定的交换机
     */
    @Bean
    DirectExchange qbDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_QB_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    /**
     * 情报实际消费队列
     */
    @Bean
    public Queue qbQueue() {
        // 绑定死信队列
         return QueueBuilder.durable(QueueEnum.QUEUE_QB_CANCEL.getName())
                .ttl(5000)  //通过队列设置,时间为5秒
                .deadLetterExchange(QueueEnum.QUEUE_DEAL_CANCEL.getExchange())
                .deadLetterRoutingKey(QueueEnum.QUEUE_DEAL_CANCEL.getRouteKey())
                .build();
    }



    /**
     * 将情报队列绑定到交换机
     */
    @Bean
    Binding qbBinding(DirectExchange qbDirect,Queue qbQueue){
        return BindingBuilder
                .bind(qbQueue)
                .to(qbDirect)
                .with(QueueEnum.QUEUE_QB_CANCEL.getRouteKey());
    }
    
    /**
     * 设置通用死信队列
     */
    @Bean
    DirectExchange dealDirect() {
        return (DirectExchange) ExchangeBuilder
                .directExchange(QueueEnum.QUEUE_DEAL_CANCEL.getExchange())
                .durable(true)
                .build();
    }

    @Bean
    public Queue dealQueue() {
        return new Queue(QueueEnum.QUEUE_DEAL_CANCEL.getName());
    }


    @Bean
    Binding dealBinding(DirectExchange dealDirect,Queue dealQueue){
        return BindingBuilder
                .bind(dealQueue)
                .to(dealDirect)
                .with(QueueEnum.QUEUE_DEAL_CANCEL.getRouteKey());
    }
}


### 消息发送模块
@Component
public class ExtractQBSender {
    private static Logger LOGGER =LoggerFactory.getLogger(ExtractQBSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(List<Integer> ids,final long delayTimes){
        //给队列发送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_QB_CANCEL.getExchange(), QueueEnum.QUEUE_QB_CANCEL.getRouteKey(), StrUtil.join(",",ids), new MessagePostProcessor(){
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //给消息设置延迟毫秒值
                message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
                return message;
            }
        });
        LOGGER.info("send qbId:{}",ids);
    }
}

八、消息确认机制(包含手动ACK)

消息确认主要分为两种

  1. 发送确认,发送确认包含两种情况,一种是消息是否到达交换机,一种是消息是否到达队列

  2. 接收确认

一、发送方消息确认
1、ConfirmCallback 接口

配置文件需要开启配置

publisher-confirm-type: correlated

消息发送到交换机回调,当消息发送到交换机时,会触发此接口中的 confirm 回调函数confirm 中有三个参数

  1. CorrelationData:包含消息发送时的 id,可以根据 id 快速定位

  2. ack:消息发送成功与否,成功为 true,失败为 false

  3. cause:消息发送失败的原因

2、ReturnCallback 接口

配置文件需要开启配置

publisher-returns: true

消息发送到队列回调,接口中包含 returnedMessage 函数,如果消息发送成功,则不会调用此回调函数,如果消息发送失败,则会调用此回调函数
returnedMessage 中有5个参数

  1. message:发送的消息内容

  2. replyCode:回应码

  3. replyText:回应信息

  4. exchange:交换机

  5. routingKey:路由键

3、配置回调函数
  1. application配置

spring:  
  rabbitmq:
    host: 192.168.0.245
    port: 5672
    virtual-host: /ra
    username: swt
    password: swt
    # 开启生产者发布确认,确认消息已发送到交换机 Exchange
    publisher-confirm-type: correlated
    # 开启发布者返回,确认消息已发送到队列 Queue
    publisher-returns: true
    listener:
      simple:
       	# acknowledge-mode: none 自动模式(默认开启)
        # acknowledge-mode: manual  手动模式
        # acknowledge-mode: auto 自动模式 (根据侦听器检测是正常返回、还是抛出异常来发出 ack/nack)
        # 开启消息手动确认(即需要调用channel.basicAck才会从队列中删除消息)
        acknowledge-mode: manual
        #表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条
        prefetch: 1
        #消费被拒绝时 true:重回队列 false为否
        default-requeue-rejected: false
        # 重试机制
        retry:
          enabled: true #是否开启消费者重试
          max-attempts: 3 #最大重试次数
          initial-interval: 5000ms #重试间隔时间(单位毫秒)
          max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
          multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
  1. 配置发送回调函数

@Slf4j
@Configuration
public class RabbitTemplateConfig {

    @Bean
    public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate();
        rabbitTemplate.setConnectionFactory(connectionFactory);
        // 设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
        rabbitTemplate.setMandatory(true);

        // 确认消息送到交换机(Exchange)回调
        // 如果消息没有到 exchange,则 confirm 回调,ack=false; 如果消息到达exchange,则confirm回调,ack=true
        rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                if(ack){
                    log.info("消息发送交换机成功:correlationData:{},ack:{},cause:{}",correlationData,ack,cause);
                }else{
                    log.info("消息发送交换机失败:correlationData:{},ack:{},cause:{}",correlationData,ack,cause);
                }
            }
        });

        // 确认消息送到队列(Queue)回调
        // 如果exchange到queue成功,则不回调return;如果exchange到queue失败,则回调return(需设置mandatory=true,否则不回回调,消息就丢了)
        rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
                log.info("确认消息送到队列结果:");
                log.info("发送消息:{}", message);
                log.info("回应码:{}", replyCode);
                log.info("回应信息:{}", replyText);
                log.info("交换机:{}", exchange);
                log.info("路由键:{}", routingKey);
            }
        });
        return rabbitTemplate;
    }
}
二、消费者消息确认
1、设置手动确认逻辑
@Component
@AllArgsConstructor
@RabbitListener(queues = QueueEnum.QUEUE_QB_CANCEL.getName())
public class ExtractQBReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(ExtractQBReceiver.class);

    @RabbitHandler
    public void handle(String str,Message message,Channel channel){
        LOGGER.info("接收数据为:{}",str);
		long deliveryTag = message.getMessageProperties().getDeliveryTag();
        // 手动进行消息确认
        channel.basicAck(deliveryTag,false);
    }
}
2、消费者确认模式和相关参数

消费者消息确认,我们需要在配置文件中开启消息的手动确认,因为默认是自动确认的

acknowledge-mode: manual

确认模式有三种

  1. none:默认,不进行确认

  2. auto:自动确认

  3. manual:手动确认

消息确认函数中会用到的几个参数

  1. deliveryTag :唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel

  2. multiple:是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息

  3. requeue:被拒绝的是否重新入队列

3、手动操作通用方法
  1. 消费确认:basicAck

  2. 拒绝消费:basicNack (支持批量拒绝,参数与消费确认参数一致)

  3. 拒绝消费:basicReject (不支持批量拒绝,参数比消费确认少一个multiple)


成功是一时的,但失败和平凡是我们生命的主旋律,我们在平凡的路上,一直和你们相伴!

故梦丶

intp 逻辑家

站长

具有版权性

请您在转载、复制时注明本文 作者、链接及内容来源信息。 若涉及转载第三方内容,还需一同注明。

具有时效性

目录

欢迎来到故梦丶的站点,为您导航全站动态

33 文章数
3 分类数
3 评论数
8标签数
最近评论
测试人

测试人


不错

故梦丶

故梦丶


1111

热门文章