• 正文
    • 一、案例背景
    • 二、領(lǐng)域事件
    • 三、環(huán)境安裝
    • 四、工程實現(xiàn)
    • 五、測試驗證
  • 推薦器件
  • 相關(guān)推薦
申請入駐 產(chǎn)業(yè)圖譜

DDD 架構(gòu),MQ 應(yīng)該放哪一層使用?

2023/08/01
2480
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點資訊討論

作者:小傅哥
博客:https://bugstack.cn

本文的宗旨在于通過簡單干凈實踐的方式教會讀者,使用 Docker 配置 RocketMQ 并在基于 DDD 分層結(jié)構(gòu)的 SpringBoot 工程中使用 RocketMQ 技術(shù)。因為大部分 MQ 的發(fā)送都是基于特定業(yè)務(wù)場景的,所以本章節(jié)也是基于 《MyBatis 使用教程和插件開發(fā)》 章節(jié)的擴展。

本章也會包括關(guān)于 MQ 消息的發(fā)送和接收應(yīng)該處于 DDD 的哪一層的實踐講解和使用。

本文涉及的工程:

    xfg-dev-tech-rocketmq:https://gitcode.net/KnowledgePlanet/road-map/xfg-dev-tech-rocketmqRocketMQ Docker 安裝:rocketmq-docker-compose-mac-amd-arm.yml導(dǎo)入測試庫表 road-map.sql

一、案例背景

首先我們要知道,MQ 消息的作用是用于;解耦過長的業(yè)務(wù)流程應(yīng)對流量沖擊的消峰。如;用戶下單支付完成后,拿到支付消息推動后續(xù)的發(fā)貨流程。也可以是我們基于 《MyBatis 使用教程和插件開發(fā)》 中的案例場景,給雇員提升級別和薪資的時候,也發(fā)送一條MQ消息,用于發(fā)送郵件通知給用戶。

    從薪資調(diào)整到郵件發(fā)送,這里是2個業(yè)務(wù)流程,通過 MQ 消息的方式進行連接。其實MQ消息的使用場景特別多,原來你可能使用多線程的一些操作,現(xiàn)在就擴展為多實例的操作了。發(fā)送 MQ 消息出來,讓應(yīng)用的各個實例接收并進行消費。

二、領(lǐng)域事件

因為我們本章所講解的內(nèi)容是把 RocketMQ 放入 DDD 架構(gòu)中進行使用,那么也就引申出領(lǐng)域事件定義。所以我們先來了解下,什么是領(lǐng)域事件。

領(lǐng)域事件,可以說是解耦微服務(wù)設(shè)計的關(guān)鍵。領(lǐng)域事件也是領(lǐng)域模型中非常重要的一部分內(nèi)容,用于標(biāo)示當(dāng)前領(lǐng)域模型中發(fā)生的事件行為。一個領(lǐng)域事件會推進業(yè)務(wù)流程的進一步操作,在實現(xiàn)業(yè)務(wù)解耦的同時,也推動了整個業(yè)務(wù)的閉環(huán)。

    首先,我們需要在領(lǐng)域模型層,添加一塊 event 區(qū)域。它的存在是為了定義出于當(dāng)前領(lǐng)域下所需的事件消息信息。信息的類型可以是model 下的實體對象、聚合對象。之后,消息的發(fā)送是放在基礎(chǔ)設(shè)置層。本身基礎(chǔ)設(shè)置層就是依賴倒置于模型層,所以在模型層所定義的 event 對象,可以很方便的在基礎(chǔ)設(shè)置層使用。而且大部分開發(fā)的時候,MQ消息的發(fā)送與數(shù)據(jù)庫操作都是關(guān)聯(lián)的,采用的方式是,做完數(shù)據(jù)落庫后,推送MQ消息。所以定義在倉儲中實現(xiàn),會更加得心應(yīng)手、水到渠成。最后,就是 MQ 的消息,MQ 的消費可以是自身服務(wù)所發(fā)出的消息,也可以是外部其他微服務(wù)的消息。就在小傅哥所整體講述的這套簡明教程中 DDD 部分的觸發(fā)器層。

