前言

在橙单的基础框架中,我们提供了 common-datasync 数据同步组件。其主要适用于,对数据同步可靠性要求极高,特别是对数据操作的顺序有着严格一致性要求的业务场景。与此同时,同步的并发性和频度都不能过高,否则会引发性能问题。目前在橙单中的主要业务场景如下。

  • 数据权限中的部门同步,数据权限需要支持基于部门及其子部门的数据过滤。在微服务架构中,有些业务表的数据和部门表位于不同的数据库中,并由不同的微服务提供数据访问接口。比如「工程项目表」和「部门表」分属不同的数据库,那么试想一下,在查询工程项目的列表数据时,如果需要进行部门及子部门级别的数据过滤,原有正常的 SQL 查询语句是无法实现该功能的。然而数据权限过滤是属于非常通用且高频的业务场景,最佳的实现方式就是在每个业务数据库中,均实时备份完整的「部门及所有下级部门的关联关系表」数据,具体可参考开发文档 部门组织结构章节。这样所有的数据权限过滤,基于普通的 SQL 即可完成。
  • 多租户架构中的租户权限数据同步,租户及其权限数据,通常都是由租户平台的后台管理用户,在「租户后台管理系统」中进行维护的。并将增删改后的数据实时同步到每个租户所在的「租户业务运营」数据库中。这样在租户用户登录时,便可以直接从租户业务数据库中获取租户及租户用户所需的各种权限数据了。
  • 多租户架构中的租户基础数据同步,如全局编码字典、在线表单数据源、报表数据源等,具体同步规则基本等同于上一点。

生产者

以极高的可靠性保证同步方数据的变化,被实时发送到消息队列 (RocketMQ),发送过程中,不仅要确保 AT LEAST ONCE,同时还要保证同步端数据变化的顺序与消息投递的顺序完全一致,这可以简单的理解为是一次重放式的数据同步。

设计目标

  • 高可靠性。毕竟操作数据库本地事务与发送消息的行为,不可能真正的做到原子性,因此如果本地事务提交成功,而消息却发送失败 (如服务所在进程瞬时崩溃),此时就需要有后续的定时补偿机制,将同步端变化的操作数据,再次发送给消息队列,直到成功。
  • 高可用性。我们通常都会同时启动多个应用服务来保证业务端的 HA,在定时任务进行同步数据补偿发送时,需要以分布式锁的方式进行排他操作,以避免两个 Job 同时补偿相同数据的错误情况发生。
  • 顺序的一致性。比如,新增了一个部门,紧跟着又修改了该部门的数据,如果此时投递同步消息的顺序不是按照「新增部门 -> 修改部门」顺序发送到消息队列,那么消费者在收到消息后,就一定会出现数据操作逻辑上的错误。

实现逻辑

这里我们以最典型的新增租户的业务实现为例。

  • 在保存租户业务数据的事务中,先保存租户的数据到租户后台管理数据库。
  • 在同一事务中,保存一份待同步消息的数据,到租户后台数据库的同步消息生产者流水表中 (zz_data_sync_producer_trans),状态为「尚未发送」。
  • 以上两步数据操作,一定是在同一事务内完成。
  • 在本地事务提交成功后,通知消息发送线程,有新消息来了,赶快干活儿。
  • 数据同步消息发送线程,会从生产者流水表 (zz_data_sync_producer_trans) 中读取尚未同步的数据列表。
  • 发送过程要严格保证数据插入到流水表的顺序和消息发送的顺序完全一致,前面没有发送成功,就不能发送后面的数据。
  • 当多条消息发送成功后,批量更新生产者流水表中的数据发送状态。
  • 在以上步骤中,如果消息发送环节出现任何异常,都会导致消息发送的暂时中断。定时补偿任务会自动定时启动,扫描生产者流水表中尚未发送的消息数据,并进行补偿发送,直到最终发送成功。

流程图

代码分析

  • 在生产者业务服务实现类的事务方法中 (SysTenantServiceImpl),保存本地业务数据的同时,还要调用 common-datasync 中的顺序发送同步数据的方法 DataSyncRocketMqProducer.sendOrderly。
