文中转载微信公众平台「Java极客技术性」,创作者鸭血粉丝汤。转截文中请联络Java极客技术性微信公众号。
一、详细介绍
在详细介绍消息中间件 MQ 以前,大家先来简易的了解一下,为什么要引入消息中间件。
比如,在电子商务平台中,普遍的客户提交订单,会历经下列好多个步骤。
当客户提交订单时,建立完订单信息以后,会启用第三方支付服务平台,对客户的账户金额开展扣费,假如服务平台付款扣费取得成功,会将結果通告到相匹配的业务管理系统,然后业务管理系统会升级订单信息,另外启用库房插口,开展减库存量,通告货运物流开展送货!
设想一下,从订单信息升级、到扣除库存量、通告物流发货都是在一个方式 内同歩进行,倘若客户验证成功、订单信息升级也取得成功,可是在扣除库存量或是通告物流发货流程失败了,那麼便会导致一个难题,客户早已验证成功了,仅仅在库房扣除库存量层面不成功,进而造成 全部支付失败!
一单不成功,老总能够 装作看不到,可是假如上百个订单都因而不成功,那麼因系统软件导致的业务流程损害,将是极大的,老总很有可能注意力不集中了!
因而,对于这类业务场景,系统架构师们引进了异步通信技术规范,进而确保服务项目的高可用性,大致步骤以下:
当订单管理系统接到支付系统推送的扣费結果以后,会将订单信息信息发送至 MQ 消息中间件,另外也会升级订单信息。
在另一端,由仓库系统来多线程监视订单管理系统推送的信息,当接到订单信息信息以后,再实际操作扣除库存量、通告货运物流公司送货等服务项目!
在提升后的步骤下,即便扣除库存量服务项目不成功,也不会危害客户买卖。
如同《人月神话》中常说的,软件开发,沒有银弹!
当引进了 MQ 消息中间件以后,一样也会产生另一个难题,倘若 MQ 消息中间件忽然服务器宕机了,造成 信息没法推送出来,那仓库系统就没法接纳到订单信息信息,从而也没法送货!
对于这个问题,业内流行的解决方案是选用群集布署,一主多从方式,进而完成服务项目的高可用性,即便一台设备忽然服务器宕机了,也仍然能确保服务项目能用,在网络服务器常见故障期内,根据运维管理方式,将服务项目重启,以后服务项目仍然能一切正常运作!
可是也有另一个难题,倘若仓库系统早已接到订单信息信息了,可是业务流程解决出现异常,或是网络服务器出现异常,造成 当今产品库存量并沒有扣除,都没有送货!
这个时候又改如何处理呢?
今日大家所要详细介绍的恰好是这类情景,倘若信息消費不成功,大家应当如何处理?
二、解决方法
对于信息消費不成功的情景,大家一般会根据以下方法开展解决:
当信息在手机客户端消費不成功时,大家会将出现异常的信息添加到一个信息再试目标中,另外设定较大 再试频次,并将信息再次消息推送到 MQ 消息中间件里,当再试频次超出最高值时,会将出现异常的信息储存到 MongoDB数据库查询中,便捷事后查看出现异常的信息内容。
根据之上系统软件实体模型,我们可以撰写一个公共性再试部件,话不多说,立即干!
三、编码实践活动
此次赔偿服务项目选用 rabbitmq 消息中间件开展解决,别的消息中间件解决构思也相近!
3.1、建立一个信息再试dao层
- @Data
- @EqualsAndHashCode(callSuper = false)
- @Accessors(chain = true)
- public class MessageRetryDTO implements Serializable {
- private static final long serialVersionUID = 1L;
- /**
- * 初始信息body
- */
- private String bodyMsg;
- /**
- * 信息源ID
- */
- private String sourceId;
- /**
- * 信息源叙述
- */
- private String sourceDesc;
- /**
- * 交换机
- */
- private String exchangeName;
- /**
- * 路由器键
- */
- private String routingKey;
- /**
- * 序列
- */
- private String queueName;
- /**
- * 情况,1:复位,2:取得成功,3:不成功
- */
- private Integer status = 1;
- /**
- * 较大 再试频次
- */
- private Integer maxTryCount = 3;
- /**
- * 当今再试频次
- */
- private Integer currentRetryCount = 0;
- /**
- * 再试间隔时间(ms)
- */
- private Long retryIntervalTime = 0L;
- /**
- * 每日任务不成功信息内容
- */
- private String errorMsg;
- /**
- * 建立時间
- */
- private Date createTime;
- @Override
- public String toString() {
- return "MessageRetryDTO{"
- "bodyMsg='" bodyMsg '\''
- ", sourceId='" sourceId '\''
- ", sourceDesc='" sourceDesc '\''
- ", exchangeName='" exchangeName '\''
- ", routingKey='" routingKey '\''
- ", queueName='" queueName '\''
- ", status=" status
- ", maxTryCount=" maxTryCount
- ", currentRetryCount=" currentRetryCount
- ", retryIntervalTime=" retryIntervalTime
- ", errorMsg='" errorMsg '\''
- ", createTime=" createTime
- '}';
- }
- /**
- * 查验再试频次是不是超出最高值
- *
- * @return
- */
- public boolean checkRetryCount() {
- retryCountCalculate();
- //查验再试频次是不是超出最高值
- if (this.currentRetryCount < this.maxTryCount) {
- return true;
- }
- return false;
- }
- /**
- * 再次测算再试频次
- */
- private void retryCountCalculate() {
- this.currentRetryCount = this.currentRetryCount 1;
- }
- }
3.2、撰写服务项目再试抽象类
- public abstract class CommonMessageRetryService {
- private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);
- @Autowired
- private RabbitTemplate rabbitTemplate;
- @Autowired
- private MongoTemplate mongoTemplate;
- /**
- * 复位信息
- *
- * @param message
- */
- public void initMessage(Message message) {
- log.info("{} 接到信息: {},业务流程数据信息:{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
- try {
- //封裝信息
- MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
- if (log.isInfoEnabled()) {
- log.info("反序列化信息:{}", messageRetryDto.toString());
- }
- prepareAction(messageRetryDto);
- } catch (Exception e) {
- log.warn("解决信息出现异常,错误报告:", e);
- }
- }
- /**
- * 提前准备实行
- *
- * @param retryDto
- */
- protected void prepareAction(MessageRetryDTO retryDto) {
- try {
- execute(retryDto);
- doSuccessCallBack(retryDto);
- } catch (Exception e) {
- log.error("当前任务实行出现异常,业务流程数据信息:" retryDto.toString(), e);
- //实行不成功,测算是不是还必须再次再试
- if (retryDto.checkRetryCount()) {
- if (log.isInfoEnabled()) {
- log.info("再试信息:{}", retryDto.toString());
- }
- retrySend(retryDto);
- } else {
- if (log.isWarnEnabled()) {
- log.warn("当前任务再试频次早已抵达较大 频次,业务流程数据信息:" retryDto.toString(), e);
- }
- doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
- }
- }
- }
- /**
- * 每日任务实行取得成功,回调函数服务项目(依据必须开展重写)
- *
- * @param messageRetryDto
- */
- private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
- try {
- successCallback(messageRetryDto);
- } catch (Exception e) {
- log.warn("实行取得成功回调函数出现异常,序列叙述:{},不正确缘故:{}", messageRetryDto.getSourceDesc(), e.getMessage());
- }
- }
- /**
- * 每日任务实行不成功,回调函数服务项目(依据必须开展重写)
- *
- * @param messageRetryDto
- */
- private void doFailCallBack(MessageRetryDTO messageRetryDto) {
- try {
- saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
- failCallback(messageRetryDto);
- } catch (Exception e) {
- log.warn("实行不成功回调函数出现异常,序列叙述:{},不正确缘故:{}", messageRetryDto.getSourceDesc(), e.getMessage());
- }
- }
- /**
- * 执行任务
- *
- * @param messageRetryDto
- */
- protected abstract void execute(MessageRetryDTO messageRetryDto);
- /**
- * 取得成功回调函数
- *
- * @param messageRetryDto
- */
- protected abstract void successCallback(MessageRetryDTO messageRetryDto);
- /**
- * 不成功回调函数
- *
- * @param messageRetryDto
- */
- protected abstract void failCallback(MessageRetryDTO messageRetryDto);
- /**
- * 搭建信息赔偿实体线
- * @param message
- * @return
- */
- private MessageRetryDTO buildMessageRetryInfo(Message message){
- //假如头顶部包括赔偿信息实体线,立即回到
- Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
- if(messageHeaders.containsKey("message_retry_info")){
- Object retryMsg = messageHeaders.get("message_retry_info");
- if(Objects.nonNull(retryMsg)){
- return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
- }
- }
- //全自动将业务流程信息添加赔偿实体线
- MessageRetryDTO messageRetryDto = new MessageRetryDTO();
- messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
- messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
- messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
- messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
- messageRetryDto.setCreateTime(new Date());
- return messageRetryDto;
- }
- /**
- * 出现异常信息再次进库
- * @param retryDto
- */
- private void retrySend(MessageRetryDTO retryDto){
- //将赔偿信息实体线放进头顶部,初始信息內容维持不会改变
- MessageProperties messageProperties = new MessageProperties();
- messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
- messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
- Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
- rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
- }
- /**
- * 将出现异常信息储存到mongodb中
- * @param retryDto
- */
- private void saveMessageRetryInfo(MessageRetryDTO retryDto){
- try {
- mongoTemplate.save(retryDto, "message_retry_info");
- } catch (Exception e){
- log.error("将出现异常信息储存到mongodb不成功,信息数据信息:" retryDto.toString(), e);
- }
- }
- }
3.3、撰写监视服务项目类
在消費端运用的情况下,也比较简单,比如,对于扣除库存量实际操作,我们可以根据以下方法开展解决!
- @Component
- public class OrderServiceListener extends CommonMessageRetryService {
- private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);
- /**
- * 监视订单管理系统提交订单取得成功信息
- * @param message
- */
- @RabbitListener(queues = "mq.order.add")
- public void consume(Message message) {
- log.info("接到订单信息提交订单取得成功信息: {}", message.toString());
- super.initMessage(message);
- }
- @Override
- protected void execute(MessageRetryDTO messageRetryDto) {
- //启用扣除库存量服务项目,将业务流程出现异常抛出来
- }
- @Override
- protected void successCallback(MessageRetryDTO messageRetryDto) {
- //业务流程解决取得成功,回调函数
- }
- @Override
- protected void failCallback(MessageRetryDTO messageRetryDto) {
- //业务流程解决不成功,回调函数
- }
- }
当信息消費不成功,并超出较大 频次时,会将信息储存到 mongodb 中,随后像基本数据库操作一样,能够 根据 web 插口查看出现异常信息,并对于实际情景开展再试!
四、总结
很有可能有的老同学聚会问,为什么不将出现异常信息存有数据库查询?
最初确实是储存在 MYSQL 中,可是伴随着业务流程的迅速发展趋势,订单信息信息算法设计愈来愈繁杂,信息量也十分的大,乃至大到 MYSQL 中的 text 种类都没法储存,另外这类算法设计都不太合适在 MYSQL 中储存,因而将其转移到 mongodb!
文中关键紧紧围绕信息消費不成功这类情景,开展基本的计划方案和编码实践活动解读,很有可能有了解不及时的地区,欢迎批评强调!
五、参照
1、石杉的构架手记 - 如何处理信息消費不成功难题