消息消费失败如何处理?

前端 2023-07-05 17:29:38
52阅读

文中转载微信公众平台「Java极客技术性」,创作者鸭血粉丝汤。转截文中请联络Java极客技术性微信公众号。 

一、详细介绍

在详细介绍消息中间件 MQ 以前,大家先来简易的了解一下,为什么要引入消息中间件。

比如,在电子商务平台中,普遍的客户提交订单,会历经下列好多个步骤。

当客户提交订单时,建立完订单信息以后,会启用第三方支付服务平台,对客户的账户金额开展扣费,假如服务平台付款扣费取得成功,会将結果通告到相匹配的业务管理系统,然后业务管理系统会升级订单信息,另外启用库房插口,开展减库存量,通告货运物流开展送货!

设想一下,从订单信息升级、到扣除库存量、通告物流发货都是在一个方式 内同歩进行,倘若客户验证成功、订单信息升级也取得成功,可是在扣除库存量或是通告物流发货流程失败了,那麼便会导致一个难题,客户早已验证成功了,仅仅在库房扣除库存量层面不成功,进而造成 全部支付失败!

一单不成功,老总能够 装作看不到,可是假如上百个订单都因而不成功,那麼因系统软件导致的业务流程损害,将是极大的,老总很有可能注意力不集中了!

因而,对于这类业务场景,系统架构师们引进了异步通信技术规范,进而确保服务项目的高可用性,大致步骤以下:

当订单管理系统接到支付系统推送的扣费結果以后,会将订单信息信息发送至 MQ 消息中间件,另外也会升级订单信息。

在另一端,由仓库系统来多线程监视订单管理系统推送的信息,当接到订单信息信息以后,再实际操作扣除库存量、通告货运物流公司送货等服务项目!

在提升后的步骤下,即便扣除库存量服务项目不成功,也不会危害客户买卖。

如同《人月神话》中常说的,软件开发,沒有银弹!

当引进了 MQ 消息中间件以后,一样也会产生另一个难题,倘若 MQ 消息中间件忽然服务器宕机了,造成 信息没法推送出来,那仓库系统就没法接纳到订单信息信息,从而也没法送货!

对于这个问题,业内流行的解决方案是选用群集布署,一主多从方式,进而完成服务项目的高可用性,即便一台设备忽然服务器宕机了,也仍然能确保服务项目能用,在网络服务器常见故障期内,根据运维管理方式,将服务项目重启,以后服务项目仍然能一切正常运作!

可是也有另一个难题,倘若仓库系统早已接到订单信息信息了,可是业务流程解决出现异常,或是网络服务器出现异常,造成 当今产品库存量并沒有扣除,都没有送货!

这个时候又改如何处理呢?

今日大家所要详细介绍的恰好是这类情景,倘若信息消費不成功,大家应当如何处理?

二、解决方法

对于信息消費不成功的情景,大家一般会根据以下方法开展解决:

  • 当信息消費不成功时,会对信息开展再次消息推送
  • 假如再试频次超出最高值,会将出现异常信息储存到数据库查询,随后人力干预清查难题,开展手工制作再试

当信息在手机客户端消費不成功时,大家会将出现异常的信息添加到一个信息再试目标中,另外设定较大 再试频次,并将信息再次消息推送到 MQ 消息中间件里,当再试频次超出最高值时,会将出现异常的信息储存到 MongoDB数据库查询中,便捷事后查看出现异常的信息内容。

根据之上系统软件实体模型,我们可以撰写一个公共性再试部件,话不多说,立即干!

三、编码实践活动

此次赔偿服务项目选用 rabbitmq 消息中间件开展解决,别的消息中间件解决构思也相近!