@Transactional(rollbackFor = Exception.class)
@Override
public SysTenant saveNew(SysTenant sysTenant) {
   // 在本地存储新增租户的业务数据。
   this.buildDefaultValue(sysTenant);
   sysTenantMapper.insert(sysTenant);
   List<SysTenantMenu> menuList = sysTenantMenuService.getAllList();
   // 下面的代码都是为了数据同步服务的。
   // 1. 打包需要同步的租户业务数据,包括租户对象数据,以及与其关联的菜单数据。
   JSONObject messageJsonData = new JSONObject();
   messageJsonData.put("sysTenant", sysTenant);
   if (CollUtil.isNotEmpty(menuList)) {
       messageJsonData.put("menuList", menuList);
   }
   // 2. 将前端打包的消息数据,在当前事务内,先保存到本地生产者流水表中(zz_data_sync_producer_trans),
   // 同时发布事务提交事件,同时消息同步发送线程,开始干活了。
   dataSyncProducer.sendOrderly(
           sysTenant.getTenantId(),
           applicationConfig.getTenantUpmsSyncTopic(),
           modelClass.getSimpleName(),
           DataSyncCommandType.INSERT.name(),
           messageJsonData.toJSONString(),
           MultiTenantConstant.MESSAGE_QUEUE_SELECTOR_KEY);
   return sysTenant;
}
  • 顺序发送方法 (sendOrderly) 与他的调用者,通常位于同一事务之内,因此保存生产者消息流水和业务数据变化的操作,是会同时成功或同时失败回滚的。依据数据库事务隔离性的原则,不同的数据库 Session 之间,是无法读取到尚未提交的事务数据的。因此,需要在事务提交之后,才能通知消息发送线程开始读取生产者消息并执行同步发送。
public void sendOrderly(
       Object id,
       String topic,
       String messageType,
       String commandType,
       String messageData,
       String messageQueueSelectorKey) {
   // 与业务数据同库的生产者流水表中(zz_data_sync_producer_trans),保存本次发送的消息水流数据。
   DataSyncProducerTrans trans = producerTransService.saveNew(
           id, topic, messageType, commandType, messageData, messageQueueSelectorKey);
   // 发送事务提交事件,在事务提交后才会触发,该事件的监听器方法,会通知发送者线程,新消息来了。
   applicationEventPublisher.publishEvent(new DataSyncTransEvent(lock, condition, trans));
}
  • 消息发送线程,在没有同步任务时,都会处于休眠状态,因此对于系统性能影响极低。该线程出现异常失败,定时任务也会重启拉起,从而保证消息同步发送过程的可靠性。当启动多个服务时,每个服务都消息同步线程都会操作相同的数据,为了避免出现多线程的数据竞用问题,我们在消息同步线程中加了分布式锁,以保证在同一时刻,只有一个服务实例的消息同步线程处于工作状态。