三、環(huán)境安裝

本案例涉及了數(shù)據(jù)庫和RocketMQ的使用,都已經(jīng)在工程中提供了安裝腳本,可以按需執(zhí)行。

這里主要介紹 RocketMQ 的安裝;

1. 執(zhí)行 compose yml

文件:docs/rocketmq/rocketmq-docker-compose-mac-amd-arm.yml - 關(guān)于安裝小傅哥提供了不同的鏡像,包括Mac、Mac M1、Windows 可以按需選擇使用。

version:?'3'
services:
??#?https://hub.docker.com/r/xuchengen/rocketmq
??#?注意修改項;
??#?01:data/rocketmq/conf/broker.conf?添加?brokerIP1=127.0.0.1
??#?02:data/console/config/application.properties?server.port=9009?-?如果8080端口被占用,可以修改或者添加映射端口
??rocketmq:
????image:?livinphp/rocketmq:5.1.0
????container_name:?rocketmq
????ports:
??????-?9009:9009
??????-?9876:9876
??????-?10909:10909
??????-?10911:10911
??????-?10912:10912
????volumes:
??????-?./data:/home/app/data
????environment:
??????TZ:?"Asia/Shanghai"
??????NAMESRV_ADDR:?"rocketmq:9876"
    • 在 IDEA 中打開 rocketmq-docker-compose-mac-amd-arm.yml 你會看到一個綠色的按鈕在左側(cè)側(cè)邊欄,點擊即可安裝?;蛘吣阋部梢允褂妹畎惭b:

# /usr/local/bin/docker-compose -f /docs/dev-ops/environment/environment-docker-compose.yml up -d

    - 比較適合在云服務(wù)器上執(zhí)行。首次安裝可能使用不了,一個原因是 brokerIP1 未配置IP,另外一個是默認的 8080 端口占用??梢园凑杖缦滦「蹈缯f的方式修改。

2. 修改默認配合

    1. 打開

data/rocketmq/conf/broker.conf

    1. 添加一條

brokerIP1=127.0.0.1

    在結(jié)尾
#?集群名稱
brokerClusterName?=?DefaultCluster
#?BROKER?名稱
brokerName?=?broker-a
#?0?表示?Master,?>?0?表示?Slave
brokerId?=?0
#?刪除文件時間點,默認凌晨?4?點
deleteWhen?=?04
#?文件保留時間,默認?48?小時
fileReservedTime?=?48
#?BROKER?角色?ASYNC_MASTER為異步主節(jié)點,SYNC_MASTER為同步主節(jié)點,SLAVE為從節(jié)點
brokerRole?=?ASYNC_MASTER
#?刷新數(shù)據(jù)到磁盤的方式,ASYNC_FLUSH?刷新
flushDiskType?=?ASYNC_FLUSH
#?存儲路徑
storePathRootDir?=?/home/app/data/rocketmq/store
#?IP地址
brokerIP1?=?127.0.0.1
    1. 打開 ``data/console/config/application.properties

