# 补偿域 (Compensation Domain) ## 一、领域概述 补偿域负责处理交易过程中的异常情况,包括超时检测、失败重试、死信队列处理等。该域是保障交易最终一致性的关键组件。 ## 二、核心概念 ### 2.1 补偿机制 ``` ┌─────────────────────────────────────────────────────────┐ │ 补偿流程 │ │ │ │ 交易超时 ──► 创建补偿任务 ──► 重试处理 ──► 成功/死信 │ │ │ │ │ └──► 指数退避 ──► 最大重试次数 │ └─────────────────────────────────────────────────────────┘ ``` ### 2.2 指数退避策略 ``` 重试间隔 = 基础间隔 × 2^重试次数 示例(基础间隔30秒): - 第1次重试: 30秒后 - 第2次重试: 60秒后 - 第3次重试: 120秒后 - 第4次重试: 240秒后 ``` ### 2.3 死信队列 超过最大重试次数的任务进入死信队列,需要人工处理。 ``` 正常队列 ──► 重试失败 ──► 死信队列 ──► 人工处理 ``` ## 三、核心实体 ### 3.1 补偿任务 (CompensationTask) ```rust pub struct CompensationTask { pub id: i64, pub txn_no: String, // 关联交易号 pub task_type: CompensationTaskType, // 任务类型 pub status: CompensationTaskStatus, // 任务状态 pub retry_count: i32, // 重试次数 pub max_retries: i32, // 最大重试次数 pub next_retry_at: Option>,// 下次重试时间 pub error_message: Option, // 错误信息 pub created_at: DateTime, // 创建时间 pub updated_at: DateTime, // 更新时间 pub completed_at: Option>, // 完成时间 } ``` **核心方法**: ```rust impl CompensationTask { // 检查是否可重试 pub fn can_retry(&self) -> bool { self.status == CompensationTaskStatus::Failed && self.retry_count < self.max_retries } // 检查是否应该进入死信队列 pub fn should_dead_letter(&self) -> bool { self.status == CompensationTaskStatus::Failed && self.retry_count >= self.max_retries } // 检查是否已到重试时间 pub fn is_ready_for_retry(&self) -> bool { if let Some(next_retry) = self.next_retry_at { Utc::now() >= next_retry } else { true } } } ``` ### 3.2 超时配置 (TimeoutConfig) ```rust pub struct TimeoutConfig { pub bank_timeout_seconds: i64, // 银行交易超时秒数 pub check_interval_seconds: i64, // 检查间隔秒数 pub retry_base_interval_seconds: i64, // 重试间隔基数 pub max_retries: i32, // 最大重试次数 } impl Default for TimeoutConfig { fn default() -> Self { Self { bank_timeout_seconds: 300, // 5分钟 check_interval_seconds: 60, // 1分钟检查一次 retry_base_interval_seconds: 30, // 30秒基础重试间隔 max_retries: 3, } } } ``` **计算下次重试时间**: ```rust impl TimeoutConfig { pub fn calculate_next_retry(&self, retry_count: i32) -> DateTime { let delay_seconds = self.retry_base_interval_seconds * (2_i64.pow(retry_count as u32)); Utc::now() + chrono::Duration::seconds(delay_seconds) } } ``` ## 四、枚举类型 ### 4.1 补偿任务类型 (CompensationTaskType) ```rust pub enum CompensationTaskType { TimeoutCheck, // 超时检查 Reconcile, // 对账补偿 Reverse, // 冲正处理 Retry, // 重试 } ``` | 类型 | 触发条件 | 处理方式 | |------|----------|----------| | TimeoutCheck | 交易提交银行后超时 | 查询银行状态 | | Reconcile | 对账发现差异 | 调整账务 | | Reverse | 需要冲正 | 执行冲正 | | Retry | 失败需重试 | 重新提交 | ### 4.2 补偿任务状态 (CompensationTaskStatus) ```rust pub enum CompensationTaskStatus { Pending, // 待处理 Processing, // 处理中 Completed, // 已完成 Failed, // 失败 DeadLetter, // 死信 } ``` **状态转移图**: ``` ┌─────────┐ ┌────────────┐ ┌───────────┐ │ Pending │───►│ Processing │───►│ Completed │ └─────────┘ └────────────┘ └───────────┘ │ ▼ ┌──────────┐ ┌────────────┐ │ Failed │───►│ DeadLetter │ └──────────┘ └────────────┘ │ ▼ ┌──────────┐ │ Retry │ (返回 Pending) └──────────┘ ``` ## 五、处理结果 ### 5.1 补偿处理结果 (CompensationResult) ```rust pub struct CompensationResult { pub task_id: i64, pub success: bool, pub message: String, pub needs_retry: bool, } ``` ### 5.2 超时检测结果 (TimeoutDetectionResult) ```rust pub struct TimeoutDetectionResult { pub timeout_count: i32, // 检测到的超时交易数 pub task_created: i32, // 创建的补偿任务数 pub failed_count: i32, // 处理失败数 } ``` ## 六、领域服务 ### 6.1 CompensationService ```rust impl CompensationService { // ========== 超时检测 ========== // 检测超时交易 pub async fn detect_timeout_transactions(&self) -> Result; // ========== 任务管理 ========== // 创建补偿任务 pub async fn create_compensation_task(&self, txn_no: &str, task_type: CompensationTaskType) -> Result; // 处理单个补偿任务 pub async fn process_task(&self, task_id: i64) -> Result; // 批量处理待处理任务 pub async fn process_pending_tasks(&self) -> Result>; // ========== 重试管理 ========== // 处理待重试任务 pub async fn process_ready_for_retry(&self) -> Result>; // 标记任务完成 pub async fn mark_completed(&self, task_id: i64) -> Result<()>; // 标记任务失败并计划重试 pub async fn mark_failed_with_retry(&self, task_id: i64, error: &str) -> Result<()>; // ========== 死信处理 ========== // 处理死信任务 pub async fn process_dead_letter_tasks(&self) -> Result>; // 移入死信队列 pub async fn move_to_dead_letter(&self, task_id: i64) -> Result<()>; // 手动重试死信任务 pub async fn retry_dead_letter(&self, task_id: i64) -> Result<()>; } ``` ### 6.2 超时检测实现 ```rust pub async fn detect_timeout_transactions(&self) -> Result { // 查找需要检查超时的交易 let bank_submitted_txns = self.txn_repo .find_by_status(TransactionStatus::BankSubmitted) .await?; let mut timeout_count = 0; let mut task_created = 0; let mut failed_count = 0; for txn in bank_submitted_txns { // 检查是否超时 if txn.is_timeout(self.config.bank_timeout_seconds) { timeout_count += 1; // 检查是否已有补偿任务 if self.task_repo.has_pending_task(&txn.txn_no, CompensationTaskType::TimeoutCheck).await? { continue; } // 更新交易状态为 Timeout match self.txn_repo.update_status(txn.id, TransactionStatus::Timeout).await { Ok(_) => { // 创建补偿任务 match self.create_compensation_task(&txn.txn_no, CompensationTaskType::TimeoutCheck).await { Ok(_) => task_created += 1, Err(e) => { warn!("创建补偿任务失败: {}", e); failed_count += 1; } } } Err(e) => { warn!("更新交易状态失败: {}", e); failed_count += 1; } } } } Ok(TimeoutDetectionResult { timeout_count, task_created, failed_count }) } ``` ### 6.3 任务处理实现 ```rust pub async fn process_task(&self, task_id: i64) -> Result { let task = self.task_repo.find_by_id(task_id).await? .ok_or_else(|| AppError::NotFound("补偿任务不存在".into()))?; // 更新状态为处理中 self.task_repo.update_status(task_id, CompensationTaskStatus::Processing, None).await?; match task.task_type { CompensationTaskType::TimeoutCheck => { self.handle_timeout_check(&task).await } CompensationTaskType::Reconcile => { self.handle_reconcile(&task).await } CompensationTaskType::Reverse => { self.handle_reverse(&task).await } CompensationTaskType::Retry => { self.handle_retry(&task).await } } } async fn handle_timeout_check(&self, task: &CompensationTask) -> Result { // 1. 获取关联交易 let txn = self.txn_repo.find_by_txn_no(&task.txn_no).await? .ok_or_else(|| AppError::NotFound("交易不存在".into()))?; // 2. 查询银行状态 let bank_status = self.bank_client.query_transaction_status(&txn.bank_ref_no).await; match bank_status { Ok(status) if status.is_success() => { // 银行确认成功,结转在途 self.ledger_service.settle_transit( txn.from_account_id.unwrap(), AccountType::Physical, txn.amount ).await?; self.txn_repo.update_status(txn.id, TransactionStatus::Success).await?; self.task_repo.mark_completed(task.id).await?; Ok(CompensationResult { task_id: task.id, success: true, message: "银行确认成功".to_string(), needs_retry: false, }) } Ok(status) if status.is_failed() => { // 银行确认失败,回退在途 self.ledger_service.rollback_transit( txn.from_account_id.unwrap(), AccountType::Physical, txn.amount ).await?; self.txn_repo.update_status(txn.id, TransactionStatus::Failed).await?; self.task_repo.mark_completed(task.id).await?; Ok(CompensationResult { task_id: task.id, success: true, message: "银行确认失败,已回退".to_string(), needs_retry: false, }) } _ => { // 仍然无法确认,需要重试 Ok(CompensationResult { task_id: task.id, success: false, message: "无法确认银行状态".to_string(), needs_retry: true, }) } } } ``` ## 七、仓储接口 ### 7.1 CompensationTaskRepository ```rust #[async_trait] pub trait CompensationTaskRepository: Send + Sync { // 创建任务 async fn create(&self, task: &CompensationTask) -> Result; // 查询 async fn find_by_id(&self, id: i64) -> Result>; async fn find_by_txn_no(&self, txn_no: &str) -> Result>; async fn find_pending(&self) -> Result>; async fn find_ready_for_retry(&self) -> Result>; async fn find_dead_letter(&self) -> Result>; // 状态更新 async fn update_status(&self, id: i64, status: CompensationTaskStatus, error_message: Option<&str>) -> Result<()>; async fn increment_retry(&self, id: i64, next_retry_at: DateTime) -> Result<()>; async fn mark_completed(&self, id: i64) -> Result<()>; async fn mark_dead_letter(&self, id: i64) -> Result<()>; // 检查 async fn has_pending_task(&self, txn_no: &str, task_type: CompensationTaskType) -> Result; } ``` ## 八、数据库表结构 ### 8.1 compensation_task 表 ```sql CREATE TABLE compensation_task ( id BIGINT PRIMARY KEY AUTO_INCREMENT, txn_no VARCHAR(64) NOT NULL, task_type VARCHAR(32) NOT NULL, status VARCHAR(32) DEFAULT 'pending', retry_count INT DEFAULT 0, max_retries INT DEFAULT 3, next_retry_at TIMESTAMP NULL, error_message TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, completed_at TIMESTAMP NULL, INDEX idx_txn_no (txn_no), INDEX idx_status (status), INDEX idx_next_retry (next_retry_at), INDEX idx_task_type (task_type) ); ``` ## 九、调度机制 ### 9.1 定时任务 | 任务 | 频率 | 说明 | |------|------|------| | 超时检测 | 每分钟 | 扫描超时交易 | | 待处理任务 | 每30秒 | 处理待处理任务 | | 重试任务 | 每分钟 | 处理到期重试任务 | | 死信巡检 | 每小时 | 检查死信队列 | ### 9.2 并发控制 - 分布式锁防止重复处理 - 乐观锁防止并发更新 - 任务状态机保证正确流转 ## 十、监控告警 ### 10.1 监控指标 | 指标 | 说明 | 阈值 | |------|------|------| | pending_tasks | 待处理任务数 | > 100 告警 | | failed_tasks | 失败任务数 | > 10 告警 | | dead_letter_count | 死信数量 | > 0 告警 | | avg_retry_count | 平均重试次数 | > 2 关注 | ### 10.2 告警规则 ```yaml alerts: - name: compensation_task_backlog condition: pending_tasks > 100 severity: warning - name: dead_letter_alert condition: dead_letter_count > 0 severity: critical - name: high_failure_rate condition: failed_tasks / total_tasks > 0.1 severity: error ``` ## 十一、最佳实践 ### 11.1 任务设计 1. 任务幂等:同一任务多次执行结果相同 2. 原子操作:任务内操作要么全成功要么全失败 3. 超时设置:合理设置任务超时时间 ### 11.2 重试策略 1. 指数退避:避免系统过载 2. 最大重试:防止无限重试 3. 死信处理:异常任务人工介入 ### 11.3 监控运维 1. 任务可追溯:完整记录任务生命周期 2. 告警及时:异常情况及时通知 3. 手动干预:提供手动处理入口