private void messageSender() {
   lock.lock();
   try {
       // 无论是被通知唤醒,还是自动超时,都需要读取数据同步的生产者流水数据。
       // 唤醒一般表示新消息来了,所以及时性是可以保证的。而自动超时,通常用于未正常发送消息数据的补偿,
       // 如果没有待补偿数据,唤醒后发现没有数据,也会重新回到休眠状态的。
       condition.await(producerProperties.getFetchTransIntervalMins(), TimeUnit.MINUTES);
       // 为了避免与该其他服务实例的发送者线程产生竞用冲突,只有拿到分布式锁的一个实例才能执行消息发送任务。
       if (distLock.tryLock()) {
           try {
               do {
                   // 从当前业务库的生产者流水表中,读取尚未发送的同步消息数据。
                   List<DataSyncProducerTrans> transList =
                           producerTransService.getLatestNonUpdateTransList();
                   // 如果没有待发送消息,就直接退出了,并很快再次进入休眠状态。
                   if (CollUtil.isEmpty(transList)) {
                       log.info("No DataSync Trans Data");
                       break;
                   }
                   List<Long> sentIdList = new ArrayList<>(transList.size());
                   // 迭代并逐个顺序发送生产者消息到RocketMQ。
                   for (DataSyncProducerTrans trans : transList) {
                       // 根据参数构建消息数据对象。
                       Message<String> m = MessageBuilder.withPayload(trans.getMessageData())
                               .setHeader(RocketMQHeaders.TRANSACTION_ID, trans.getTransId())
                               .setHeader(RocketMQHeaders.KEYS, trans.getTransId()).build();
                       // 同步顺序的发送消息,并按照消息的(MessageQueueSelectorKey)选择topic中的MessageQueue,
                       // 从而保障相同的MessageQueueSelectorKey,都会被投放到同一MessageQueue。
                       SendResult result = rocketMqTemplate.syncSendOrderly(
                               trans.getMessageTopic(), m, trans.getMessageQueueSelectorKey());
                       if (result.getSendStatus() != SendStatus.SEND_OK) {
                           log.error("Failed to send message for TransId {}," +
                                   "MessageType {}, CommandType{}, traceId{}, sendResult{}",
                                   trans.getTransId(),
                                   trans.getMessageType(),
                                   trans.getMessageCommand(),
                                   trans.getProducerTraceId(),
                                   result.getSendStatus());
                           break;
                       }
                       sentIdList.add(trans.getId());
                   }
                   // 同步更新流水数据的已发送状态,以及当前操作之后,已发送流水的最大主键Id。
                   // 这里是批量同步的,以便尽量较少与数据库的交互次数,提升效率。
                   if (CollUtil.isNotEmpty(sentIdList)) {
                       producerTransService.batchUpdate(sentIdList);
                   }
                   // 只有当发送消息出现问题的时候,才会走这里的分支。
                   // 出现该场景后,直接退出本次任务,下次再次唤醒时进行重试。
                   if (sentIdList.size() != transList.size()) {
                       break;
                   }
               } while (true);
           } catch (Exception e) {
               log.error("Failed to execute sendMessage or batchUpdate", e);
           } finally {
               distLock.unlock();
           }
       }
   } catch (Exception e) {
       log.error("Failed to messageSender", e);
   } finally {
       lock.unlock();
   }
}

消费者

以极高的可靠性保证同步消息被正常的消费,消费过程中,不仅要确保 AT LEAST ONCE 和幂等性,同时还要保证数据消费的顺序和发送顺序完全一致。此外,当某一被同步的目标数据库出现故障时,其余目标数据库的同步操作不会受到任何影响,一旦恢复后,对该目标数据库的消息同步操作也将自动恢复,并按顺序逐个执行每个消息的数据同步操作。

设计目标

  • 高可靠性。可以为每个目标数据库,提供一个 RocketMQ 的消费者组,每个消费者组包含多个消费者。这样当某一目标数据库出现故障时,其他数据库的消费者仍然可以正常工作。故障数据库恢复后,与其对应的消费者会继续执行该数据库的数据同步。
  • 高可用性。根据 RocketMQ 的消费者组机制,同一主题的同一消费者组内的多个消费者,在同一时刻只能有一个消费者消费消息,一旦该消费者的服务进程崩溃,同一群组内的其他消费者可以继续提供该主题的消费服务。由此可以保证数据同步的整体 HA 能力。
  • 幂等性。在同一事务内, 会将业务同步数据写入到目标数据表和数据同步消费流水表 (zz_data_sync_consumer_trans),利用消息Id的唯一性,避免同一消息的数据被多次执行,

实现逻辑

