- 系统架构文档 (architecture/README.md) - 6个领域文档: - 账户域 (01-account.md) - 账务域 (02-ledger.md) - 交易域 (03-transaction.md) - 对账域 (04-reconciliation.md) - 补偿域 (05-compensation.md) - 积分域 (06-points.md) - API 参考文档 (api/README.md) - 前后端对接清单 (integration/frontend-backend.md)
15 KiB
15 KiB
补偿域 (Compensation Domain)
一、领域概述
补偿域负责处理交易过程中的异常情况,包括超时检测、失败重试、死信队列处理等。该域是保障交易最终一致性的关键组件。
二、核心概念
2.1 补偿机制
┌─────────────────────────────────────────────────────────┐
│ 补偿流程 │
│ │
│ 交易超时 ──► 创建补偿任务 ──► 重试处理 ──► 成功/死信 │
│ │ │
│ └──► 指数退避 ──► 最大重试次数 │
└─────────────────────────────────────────────────────────┘
2.2 指数退避策略
重试间隔 = 基础间隔 × 2^重试次数
示例(基础间隔30秒):
- 第1次重试: 30秒后
- 第2次重试: 60秒后
- 第3次重试: 120秒后
- 第4次重试: 240秒后
2.3 死信队列
超过最大重试次数的任务进入死信队列,需要人工处理。
正常队列 ──► 重试失败 ──► 死信队列 ──► 人工处理
三、核心实体
3.1 补偿任务 (CompensationTask)
pub struct CompensationTask {
pub id: i64,
pub txn_no: String, // 关联交易号
pub task_type: CompensationTaskType, // 任务类型
pub status: CompensationTaskStatus, // 任务状态
pub retry_count: i32, // 重试次数
pub max_retries: i32, // 最大重试次数
pub next_retry_at: Option<DateTime<Utc>>,// 下次重试时间
pub error_message: Option<String>, // 错误信息
pub created_at: DateTime<Utc>, // 创建时间
pub updated_at: DateTime<Utc>, // 更新时间
pub completed_at: Option<DateTime<Utc>>, // 完成时间
}
核心方法:
impl CompensationTask {
// 检查是否可重试
pub fn can_retry(&self) -> bool {
self.status == CompensationTaskStatus::Failed
&& self.retry_count < self.max_retries
}
// 检查是否应该进入死信队列
pub fn should_dead_letter(&self) -> bool {
self.status == CompensationTaskStatus::Failed
&& self.retry_count >= self.max_retries
}
// 检查是否已到重试时间
pub fn is_ready_for_retry(&self) -> bool {
if let Some(next_retry) = self.next_retry_at {
Utc::now() >= next_retry
} else {
true
}
}
}
3.2 超时配置 (TimeoutConfig)
pub struct TimeoutConfig {
pub bank_timeout_seconds: i64, // 银行交易超时秒数
pub check_interval_seconds: i64, // 检查间隔秒数
pub retry_base_interval_seconds: i64, // 重试间隔基数
pub max_retries: i32, // 最大重试次数
}
impl Default for TimeoutConfig {
fn default() -> Self {
Self {
bank_timeout_seconds: 300, // 5分钟
check_interval_seconds: 60, // 1分钟检查一次
retry_base_interval_seconds: 30, // 30秒基础重试间隔
max_retries: 3,
}
}
}
计算下次重试时间:
impl TimeoutConfig {
pub fn calculate_next_retry(&self, retry_count: i32) -> DateTime<Utc> {
let delay_seconds = self.retry_base_interval_seconds * (2_i64.pow(retry_count as u32));
Utc::now() + chrono::Duration::seconds(delay_seconds)
}
}
四、枚举类型
4.1 补偿任务类型 (CompensationTaskType)
pub enum CompensationTaskType {
TimeoutCheck, // 超时检查
Reconcile, // 对账补偿
Reverse, // 冲正处理
Retry, // 重试
}
| 类型 | 触发条件 | 处理方式 |
|---|---|---|
| TimeoutCheck | 交易提交银行后超时 | 查询银行状态 |
| Reconcile | 对账发现差异 | 调整账务 |
| Reverse | 需要冲正 | 执行冲正 |
| Retry | 失败需重试 | 重新提交 |
4.2 补偿任务状态 (CompensationTaskStatus)
pub enum CompensationTaskStatus {
Pending, // 待处理
Processing, // 处理中
Completed, // 已完成
Failed, // 失败
DeadLetter, // 死信
}
状态转移图:
┌─────────┐ ┌────────────┐ ┌───────────┐
│ Pending │───►│ Processing │───►│ Completed │
└─────────┘ └────────────┘ └───────────┘
│
▼
┌──────────┐ ┌────────────┐
│ Failed │───►│ DeadLetter │
└──────────┘ └────────────┘
│
▼
┌──────────┐
│ Retry │ (返回 Pending)
└──────────┘
五、处理结果
5.1 补偿处理结果 (CompensationResult)
pub struct CompensationResult {
pub task_id: i64,
pub success: bool,
pub message: String,
pub needs_retry: bool,
}
5.2 超时检测结果 (TimeoutDetectionResult)
pub struct TimeoutDetectionResult {
pub timeout_count: i32, // 检测到的超时交易数
pub task_created: i32, // 创建的补偿任务数
pub failed_count: i32, // 处理失败数
}
六、领域服务
6.1 CompensationService
impl CompensationService {
// ========== 超时检测 ==========
// 检测超时交易
pub async fn detect_timeout_transactions(&self) -> Result<TimeoutDetectionResult>;
// ========== 任务管理 ==========
// 创建补偿任务
pub async fn create_compensation_task(&self, txn_no: &str, task_type: CompensationTaskType) -> Result<CompensationTask>;
// 处理单个补偿任务
pub async fn process_task(&self, task_id: i64) -> Result<CompensationResult>;
// 批量处理待处理任务
pub async fn process_pending_tasks(&self) -> Result<Vec<CompensationResult>>;
// ========== 重试管理 ==========
// 处理待重试任务
pub async fn process_ready_for_retry(&self) -> Result<Vec<CompensationResult>>;
// 标记任务完成
pub async fn mark_completed(&self, task_id: i64) -> Result<()>;
// 标记任务失败并计划重试
pub async fn mark_failed_with_retry(&self, task_id: i64, error: &str) -> Result<()>;
// ========== 死信处理 ==========
// 处理死信任务
pub async fn process_dead_letter_tasks(&self) -> Result<Vec<CompensationResult>>;
// 移入死信队列
pub async fn move_to_dead_letter(&self, task_id: i64) -> Result<()>;
// 手动重试死信任务
pub async fn retry_dead_letter(&self, task_id: i64) -> Result<()>;
}
6.2 超时检测实现
pub async fn detect_timeout_transactions(&self) -> Result<TimeoutDetectionResult> {
// 查找需要检查超时的交易
let bank_submitted_txns = self.txn_repo
.find_by_status(TransactionStatus::BankSubmitted)
.await?;
let mut timeout_count = 0;
let mut task_created = 0;
let mut failed_count = 0;
for txn in bank_submitted_txns {
// 检查是否超时
if txn.is_timeout(self.config.bank_timeout_seconds) {
timeout_count += 1;
// 检查是否已有补偿任务
if self.task_repo.has_pending_task(&txn.txn_no, CompensationTaskType::TimeoutCheck).await? {
continue;
}
// 更新交易状态为 Timeout
match self.txn_repo.update_status(txn.id, TransactionStatus::Timeout).await {
Ok(_) => {
// 创建补偿任务
match self.create_compensation_task(&txn.txn_no, CompensationTaskType::TimeoutCheck).await {
Ok(_) => task_created += 1,
Err(e) => {
warn!("创建补偿任务失败: {}", e);
failed_count += 1;
}
}
}
Err(e) => {
warn!("更新交易状态失败: {}", e);
failed_count += 1;
}
}
}
}
Ok(TimeoutDetectionResult { timeout_count, task_created, failed_count })
}
6.3 任务处理实现
pub async fn process_task(&self, task_id: i64) -> Result<CompensationResult> {
let task = self.task_repo.find_by_id(task_id).await?
.ok_or_else(|| AppError::NotFound("补偿任务不存在".into()))?;
// 更新状态为处理中
self.task_repo.update_status(task_id, CompensationTaskStatus::Processing, None).await?;
match task.task_type {
CompensationTaskType::TimeoutCheck => {
self.handle_timeout_check(&task).await
}
CompensationTaskType::Reconcile => {
self.handle_reconcile(&task).await
}
CompensationTaskType::Reverse => {
self.handle_reverse(&task).await
}
CompensationTaskType::Retry => {
self.handle_retry(&task).await
}
}
}
async fn handle_timeout_check(&self, task: &CompensationTask) -> Result<CompensationResult> {
// 1. 获取关联交易
let txn = self.txn_repo.find_by_txn_no(&task.txn_no).await?
.ok_or_else(|| AppError::NotFound("交易不存在".into()))?;
// 2. 查询银行状态
let bank_status = self.bank_client.query_transaction_status(&txn.bank_ref_no).await;
match bank_status {
Ok(status) if status.is_success() => {
// 银行确认成功,结转在途
self.ledger_service.settle_transit(
txn.from_account_id.unwrap(),
AccountType::Physical,
txn.amount
).await?;
self.txn_repo.update_status(txn.id, TransactionStatus::Success).await?;
self.task_repo.mark_completed(task.id).await?;
Ok(CompensationResult {
task_id: task.id,
success: true,
message: "银行确认成功".to_string(),
needs_retry: false,
})
}
Ok(status) if status.is_failed() => {
// 银行确认失败,回退在途
self.ledger_service.rollback_transit(
txn.from_account_id.unwrap(),
AccountType::Physical,
txn.amount
).await?;
self.txn_repo.update_status(txn.id, TransactionStatus::Failed).await?;
self.task_repo.mark_completed(task.id).await?;
Ok(CompensationResult {
task_id: task.id,
success: true,
message: "银行确认失败,已回退".to_string(),
needs_retry: false,
})
}
_ => {
// 仍然无法确认,需要重试
Ok(CompensationResult {
task_id: task.id,
success: false,
message: "无法确认银行状态".to_string(),
needs_retry: true,
})
}
}
}
七、仓储接口
7.1 CompensationTaskRepository
#[async_trait]
pub trait CompensationTaskRepository: Send + Sync {
// 创建任务
async fn create(&self, task: &CompensationTask) -> Result<i64>;
// 查询
async fn find_by_id(&self, id: i64) -> Result<Option<CompensationTask>>;
async fn find_by_txn_no(&self, txn_no: &str) -> Result<Vec<CompensationTask>>;
async fn find_pending(&self) -> Result<Vec<CompensationTask>>;
async fn find_ready_for_retry(&self) -> Result<Vec<CompensationTask>>;
async fn find_dead_letter(&self) -> Result<Vec<CompensationTask>>;
// 状态更新
async fn update_status(&self, id: i64, status: CompensationTaskStatus, error_message: Option<&str>) -> Result<()>;
async fn increment_retry(&self, id: i64, next_retry_at: DateTime<Utc>) -> Result<()>;
async fn mark_completed(&self, id: i64) -> Result<()>;
async fn mark_dead_letter(&self, id: i64) -> Result<()>;
// 检查
async fn has_pending_task(&self, txn_no: &str, task_type: CompensationTaskType) -> Result<bool>;
}
八、数据库表结构
8.1 compensation_task 表
CREATE TABLE compensation_task (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
txn_no VARCHAR(64) NOT NULL,
task_type VARCHAR(32) NOT NULL,
status VARCHAR(32) DEFAULT 'pending',
retry_count INT DEFAULT 0,
max_retries INT DEFAULT 3,
next_retry_at TIMESTAMP NULL,
error_message TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
completed_at TIMESTAMP NULL,
INDEX idx_txn_no (txn_no),
INDEX idx_status (status),
INDEX idx_next_retry (next_retry_at),
INDEX idx_task_type (task_type)
);
九、调度机制
9.1 定时任务
| 任务 | 频率 | 说明 |
|---|---|---|
| 超时检测 | 每分钟 | 扫描超时交易 |
| 待处理任务 | 每30秒 | 处理待处理任务 |
| 重试任务 | 每分钟 | 处理到期重试任务 |
| 死信巡检 | 每小时 | 检查死信队列 |
9.2 并发控制
- 分布式锁防止重复处理
- 乐观锁防止并发更新
- 任务状态机保证正确流转
十、监控告警
10.1 监控指标
| 指标 | 说明 | 阈值 |
|---|---|---|
| pending_tasks | 待处理任务数 | > 100 告警 |
| failed_tasks | 失败任务数 | > 10 告警 |
| dead_letter_count | 死信数量 | > 0 告警 |
| avg_retry_count | 平均重试次数 | > 2 关注 |
10.2 告警规则
alerts:
- name: compensation_task_backlog
condition: pending_tasks > 100
severity: warning
- name: dead_letter_alert
condition: dead_letter_count > 0
severity: critical
- name: high_failure_rate
condition: failed_tasks / total_tasks > 0.1
severity: error
十一、最佳实践
11.1 任务设计
- 任务幂等:同一任务多次执行结果相同
- 原子操作:任务内操作要么全成功要么全失败
- 超时设置:合理设置任务超时时间
11.2 重试策略
- 指数退避:避免系统过载
- 最大重试:防止无限重试
- 死信处理:异常任务人工介入
11.3 监控运维
- 任务可追溯:完整记录任务生命周期
- 告警及时:异常情况及时通知
- 手动干预:提供手动处理入口