3.1、建立一个信息再试dao层

 
  1. @Data 
  2. @EqualsAndHashCode(callSuper = false
  3. @Accessors(chain = true
  4. public class MessageRetryDTO implements Serializable { 
  5.  
  6.     private static final long serialVersionUID = 1L; 
  7.  
  8.     /** 
  9.      * 初始信息body 
  10.      */ 
  11.     private String bodyMsg; 
  12.  
  13.     /** 
  14.      * 信息源ID 
  15.      */ 
  16.     private String sourceId; 
  17.  
  18.     /** 
  19.      * 信息源叙述 
  20.      */ 
  21.     private String sourceDesc; 
  22.  
  23.     /** 
  24.      * 交换机 
  25.      */ 
  26.     private String exchangeName; 
  27.  
  28.     /** 
  29.      * 路由器键 
  30.      */ 
  31.     private String routingKey; 
  32.  
  33.     /** 
  34.      * 序列 
  35.      */ 
  36.     private String queueName; 
  37.  
  38.     /** 
  39.      * 情况,1:复位,2:取得成功,3:不成功 
  40.      */ 
  41.     private Integer status = 1; 
  42.  
  43.     /** 
  44.      * 较大 再试频次 
  45.      */ 
  46.     private Integer maxTryCount = 3; 
  47.  
  48.     /** 
  49.      * 当今再试频次 
  50.      */ 
  51.     private Integer currentRetryCount = 0; 
  52.  
  53.     /** 
  54.      * 再试间隔时间(ms) 
  55.      */ 
  56.     private Long retryIntervalTime = 0L; 
  57.  
  58.     /** 
  59.      * 每日任务不成功信息内容 
  60.      */ 
  61.     private String errorMsg; 
  62.  
  63.     /** 
  64.      * 建立時间 
  65.      */ 
  66.     private Date createTime; 
  67.  
  68.     @Override 
  69.     public String toString() { 
  70.         return "MessageRetryDTO{"   
  71.                 "bodyMsg='"   bodyMsg   '\''   
  72.                 ", sourceId='"   sourceId   '\''   
  73.                 ", sourceDesc='"   sourceDesc   '\''   
  74.                 ", exchangeName='"   exchangeName   '\''   
  75.                 ", routingKey='"   routingKey   '\''   
  76.                 ", queueName='"   queueName   '\''   
  77.                 ", status="   status   
  78.                 ", maxTryCount="   maxTryCount   
  79.                 ", currentRetryCount="   currentRetryCount   
  80.                 ", retryIntervalTime="   retryIntervalTime   
  81.                 ", errorMsg='"   errorMsg   '\''   
  82.                 ", createTime="   createTime   
  83.                 '}'
  84.     } 
  85.  
  86.     /** 
  87.      * 查验再试频次是不是超出最高值 
  88.      * 
  89.      * @return 
  90.      */ 
  91.     public boolean checkRetryCount() { 
  92.         retryCountCalculate(); 
  93.         //查验再试频次是不是超出最高值 
  94.         if (this.currentRetryCount < this.maxTryCount) { 
  95.             return true
  96.         } 
  97.         return false
  98.     } 
  99.  
  100.     /** 
  101.      * 再次测算再试频次 
  102.      */ 
  103.     private void retryCountCalculate() { 
  104.         this.currentRetryCount = this.currentRetryCount   1; 
  105.     } 
  106.  

3.2、撰写服务项目再试抽象类

 
  1. public abstract class CommonMessageRetryService { 
  2.  
  3.     private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class); 
  4.  
  5.     @Autowired 
  6.     private RabbitTemplate rabbitTemplate; 
  7.  
  8.     @Autowired 
  9.     private MongoTemplate mongoTemplate; 
  10.  
  11.  
  12.     /** 
  13.      * 复位信息 
  14.      * 
  15.      * @param message 
  16.      */ 
  17.     public void initMessage(Message message) { 
  18.         log.info("{} 接到信息: {},业务流程数据信息:{}", this.getClass().getName(), message.toString(), new String(message.getBody())); 
  19.         try { 
  20.             //封裝信息 
  21.             MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message); 
  22.             if (log.isInfoEnabled()) { 
  23.                 log.info("反序列化信息:{}", messageRetryDto.toString()); 
  24.             } 
  25.             prepareAction(messageRetryDto); 
  26.         } catch (Exception e) { 
  27.             log.warn("解决信息出现异常,错误报告:", e); 
  28.         } 
  29.     } 
  30.  
  31.     /** 
  32.      * 提前准备实行 
  33.      * 
  34.      * @param retryDto 
  35.      */ 
  36.     protected void prepareAction(MessageRetryDTO retryDto) { 
  37.         try { 
  38.             execute(retryDto); 
  39.             doSuccessCallBack(retryDto); 
  40.         } catch (Exception e) { 
  41.             log.error("当前任务实行出现异常,业务流程数据信息:"   retryDto.toString(), e); 
  42.             //实行不成功,测算是不是还必须再次再试 
  43.             if (retryDto.checkRetryCount()) { 
  44.                 if (log.isInfoEnabled()) { 
  45.                     log.info("再试信息:{}", retryDto.toString()); 
  46.                 } 
  47.                 retrySend(retryDto); 
  48.             } else { 
  49.                 if (log.isWarnEnabled()) { 
  50.                     log.warn("当前任务再试频次早已抵达较大 频次,业务流程数据信息:"   retryDto.toString(), e); 
  51.                 } 
  52.                 doFailCallBack(retryDto.setErrorMsg(e.getMessage())); 
  53.             } 
  54.         } 
  55.     } 
  56.  
  57.     /** 
  58.      * 每日任务实行取得成功,回调函数服务项目(依据必须开展重写) 
  59.      * 
  60.      * @param messageRetryDto 
  61.      */ 
  62.     private void doSuccessCallBack(MessageRetryDTO messageRetryDto) { 
  63.         try { 
  64.             successCallback(messageRetryDto); 
  65.         } catch (Exception e) { 
  66.             log.warn("实行取得成功回调函数出现异常,序列叙述:{},不正确缘故:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  67.         } 
  68.     } 
  69.  
  70.     /** 
  71.      * 每日任务实行不成功,回调函数服务项目(依据必须开展重写) 
  72.      * 
  73.      * @param messageRetryDto 
  74.      */ 
  75.     private void doFailCallBack(MessageRetryDTO messageRetryDto) { 
  76.         try { 
  77.             saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg())); 
  78.             failCallback(messageRetryDto); 
  79.         } catch (Exception e) { 
  80.             log.warn("实行不成功回调函数出现异常,序列叙述:{},不正确缘故:{}", messageRetryDto.getSourceDesc(), e.getMessage()); 
  81.         } 
  82.     } 
  83.  
  84.     /** 
  85.      * 执行任务 
  86.      * 
  87.      * @param messageRetryDto 
  88.      */ 
  89.     protected abstract void execute(MessageRetryDTO messageRetryDto); 
  90.  
  91.     /** 
  92.      * 取得成功回调函数 
  93.      * 
  94.      * @param messageRetryDto 
  95.      */ 
  96.     protected abstract void successCallback(MessageRetryDTO messageRetryDto); 
  97.  
  98.     /** 
  99.      * 不成功回调函数 
  100.      * 
  101.      * @param messageRetryDto 
  102.      */ 
  103.     protected abstract void failCallback(MessageRetryDTO messageRetryDto); 
  104.  
  105.     /** 
  106.      * 搭建信息赔偿实体线 
  107.      * @param message 
  108.      * @return 
  109.      */ 
  110.     private MessageRetryDTO buildMessageRetryInfo(Message message){ 
  111.         //假如头顶部包括赔偿信息实体线,立即回到 
  112.         Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders(); 
  113.         if(messageHeaders.containsKey("message_retry_info")){ 
  114.             Object retryMsg = messageHeaders.get("message_retry_info"); 
  115.             if(Objects.nonNull(retryMsg)){ 
  116.                 return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class); 
  117.             } 
  118.         } 
  119.         //全自动将业务流程信息添加赔偿实体线 
  120.         MessageRetryDTO messageRetryDto = new MessageRetryDTO(); 
  121.         messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8)); 
  122.         messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange()); 
  123.         messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey()); 
  124.         messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue()); 
  125.         messageRetryDto.setCreateTime(new Date()); 
  126.         return messageRetryDto; 
  127.     } 
  128.  
  129.     /** 
  130.      * 出现异常信息再次进库 
  131.      * @param retryDto 
  132.      */ 
  133.     private void retrySend(MessageRetryDTO retryDto){ 
  134.         //将赔偿信息实体线放进头顶部,初始信息內容维持不会改变 
  135.         MessageProperties messageProperties = new MessageProperties(); 
  136.         messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON); 
  137.         messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto)); 
  138.         Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties); 
  139.         rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message); 
  140.     } 
  141.  
  142.  
  143.  
  144.     /** 
  145.      * 将出现异常信息储存到mongodb中 
  146.      * @param retryDto 
  147.      */ 
  148.     private void saveMessageRetryInfo(MessageRetryDTO retryDto){ 
  149.         try { 
  150.             mongoTemplate.save(retryDto, "message_retry_info"); 
  151.         } catch (Exception e){ 
  152.             log.error("将出现异常信息储存到mongodb不成功,信息数据信息:"   retryDto.toString(), e); 
  153.         } 
  154.     } 

