基于 DDD、EventSourcing 的现代响应式 CQRS 架构微服务开发框架
领域驱动 | 事件驱动 | 测试驱动 | 声明式设计 | 响应式编程 | 命令查询职责分离 | 事件溯源
更新内容 🎉 🎉 🎉
支持标记忽略事件溯源接口(IgnoreSourcing)
- 依赖: 更新 CosId 版本 v2.4.0
- 依赖: 更新 io.gitlab.arturbosch.detekt 版本 v1.23.1
- 特性: 增强 Spring-Boot 配置提示
- 特性: 增强单元测试套件断言失败提示
- 依赖: 更新 com.google.guava:guava 版本 v32.1.2-jre
- 特性: 支持标记忽略事件溯源接口(IgnoreSourcing)
- 应用场景:执行聚合根命令时,业务校验失败需要生成失败事件以便下游订阅者处理。并且不需要溯源领域事件。
- 触发条件:
- 领域事件标记 ErrorInfo ,标记该事件为失败事件
- 领域事件标记 IgnoreSourcing
- 领域事件版本=1
- 影响:
- 忽略该领域事件的溯源,且不会变更聚合版本。
- 将忽略未初始化的状态聚合发送到状态事件总线。
- 聚合快照处理器将无法接受到该状态事件,即不会存储到快照仓库。
架构图
事件源
可观测性
Spring WebFlux 集成
自动注册 命令 路由处理函数 (HandlerFunction),开发人员仅需编写领域模型,即可完成服务开发。
测试套件:80%+ 的测试覆盖率轻而易举
Given -> When -> Expect 。
前置条件
- 理解 领域驱动设计:《实现领域驱动设计》、《领域驱动设计:软件核心复杂性应对之道》
- 理解 命令查询职责分离(CQRS)
- 理解 事件溯源架构
- 理解 响应式编程
特性
-
Aggregate Modeling
- Single Class
- Inheritance Pattern
- Aggregation Pattern
-
Saga Modeling
-
StatelessSaga
-
兼容性测试规范(TCK)
- EventStore
- MongoDB (Recommend)
-
R2dbc
- Database Sharding
- Table Sharding
- Redis
-
MongoDB
- Database Sharding
- Table Sharding
-
SENT : 命令发送成功后发送完成信号
-
InMemoryCommandBus
-
InMemoryDomainEventBus
-
InMemoryStateEventBus
-
Spring Boot Auto Configuration
-
OpenTelemetry
-
wow-compiler
Example
Example
单元测试套件
80%+ 的测试覆盖率轻而易举。
Given -> When -> Expect 。
Aggregate Unit Test (AggregateVerifier)
Aggregate Test
internal class OrderTest { private fun mockCreateOrder(): VerifiedStage<OrderState> { val tenantId = GlobalIdGenerator.generateAsString() val customerId = GlobalIdGenerator.generateAsString() val orderItem = OrderItem( GlobalIdGenerator.generateAsString(), GlobalIdGenerator.generateAsString(), BigDecimal.valueOf(10), 10, ) val orderItems = listOf(orderItem) val inventoryService = object : InventoryService { override fun getInventory(productId: String): Mono<Int> { return orderItems.filter { it.productId == productId }.map { it.quantity }.first().toMono() } } val pricingService = object : PricingService { override fun getProductPrice(productId: String): Mono<BigDecimal> { return orderItems.filter { it.productId == productId }.map { it.price }.first().toMono() } } return aggregateVerifier<Order, OrderState>(tenantId = tenantId) .inject(DefaultCreateOrderSpec(inventoryService, pricingService)) .given() .`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS, false)) .expectEventCount(1) .expectEventType(OrderCreated::class.java) .expectStateAggregate { assertThat(it.aggregateId.tenantId, equalTo(tenantId)) } .expectState { assertThat(it.id, notNullValue()) assertThat(it.customerId, equalTo(customerId)) assertThat(it.address, equalTo(SHIPPING_ADDRESS)) assertThat(it.items, equalTo(orderItems)) assertThat(it.status, equalTo(OrderStatus.CREATED)) } .verify() } /** * 创建订单 */ @Test fun createOrder() { mockCreateOrder() } @Test fun createOrderGivenEmptyItems() { val customerId = GlobalIdGenerator.generateAsString() aggregateVerifier<Order, OrderState>() .inject(mockk<CreateOrderSpec>(), "createOrderSpec") .given() .`when`(CreateOrder(customerId, listOf(), SHIPPING_ADDRESS, false)) .expectErrorType(IllegalArgumentException::class.java) .expectStateAggregate { /* * 该聚合对象处于未初始化状态,即该聚合未创建成功. */ assertThat(it.initialized, equalTo(false)) }.verify() } /** * 创建订单-库存不足 */ @Test fun createOrderWhenInventoryShortage() { val customerId = GlobalIdGenerator.generateAsString() val orderItem = OrderItem( GlobalIdGenerator.generateAsString(), GlobalIdGenerator.generateAsString(), BigDecimal.valueOf(10), 10, ) val orderItems = listOf(orderItem) val inventoryService = object : InventoryService { override fun getInventory(productId: String): Mono<Int> { return orderItems.filter { it.productId == productId } /* * 模拟库存不足 */ .map { it.quantity - 1 }.first().toMono() } } val pricingService = object : PricingService { override fun getProductPrice(productId: String): Mono<BigDecimal> { return orderItems.filter { it.productId == productId }.map { it.price }.first().toMono() } } aggregateVerifier<Order, OrderState>() .inject(DefaultCreateOrderSpec(inventoryService, pricingService)) .given() .`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS, false)) /* * 期望:库存不足异常. */ .expectErrorType(InventoryShortageException::class.java) .expectStateAggregate { /* * 该聚合对象处于未初始化状态,即该聚合未创建成功. */ assertThat(it.initialized, equalTo(false)) }.verify() } /** * 创建订单-下单价格与当前价格不一致 */ @Test fun createOrderWhenPriceInconsistency() { val customerId = GlobalIdGenerator.generateAsString() val orderItem = OrderItem( GlobalIdGenerator.generateAsString(), GlobalIdGenerator.generateAsString(), BigDecimal.valueOf(10), 10, ) val orderItems = listOf(orderItem) val inventoryService = object : InventoryService { override fun getInventory(productId: String): Mono<Int> { return orderItems.filter { it.productId == productId }.map { it.quantity }.first().toMono() } } val pricingService = object : PricingService { override fun getProductPrice(productId: String): Mono<BigDecimal> { return orderItems.filter { it.productId == productId } /* * 模拟下单价格、商品定价不一致 */ .map { it.price.plus(BigDecimal.valueOf(1)) }.first().toMono() } } aggregateVerifier<Order, OrderState>() .inject(DefaultCreateOrderSpec(inventoryService, pricingService)) .given() .`when`(CreateOrder(customerId, orderItems, SHIPPING_ADDRESS, false)) /* * 期望:价格不一致异常. */ .expectErrorType(PriceInconsistencyException::class.java).verify() } }
Saga Unit Test (SagaVerifier)
Saga Test
class CartSagaTest { @Test fun onOrderCreated() { val orderItem = OrderItem( GlobalIdGenerator.generateAsString(), GlobalIdGenerator.generateAsString(), BigDecimal.valueOf(10), 10, ) sagaVerifier<CartSaga>() .`when`( mockk<OrderCreated> { every { customerId } returns "customerId" every { items } returns listOf(orderItem) every { fromCart } returns true }, ) .expectCommandBody<RemoveCartItem> { assertThat(it.id, equalTo("customerId")) assertThat(it.productIds, hasSize(1)) assertThat(it.productIds.first(), equalTo(orderItem.productId)) } .verify() } }
设计
聚合建模
Single Class | Inheritance Pattern | Aggregation Pattern |
---|---|---|