这里我们继续以最典型的新增租户的业务实现为例。

  • 在 RocketMQ 的消费者监听器对象中,从指定主题实时读取待同步的消息。并根据消息类型找到负责处理该消息的服务实现类,如本例中的 SysTenantServiceImpl。
  • 在服务实现类的基类中 (DataSyncConsumerService),先判断当前消息Id是否已经被处理过,如已处理,则自动跳过,从而保证消费的幂等性。
  • 在同一事物中,调用业务服务实现类的处理方法。
  • 在该方法中,会先行判断当前租户所在的业务数据库,与该消费者服务实例所配置的目标数据库是否一致,如不一致则跳过,说明该租户的数据没有存储在当前服务实例的目标数据库中,应由其他服务实例消费处理。
  • 在目标数据库中,保存租户的基本信息,以及初始化的权限数据,预置的管理员账户信息等。
  • 再次回到基类 (DataSyncConsumerService) 的 doHandle 方法,保存本次同步操作的消费流水数据,以保证消费的幂等性。
  • 以上几步数据操作,一定是在同一事务内完成。
  • 提交消息队列,确认消息处理完毕。
  • 在以上所述步骤中,业务数据的同步和消费流水的插入是在同一事务中提交的,如果该操作处理成功,然而 RocketMQ 的消息确认失败,下次重复读取相同消息时,可在消费流水表中查询具有全局唯一性的消息Id,如已存在,则直接跳过业务处理,并再次完成消息的成功消费确认。

流程图

代码分析

  • 在消费者业务服务中 (tenant-sync),我们通常会为每一种消息类型,注册一个业务服务实现类,如本例的 (SysTenantServiceImpl),用于处理该类型消息的数据同步逻辑。因此,在 common-datasync 的消费者监听方法中,会根据当前消息的类型,定位到与该类型的业务服务实现类,并调用通用的消息处理方法。
// 这里必须实现RocketMQListener接口。
@Slf4j
public abstract class BaseDataSyncRocketMqConsumer 
       implements RocketMQListener<MessageExt>, ApplicationContextAware {
   @Autowired
   private DataSyncConsumerProperties consumerProperties;
   @Autowired
   private DataSyncConsumerService consumerService;
   
   protected Map<String, BaseDataSyncConsumerService> serviceMap = new HashMap<>();
   // ... ... 为了节省篇幅,这里省略一部分与核心处理机制无关的代码。
 
   // RocketMQ监听器的回调模板方法。
   @Override
   public void onMessage(@NotNull MessageExt message) {
       // 逐个接口消息的各种参数和头数据,以便于后续的业务处理。
       String transId = message.getProperty(RocketMQHeaders.KEYS);
       String messageType = message.getProperty(DataSyncConstant.MESSAGE_HEADER_KEY_MESSAGE_TYPE);
       String messageCommand = message.getProperty(DataSyncConstant.MESSAGE_HEADER_KEY_COMMAND_TYPE);
       String producerTraceId = message.getProperty(DataSyncConstant.MESSAGE_HEADER_KEY_PRODUCER_TRACE_ID);
       JSONObject messageJsonData = JSON.parseObject(new String(message.getBody()));
       // 为了保证业务操作日志的可追溯性,我们这里将继续引用生产者traceId,并记录到消费者日志中。
       if (StrUtil.isNotBlank(producerTraceId)) {
           MDC.put(ApplicationConstant.HTTP_HEADER_TRACE_ID, producerTraceId);
       }
       try {
           this.handleMessage(transId, messageType, messageCommand, messageJsonData);
           log.info("End to Synchronize {}.", transId);
       } catch (Exception e) {
           log.error("Failed to consumer Message for TransId [" + transId + "].", e);
           throw e;
       }
   }
 
   protected void handleMessage(
           String transId, String messageType, String messageCommand, JSONObject messageJsonData) {
       Integer originalType = null;
       // 业务服务实现子类提供的方法,可以在这个方法中指定自己的数据源类型,这里会根据返回值,实现多数据源的自动切换。
       if (this.getDatasourceType() != null) {
           originalType = DataSourceContextHolder.setDataSourceType(this.getDatasourceType());
       }
       // 根据效率类型,找到匹配的业务服务实现类,在本例中就是后面给出的SysTenantServiceImpl。
       BaseDataSyncConsumerService service = serviceMap.get(messageType);
       Assert.notNull(service);
       try {
           // 调用业务处理方法了。
           consumerService.handleMessage(service, transId, messageCommand, messageJsonData);
       } finally {
           if (this.getDatasourceType() != null) {
               DataSourceContextHolder.unset(originalType);
           }
       }
   }
}
  • 为了保证消费端统一的幂等性处理,我们为业务服务实现类提供了一个公共基类,该基类只是提供了与幂等性相关的代码逻辑。需要特别留意的是,下面的 handleMessage 方法,是包含事务注解的,所以业务数据的处理和消费流水的保存,位于同一事务之内,即同时成功或同时失败回滚。