3.3、撰写监视服务项目类

在消費端运用的情况下,也比较简单,比如,对于扣除库存量实际操作,我们可以根据以下方法开展解决!

 
  1. @Component 
  2. public class OrderServiceListener extends CommonMessageRetryService { 
  3.  
  4.     private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class); 
  5.  
  6.     /** 
  7.      * 监视订单管理系统提交订单取得成功信息 
  8.      * @param message 
  9.      */ 
  10.     @RabbitListener(queues = "mq.order.add"
  11.     public void consume(Message message) { 
  12.         log.info("接到订单信息提交订单取得成功信息: {}", message.toString()); 
  13.         super.initMessage(message); 
  14.     } 
  15.  
  16.  
  17.     @Override 
  18.     protected void execute(MessageRetryDTO messageRetryDto) { 
  19.         //启用扣除库存量服务项目,将业务流程出现异常抛出来 
  20.     } 
  21.  
  22.     @Override 
  23.     protected void successCallback(MessageRetryDTO messageRetryDto) { 
  24.         //业务流程解决取得成功,回调函数 
  25.     } 
  26.  
  27.     @Override 
  28.     protected void failCallback(MessageRetryDTO messageRetryDto) { 
  29.         //业务流程解决不成功,回调函数 
  30.     } 

当信息消費不成功,并超出较大 频次时,会将信息储存到 mongodb 中,随后像基本数据库操作一样,能够 根据 web 插口查看出现异常信息,并对于实际情景开展再试!

四、总结

很有可能有的老同学聚会问,为什么不将出现异常信息存有数据库查询?

最初确实是储存在 MYSQL 中,可是伴随着业务流程的迅速发展趋势,订单信息信息算法设计愈来愈繁杂,信息量也十分的大,乃至大到 MYSQL 中的 text 种类都没法储存,另外这类算法设计都不太合适在 MYSQL 中储存,因而将其转移到 mongodb!

文中关键紧紧围绕信息消費不成功这类情景,开展基本的计划方案和编码实践活动解读,很有可能有了解不及时的地区,欢迎批评强调!

五、参照

1、石杉的构架手记 - 如何处理信息消費不成功难题

the end
免责声明:本文不代表本站的观点和立场,如有侵权请联系本站删除!本站仅提供信息存储空间服务。