修改

    server.port=9009` 端口。
server.address=0.0.0.0
server.port=9009
    修改配置后,重啟服務(wù)。

3. RockMQ登錄與配置

3.1 登錄

RocketMQ 此鏡像,會在安裝后在控制臺打印登錄賬號信息,你可以查看使用。

登錄:http://localhost:9009/

3.2 創(chuàng)建Topic

    • 也可以使用命令創(chuàng)建:

docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateTopic -n localhost:9876 -c DefaultCluster -t xfg-mq

3.3 創(chuàng)建消費者組

    • 也可以使用命令創(chuàng)建:

docker exec -it rocketmq sh /home/app/rocketmq/bin/mqadmin updateSubGroup -n localhost:9876 -c DefaultCluster -g xfg-group

四、工程實現(xiàn)

1. 工程結(jié)構(gòu)

    MQ 的使用無論是 RocketMQ 還是 Kafka 等,都很簡單。但在使用之前,要考慮好怎么在架構(gòu)中合理的使用。如果最初沒有定義好這些,那么胡亂的任何地方都能發(fā)送和接收MQ,最后的工程將非常難以維護。所以這里整個MQ的生產(chǎn)和消費,是按照整個 DDD 領(lǐng)域事件結(jié)構(gòu)進行設(shè)計。分為在 domain 使用基礎(chǔ)層生產(chǎn)消息,再有 trigger 層接收消息。

2. 配置文件

引入POM

<!--?https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client-java?-->
<dependency>
????<groupId>org.apache.rocketmq</groupId>
????<artifactId>rocketmq-client-java</artifactId>
????<version>5.0.4</version>
</dependency>
<dependency>
????<groupId>org.apache.rocketmq</groupId>
????<artifactId>rocketmq-spring-boot-starter</artifactId>
????<version>2.2.0</version>
</dependency>

添加配置

#?RocketMQ?配置
rocketmq:
??name-server:?127.0.0.1:9876
??consumer:
????group:?xfg-group
????#?一次拉取消息最大值,注意是拉取消息的最大值而非消費最大值
????pull-batch-size:?10
??producer:
????#?發(fā)送同一類消息的設(shè)置為同一個group,保證唯一
????group:?xfg-group
????#?發(fā)送消息超時時間,默認3000
????sendMessageTimeout:?10000
????#?發(fā)送消息失敗重試次數(shù),默認2
????retryTimesWhenSendFailed:?2
????#?異步消息重試此處,默認2
????retryTimesWhenSendAsyncFailed:?2
????#?消息最大長度,默認1024?*?1024?*?4(默認4M)
????maxMessageSize:?4096
????#?壓縮消息閾值,默認4k(1024?*?4)
????compressMessageBodyThreshold:?4096
????#?是否在內(nèi)部發(fā)送失敗時重試另一個broker,默認false
????retryNextServer:?false

3. 定義領(lǐng)域事件

源碼:cn.bugstack.xfg.dev.tech.domain.salary.event.SalaryAdjustEvent

@EqualsAndHashCode(callSuper?=?true)
@Data
public?class?SalaryAdjustEvent?extends?BaseEvent<AdjustSalaryApplyOrderAggregate>?{

????public?static?String?TOPIC?=?"xfg-mq";

????public?static?SalaryAdjustEvent?create(AdjustSalaryApplyOrderAggregate?adjustSalaryApplyOrderAggregate)?{
????????SalaryAdjustEvent?event?=?new?SalaryAdjustEvent();
????????event.setId(RandomStringUtils.randomNumeric(11));
????????event.setTimestamp(new?Date());
????????event.setData(adjustSalaryApplyOrderAggregate);
????????return?event;
????}

}
    每個領(lǐng)域的消息,都有領(lǐng)域自己定義。發(fā)送的時候再交給基礎(chǔ)設(shè)施層來發(fā)送。

4. 消息發(fā)送

源碼:cn.bugstack.xfg.dev.tech.infrastructure.event.EventPublisher

@Component
@Slf4j
public?class?EventPublisher?{

????@Setter(onMethod_?=?@Autowired)
????private?RocketMQTemplate?rocketmqTemplate;

????/**
?????*?普通消息
?????*
?????*?@param?topic???主題
?????*?@param?message?消息
?????*/
????public?void?publish(String?topic,?BaseEvent<?>?message)?{
????????try?{
????????????String?mqMessage?=?JSON.toJSONString(message);
????????????log.info("發(fā)送MQ消息?topic:{}?message:{}",?topic,?mqMessage);
????????????rocketmqTemplate.convertAndSend(topic,?mqMessage);
????????}?catch?(Exception?e)?{
????????????log.error("發(fā)送MQ消息失敗?topic:{}?message:{}",?topic,?JSON.toJSONString(message),?e);
????????????//?大部分MQ發(fā)送失敗后,會需要任務(wù)補償
????????}
????}

????/**
?????*?延遲消息
?????*
?????*?@param?topic??????????主題
?????*?@param?message????????消息
?????*?@param?delayTimeLevel?延遲時長
?????*/
????public?void?publishDelivery(String?topic,?BaseEvent<?>?message,?int?delayTimeLevel)?{
????????try?{
????????????String?mqMessage?=?JSON.toJSONString(message);
????????????log.info("發(fā)送MQ延遲消息?topic:{}?message:{}",?topic,?mqMessage);
????????????rocketmqTemplate.syncSend(topic,?MessageBuilder.withPayload(message).build(),?1000,?delayTimeLevel);
????????}?catch?(Exception?e)?{
????????????log.error("發(fā)送MQ延遲消息失敗?topic:{}?message:{}",?topic,?JSON.toJSONString(message),?e);
????????????//?大部分MQ發(fā)送失敗后,會需要任務(wù)補償
????????}
????}

}
    在基礎(chǔ)設(shè)施層提供 event 事件的處理,也就是 MQ 消息的發(fā)送。

源碼:cn.bugstack.xfg.dev.tech.infrastructure.repository.SalaryAdjustRepository

@Resource
private?EventPublisher?eventPublisher;
????
@Override
@Transactional(rollbackFor?=?Exception.class,?timeout?=?350,?propagation?=?Propagation.REQUIRED,?isolation?=?Isolation.DEFAULT)
public?String?adjustSalary(AdjustSalaryApplyOrderAggregate?adjustSalaryApplyOrderAggregate)?{
???
??//?...?省略部分代碼?

????eventPublisher.publish(SalaryAdjustEvent.TOPIC,?SalaryAdjustEvent.create(adjustSalaryApplyOrderAggregate));
????return?orderId;
}

在 SalaryAdjustRepository 倉儲的實現(xiàn)中,做完業(yè)務(wù)流程開始發(fā)送 MQ 消息。這里有2點要注意;

    消息發(fā)送,不要寫在數(shù)據(jù)庫事務(wù)中。因為事務(wù)一直占用數(shù)據(jù)庫連接,需要快速釋放。對于一些強MQ要求的場景,需要在發(fā)送MQ前,寫入一條數(shù)據(jù)庫 Task 記錄,發(fā)送消息后更新 Task 狀態(tài)為成功。如果長時間未更新數(shù)據(jù)庫狀態(tài)或者為失敗的,則需要由任務(wù)補償進行處理。

5. 消費消息

源碼:cn.bugstack.xfg.dev.tech.trigger.mq.SalaryAdjustMQListener

@Component
@Slf4j
@RocketMQMessageListener(topic?=?"xfg-mq",?consumerGroup?=?"xfg-group")
public?class?SalaryAdjustMQListener?implements?RocketMQListener<String>?{

????@Override
????public?void?onMessage(String?s)?{
????????log.info("接收到MQ消息?{}",?s);
????}

}
    消費消息,配置消費者組合消費的主題,之后就可以接收到消息了。接收以后你可以做自己的業(yè)務(wù),如果拋出異常,消息會進行重新接收處理。

五、測試驗證

1. 單獨發(fā)送消息測試

@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest
public?class?RocketMQTest?{

????@Setter(onMethod_?=?@Autowired)
????private?RocketMQTemplate?rocketmqTemplate;

????@Test
????public?void?test()?throws?InterruptedException?{
????????while?(true)?{
????????????rocketmqTemplate.convertAndSend("xfg-mq",?"我是測試消息");
????????????Thread.sleep(3000);
????????}
????}

}
    這里方便你來發(fā)送消息,驗證流程。

2. 業(yè)務(wù)流程消息驗證

@Test
public?void?test_execSalaryAdjust()?throws?InterruptedException?{
????AdjustSalaryApplyOrderAggregate?adjustSalaryApplyOrderAggregate?=?AdjustSalaryApplyOrderAggregate.builder()
????????????.employeeNumber("10000001")
????????????.orderId("100908977676003")
????????????.employeeEntity(EmployeeEntity.builder().employeeLevel(EmployeePostVO.T3).employeeTitle(EmployeePostVO.T3).build())
????????????.employeeSalaryAdjustEntity(EmployeeSalaryAdjustEntity.builder()
????????????????????.adjustTotalAmount(new?BigDecimal(100))
????????????????????.adjustBaseAmount(new?BigDecimal(80))
????????????????????.adjustMeritAmount(new?BigDecimal(20)).build())
????????????.build();
????String?orderId?=?salaryAdjustApplyService.execSalaryAdjust(adjustSalaryApplyOrderAggregate);
????log.info("調(diào)薪測試?req:?{}?res:?{}",?JSON.toJSONString(adjustSalaryApplyOrderAggregate),?orderId);
????Thread.sleep(Integer.MAX_VALUE);
}
23-07-29.15:40:52.307?[main????????????]?INFO??HikariDataSource???????-?HikariPool-1?-?Start?completed.
23-07-29.15:40:52.445?[main????????????]?INFO??EventPublisher?????????-?發(fā)送MQ消息?topic:xfg-mq?message:{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29?15:40:52.425"}
23-07-29.15:40:52.517?[main????????????]?INFO??ISalaryAdjustApplyServiceTest?-?調(diào)薪測試?req:?{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"}?res:?100908977676004
23-07-29.15:40:52.520?[ConsumeMessageThread_1]?INFO??SalaryAdjustMQListener?-?接收到MQ消息?{"data":{"employeeEntity":{"employeeLevel":"T3","employeeTitle":"T3"},"employeeNumber":"10000001","employeeSalaryAdjustEntity":{"adjustBaseAmount":80,"adjustMeritAmount":20,"adjustTotalAmount":100},"orderId":"100908977676004"},"id":"98117654515","timestamp":"2023-07-29?15:40:52.425"}
    當(dāng)執(zhí)行一次加薪調(diào)整后,就會接收到MQ消息了。

-你好,我是小傅哥。一線互聯(lián)網(wǎng)java?工程師、T8架構(gòu)師,開發(fā)過交易&營銷、寫過運營&活動、設(shè)計過中間件也倒騰過中繼器、IO板卡。不只是寫Java語言,也搞過C#、PHP,是一個技術(shù)活躍的折騰者。

推薦器件

更多器件
器件型號 數(shù)量 器件廠商 器件描述 數(shù)據(jù)手冊 ECAD模型 風(fēng)險等級 參考價格 更多信息
SN74LVC1G14DCKRE4 1 Texas Instruments Single 1.65-V to 5.5-V inverter with Schmitt-Trigger inputs 5-SC70 -40 to 125

ECAD模型

下載ECAD模型
$0.41 查看
AT24CM02-SSHD-B 1 Atmel Corporation EEPROM, 256KX8, Serial, CMOS, PDSO8, 0.150 INCH, GREEN, PLASTIC, MS-012AA, SOIC-8

ECAD模型

下載ECAD模型
$3.3 查看
ASEM1-16.000MHZ-LC-T 1 Abracon Corporation MEMS OSC XO 16.0000MHZ CMOS SMD
$1.63 查看

相關(guān)推薦

登錄即可解鎖
  • 海量技術(shù)文章
  • 設(shè)計資源下載
  • 產(chǎn)業(yè)鏈客戶資源
  • 寫文章/發(fā)需求
立即登錄

作者小傅哥多年從事一線互聯(lián)網(wǎng)Java開發(fā),從19年開始編寫工作和學(xué)習(xí)歷程的技術(shù)匯總,旨在為大家提供一個較清晰詳細的核心技能學(xué)習(xí)文檔。如果本文能為您提供幫助,請給予支持(關(guān)注、點贊、分享)!