@Slf4j
@Service
public class DataSyncConsumerService {
   @Autowired
   private DataSyncConsumerTransService consumerTransService;

   // 为了避免重复同步更新,这里会插入消费流水数据,以保证消费操作的幂等性。
   @Transactional(rollbackFor = Exception.class)
   public void handleMessage(
           BaseDataSyncConsumerService service,
           String transId,
           String messageCommand,
           JSONObject messageJsonData) {
       // 判断当前消息的流水Id,在消费者流水表中是否存在,存在则跳过处理,
       // 直接反馈给消息队列,该消息已经正常消费了。
       if (consumerTransService.exist(transId)) {
           log.info("TransId {} has been processed before.", transId);
           return;
       }
       log.info("TransId {} begin processing.", transId);
       // 这里才是真正调用业务服务实现类中的实际业务处理的方法。
       service.doHandle(transId, messageCommand, messageJsonData);
       // 业务处理完毕后,保存当前消息的消费流水到同步目标库的消费流水表 (zz_data_sync_consumer_trans)。
       consumerTransService.saveNew(transId);
       log.info("TransId {} end to process.", transId);
   }
}
  • 对于实际的同步数据业务处理,这里我们给出了多租户的数据同步业务服务实现类的代码,作为参考。
@Slf4j
@Service
public class SysTenantServiceImpl implements BaseDataSyncConsumerService {
   @Autowired
   private TenantUpmsSyncConsumer tenantUpmsSyncConsumer;
   @Autowired
   private SysTenantMapper sysTenantMapper;
   @Autowired
   private SysTenantMenuMapper sysTenantMenuMapper;
   @Autowired
   private IdGeneratorWrapper idGenerator;
   
   // 在注册到指定消费者后,该Service对象处理的具体消息类型。
   @Override
   public String handleMessageType() {
       return SysTenant.class.getSimpleName();
   }
   @Override
   public void doHandle(String transId, String messageCommand, JSONObject messageJsonData) {
       // 解包租户同步消息数据。
       SysTenant sysTenant = messageJsonData.getJSONObject("sysTenant").toJavaObject(SysTenant.class);
       // 如果当前租户所在数据库,与当前同步服务实例所对应的目标数据库不同,就直接跳过了。
       if (!tenantUpmsSyncConsumer.acceptTenantDatabaseRouteKey(sysTenant.getDatabaseRouteKey())) {
           return;
       }
       // 判断消息的命令类型,该值可以自己随便自定义。多租户的消息我们只是定义了INSERT/UPDATE/DELETE。
       if (messageCommand.equals(DataSyncCommandType.INSERT.name())) {
           JSONArray menuListJson = messageJsonData.getJSONArray("menuList");
           List<SysTenantMenu> menuList = null;
           if (menuListJson != null && menuListJson.size() > 0) {
               menuList = menuListJson.toJavaList(SysTenantMenu.class);
           }
           // 保存租户和租户权限相关的数据,到目标的租户业务数据库。
           this.saveNew(sysTenant, menuList);
           // 为该租户创建管理账户。
           this.saveTenantAdminUserData(sysTenant.getTenantId());
       } else if (messageCommand.equals(DataSyncCommandType.UPDATE.name())) {
           this.update(sysTenant);
       } else if (messageCommand.equals(DataSyncCommandType.DELETE.name())) {
           sysTenantMapper.deleteById(sysTenant.getTenantId());
       }
   }
   // ... ... 为了节省篇幅,我们省略了若干其他代码。
}

结语

赠人玫瑰,手有余香,感谢您的支持和关注,选择橙单,效率乘三,收入翻番。