diff --git a/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/dal/redis/RedisKeyConstants.java b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/dal/redis/RedisKeyConstants.java index 30081c6e8..6de0e2144 100644 --- a/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/dal/redis/RedisKeyConstants.java +++ b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/dal/redis/RedisKeyConstants.java @@ -16,6 +16,15 @@ public interface RedisKeyConstants { */ String PAY_NOTIFY_LOCK = "pay_notify:lock:%d"; + /** + * 支付钱包的分布式锁 + * + * KEY 格式:pay_wallet:lock:%d + * VALUE 数据格式:HASH // RLock.class:Redisson 的 Lock 锁,使用 Hash 数据结构 + * 过期时间:不固定 + */ + String PAY_WALLET_LOCK = "pay_wallet:lock:%d"; + /** * 支付序号的缓存 * diff --git a/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/dal/redis/wallet/PayWalletLockRedisDAO.java b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/dal/redis/wallet/PayWalletLockRedisDAO.java new file mode 100644 index 000000000..1c427278d --- /dev/null +++ b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/dal/redis/wallet/PayWalletLockRedisDAO.java @@ -0,0 +1,42 @@ +package cn.iocoder.yudao.module.pay.dal.redis.wallet; + +import jakarta.annotation.Resource; +import org.redisson.api.RLock; +import org.redisson.api.RedissonClient; +import org.springframework.stereotype.Repository; + +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +import static cn.iocoder.yudao.module.pay.dal.redis.RedisKeyConstants.PAY_WALLET_LOCK; + +/** + * 支付钱包的锁 Redis DAO + * + * @author 芋道源码 + */ +@Repository +public class PayWalletLockRedisDAO { + + @Resource + private RedissonClient redissonClient; + + public V lock(Long id, Long timeoutMillis, Callable callable) throws Exception { + String lockKey = formatKey(id); + RLock lock = redissonClient.getLock(lockKey); + try { + lock.lock(timeoutMillis, TimeUnit.MILLISECONDS); + // 执行逻辑 + return callable.call(); + } catch (Exception e) { + throw e; + } finally { + lock.unlock(); + } + } + + private static String formatKey(Long id) { + return String.format(PAY_WALLET_LOCK, id); + } + +} diff --git a/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletServiceImpl.java b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletServiceImpl.java index b844e3769..cc9d570a2 100644 --- a/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletServiceImpl.java +++ b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletServiceImpl.java @@ -2,17 +2,20 @@ package cn.iocoder.yudao.module.pay.service.wallet; import cn.hutool.core.lang.Assert; import cn.iocoder.yudao.framework.common.pojo.PageResult; +import cn.iocoder.yudao.framework.common.util.date.DateUtils; import cn.iocoder.yudao.module.pay.controller.admin.wallet.vo.wallet.PayWalletPageReqVO; import cn.iocoder.yudao.module.pay.dal.dataobject.order.PayOrderExtensionDO; import cn.iocoder.yudao.module.pay.dal.dataobject.refund.PayRefundDO; import cn.iocoder.yudao.module.pay.dal.dataobject.wallet.PayWalletDO; import cn.iocoder.yudao.module.pay.dal.dataobject.wallet.PayWalletTransactionDO; import cn.iocoder.yudao.module.pay.dal.mysql.wallet.PayWalletMapper; +import cn.iocoder.yudao.module.pay.dal.redis.wallet.PayWalletLockRedisDAO; import cn.iocoder.yudao.module.pay.enums.wallet.PayWalletBizTypeEnum; import cn.iocoder.yudao.module.pay.service.order.PayOrderService; import cn.iocoder.yudao.module.pay.service.refund.PayRefundService; import cn.iocoder.yudao.module.pay.service.wallet.bo.WalletTransactionCreateReqBO; import jakarta.annotation.Resource; +import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; import org.springframework.context.annotation.Lazy; import org.springframework.stereotype.Service; @@ -32,10 +35,17 @@ import static cn.iocoder.yudao.module.pay.enums.wallet.PayWalletBizTypeEnum.PAYM */ @Service @Slf4j -public class PayWalletServiceImpl implements PayWalletService { +public class PayWalletServiceImpl implements PayWalletService { + + /** + * 通知超时时间,单位:毫秒 + */ + public static final long UPDATE_TIMEOUT_MILLIS = 120 * DateUtils.SECOND_MILLIS; @Resource private PayWalletMapper walletMapper; + @Resource + private PayWalletLockRedisDAO lockRedisDAO; @Resource @Lazy // 延迟加载,避免循环依赖 @@ -121,75 +131,87 @@ public class PayWalletServiceImpl implements PayWalletService { } @Override + @Transactional(rollbackFor = Exception.class) + @SneakyThrows public PayWalletTransactionDO reduceWalletBalance(Long walletId, Long bizId, PayWalletBizTypeEnum bizType, Integer price) { // 1. 获取钱包 PayWalletDO payWallet = getWallet(walletId); if (payWallet == null) { - log.error("[reduceWalletBalance],用户钱包({})不存在.", walletId); + log.error("[reduceWalletBalance][用户钱包({})不存在]", walletId); throw exception(WALLET_NOT_FOUND); } - // 2.1 扣除余额 - int updateCounts; - switch (bizType) { - case PAYMENT: { - updateCounts = walletMapper.updateWhenConsumption(payWallet.getId(), price); - break; + // 2. 加锁,更新钱包余额(目的:避免钱包流水的并发更新时,余额变化不连贯) + return lockRedisDAO.lock(walletId, UPDATE_TIMEOUT_MILLIS, () -> { + // 2. 扣除余额 + int updateCounts; + switch (bizType) { + case PAYMENT: { + updateCounts = walletMapper.updateWhenConsumption(payWallet.getId(), price); + break; + } + case RECHARGE_REFUND: { + updateCounts = walletMapper.updateWhenRechargeRefund(payWallet.getId(), price); + break; + } + default: { + // TODO 其它类型待实现 + throw new UnsupportedOperationException("待实现"); + } } - case RECHARGE_REFUND: { - updateCounts = walletMapper.updateWhenRechargeRefund(payWallet.getId(), price); - break; + if (updateCounts == 0) { + throw exception(WALLET_BALANCE_NOT_ENOUGH); } - default: { - // TODO 其它类型待实现 - throw new UnsupportedOperationException("待实现"); - } - } - if (updateCounts == 0) { - throw exception(WALLET_BALANCE_NOT_ENOUGH); - } - // 2.2 生成钱包流水 - Integer afterBalance = payWallet.getBalance() - price; - WalletTransactionCreateReqBO bo = new WalletTransactionCreateReqBO().setWalletId(payWallet.getId()) - .setPrice(-price).setBalance(afterBalance).setBizId(String.valueOf(bizId)) - .setBizType(bizType.getType()).setTitle(bizType.getDescription()); - return walletTransactionService.createWalletTransaction(bo); + + // 3. 生成钱包流水 + Integer afterBalance = payWallet.getBalance() - price; + WalletTransactionCreateReqBO bo = new WalletTransactionCreateReqBO().setWalletId(payWallet.getId()) + .setPrice(-price).setBalance(afterBalance).setBizId(String.valueOf(bizId)) + .setBizType(bizType.getType()).setTitle(bizType.getDescription()); + return walletTransactionService.createWalletTransaction(bo); + }); } @Override + @Transactional(rollbackFor = Exception.class) + @SneakyThrows public PayWalletTransactionDO addWalletBalance(Long walletId, String bizId, PayWalletBizTypeEnum bizType, Integer price) { - // 1.1 获取钱包 + // 1. 获取钱包 PayWalletDO payWallet = getWallet(walletId); if (payWallet == null) { - log.error("[addWalletBalance],用户钱包({})不存在.", walletId); + log.error("[addWalletBalance][用户钱包({})不存在]", walletId); throw exception(WALLET_NOT_FOUND); } - // 1.2 更新钱包金额 - switch (bizType) { - case PAYMENT_REFUND: { // 退款更新 - walletMapper.updateWhenConsumptionRefund(payWallet.getId(), price); - break; - } - case RECHARGE: { // 充值更新 - walletMapper.updateWhenRecharge(payWallet.getId(), price); - break; - } - case UPDATE_BALANCE: // 更新余额 - walletMapper.updateWhenRecharge(payWallet.getId(), price); - break; - default: { - // TODO 其它类型待实现 - throw new UnsupportedOperationException("待实现"); - } - } - // 2. 生成钱包流水 - WalletTransactionCreateReqBO transactionCreateReqBO = new WalletTransactionCreateReqBO() - .setWalletId(payWallet.getId()).setPrice(price).setBalance(payWallet.getBalance() + price) - .setBizId(bizId).setBizType(bizType.getType()).setTitle(bizType.getDescription()); - return walletTransactionService.createWalletTransaction(transactionCreateReqBO); + // 2. 加锁,更新钱包余额(目的:避免钱包流水的并发更新时,余额变化不连贯) + return lockRedisDAO.lock(walletId, UPDATE_TIMEOUT_MILLIS, () -> { + // 2. 更新钱包金额 + switch (bizType) { + case PAYMENT_REFUND: { // 退款更新 + walletMapper.updateWhenConsumptionRefund(payWallet.getId(), price); + break; + } + case RECHARGE: { // 充值更新 + walletMapper.updateWhenRecharge(payWallet.getId(), price); + break; + } + case UPDATE_BALANCE: // 更新余额 + walletMapper.updateWhenRecharge(payWallet.getId(), price); + break; + default: { + // TODO 其它类型待实现 + throw new UnsupportedOperationException("待实现"); + } + } + + // 3. 生成钱包流水 + WalletTransactionCreateReqBO transactionCreateReqBO = new WalletTransactionCreateReqBO() + .setWalletId(payWallet.getId()).setPrice(price).setBalance(payWallet.getBalance() + price) + .setBizId(bizId).setBizType(bizType.getType()).setTitle(bizType.getDescription()); + return walletTransactionService.createWalletTransaction(transactionCreateReqBO); + }); } @Override diff --git a/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletTransactionServiceImpl.java b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletTransactionServiceImpl.java index a2f3d92d6..4c01cd85f 100644 --- a/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletTransactionServiceImpl.java +++ b/yudao-module-pay/yudao-module-pay-biz/src/main/java/cn/iocoder/yudao/module/pay/service/wallet/PayWalletTransactionServiceImpl.java @@ -87,8 +87,6 @@ public class PayWalletTransactionServiceImpl implements PayWalletTransactionServ @Override public AppPayWalletTransactionSummaryRespVO getWalletTransactionSummary(Long userId, Integer userType, LocalDateTime[] createTime) { PayWalletDO wallet = payWalletService.getOrCreateWallet(userId, userType); - AppPayWalletTransactionSummaryRespVO summary = new AppPayWalletTransactionSummaryRespVO() - .setTotalExpense(1).setTotalIncome(100); return new AppPayWalletTransactionSummaryRespVO() .setTotalExpense(payWalletTransactionMapper.selectPriceSum(wallet.getId(), TYPE_EXPENSE, createTime)) .setTotalIncome(payWalletTransactionMapper.selectPriceSum(wallet.getId(), TYPE_INCOME, createTime));