RechargeService.php 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. <?php
  2. namespace App\Services;
  3. use App\Services\BaseService;
  4. use App\Models\Recharge;
  5. use App\Models\User;
  6. use Illuminate\Support\Facades\DB;
  7. use Illuminate\Support\Collection;
  8. use Illuminate\Support\Facades\Cache;
  9. use Illuminate\Support\Facades\Log;
  10. use App\Services\WalletService;
  11. use App\Services\CollectService;
  12. use App\Services\UserService;
  13. use App\Services\BalanceLogService;
  14. use App\Helpers\TronHelper;
  15. use App\Models\Config;
  16. /**
  17. * 用户充值记录
  18. */
  19. class RechargeService extends BaseService
  20. {
  21. public static string $MODEL = Recharge::class;
  22. const PENDING_SYNC_CACHE_KEY = 'usdt_recharge_pending_sync_members';
  23. const RECENT_ACTIVE_SYNC_CACHE_KEY = 'usdt_recharge_recent_active_synced_at';
  24. const TRONGRID_SYNC_LOCK_KEY = 'usdt_recharge_trongrid_sync_lock';
  25. const ADDRESS_SYNC_CACHE_PREFIX = 'usdt_recharge_address_synced_at:';
  26. const PENDING_SYNC_TTL = 7200;
  27. const ADDRESS_SYNC_COOLDOWN = 60;
  28. const TRONGRID_SYNC_LOCK_SECONDS = 2;
  29. const SYNC_BATCH_SIZE = 1;
  30. const RECENT_ACTIVE_MINUTES = 30;
  31. const RECENT_ACTIVE_LIMIT = 20;
  32. const RECENT_ACTIVE_ENQUEUE_INTERVAL = 300;
  /**
   * Eloquent model class backing this service.
   *
   * @return string Class name of the Recharge model.
   */
  public static function model(): string
  {
      return Recharge::class;
  }
  /**
   * Enum class associated with this service.
   *
   * @return string Always an empty string.
   */
  public static function enum(): string
  {
      return '';
  }
  /**
   * Translate a loose search array into a list of [column, operator, value] conditions.
   *
   * Recognised keys, in output order: coin, net, to_address, id, member_id, txid,
   * status, type. Unknown keys are ignored.
   *
   * @param array $search Raw search input.
   * @return array List of equality conditions, one [column, '=', value] triple per filter.
   */
  public static function getWhere(array $search = []): array
  {
      $conditions = [];

      // Plain equality filters: skipped whenever the value is missing or empty().
      foreach (['coin', 'net', 'to_address', 'id', 'member_id', 'txid'] as $field) {
          if (!empty($search[$field])) {
              $conditions[] = [$field, '=', $search[$field]];
          }
      }

      // status/type use a looser "set and not loosely equal to ''" test instead of empty(),
      // so a value such as '0' still produces a filter.
      foreach (['status', 'type'] as $field) {
          if (isset($search[$field]) && $search[$field] != '') {
              $conditions[] = [$field, '=', $search[$field]];
          }
      }

      return $conditions;
  }
  /**
   * Fetch the first recharge record matching the given filters.
   *
   * Filters that getWhere() discards (missing or empty values) do not restrict the
   * query, so an all-empty $search yields the first row of the unfiltered query.
   *
   * @param array $search Filters understood by getWhere().
   * @return Recharge|null The first matching record, or null when there is none.
   */
  public static function findOne(array $search): ?Recharge
  {
      $conditions = self::getWhere($search);

      return static::$MODEL::where($conditions)->first();
  }
  /**
   * Fetch every recharge record matching the given filters.
   *
   * @param array $search Filters understood by getWhere().
   * @return \Illuminate\Database\Eloquent\Collection Matching records.
   */
  public static function findAll(array $search = [])
  {
      $conditions = self::getWhere($search);

      return static::$MODEL::where($conditions)->get();
  }
  /**
   * Paginated listing of recharge records, newest first, with per-page amount totals.
   *
   * The three amount totals are summed over the rows of the returned page only,
   * not over the whole result set.
   *
   * @param array $search Filters understood by getWhere(), plus an optional 'limit'
   *                      (page size, 15 when absent).
   * @return array Keys: total, total_amount, total_success, total_fail, data.
   */
  public static function paginate(array $search = [])
  {
      $perPage = $search['limit'] ?? 15;

      $page = static::$MODEL::where(self::getWhere($search))
          ->with(['member'])
          ->orderBy("created_at", 'desc')
          ->paginate($perPage);

      $rows = $page->items();

      $sumAll = 0;
      $sumSuccess = 0;
      $sumFail = 0;
      foreach ($rows as $row) {
          // Normalise the amount on the row itself so the caller receives a float.
          $row['amount'] = floatval($row['amount']);
          $sumAll += $row['amount'];
          if ($row['status'] == 1) {
              $sumSuccess += $row['amount'];
          }
          if ($row['status'] == 2) {
              $sumFail += $row['amount'];
          }
      }

      return [
          'total' => $page->total(),
          'total_amount' => $sumAll,
          'total_success' => $sumSuccess,
          'total_fail' => $sumFail,
          'data' => $rows,
      ];
  }
  /**
   * Pull a member's USDT recharges via TronHelper and store the ones not yet recorded.
   *
   * @param mixed $memberId   Member whose USDT wallet is synced.
   * @param mixed $walletInfo Already-loaded wallet record; looked up by member when falsy.
   * @param bool  $force      When true, the per-address throttle (canSyncAddress) is bypassed.
   * @return mixed 0 when there is no wallet address, the sync is throttled, the fetch
   *               throws, or nothing is returned; otherwise the result of insertOrIgnore().
   */
  public static function syncUsdtRechargeRecords($memberId, $walletInfo = null, $force = false)
  {
      if (!$walletInfo) {
          $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
      }
      if (empty($walletInfo) || empty($walletInfo->address)) {
          return 0;
      }

      // canSyncAddress() is only consulted (and its cooldown only armed) for non-forced syncs.
      $throttled = !$force && !self::canSyncAddress($walletInfo->address);
      if ($throttled) {
          return 0;
      }

      try {
          $transfers = TronHelper::getTrc20UsdtRecharges($walletInfo->address);
      } catch (\Throwable $e) {
          // Best effort: a failed fetch is logged and reported as "nothing synced".
          Log::warning('同步USDT充值记录失败', [
              'member_id' => $memberId,
              'address' => $walletInfo->address,
              'error' => $e->getMessage(),
          ]);
          return 0;
      }

      if (empty($transfers)) {
          return 0;
      }

      // Stamp each fetched transfer with ownership, network, type and timestamps.
      $rows = [];
      foreach ($transfers as $index => $transfer) {
          $transfer['member_id'] = $walletInfo->member_id ?: $memberId;
          $transfer['net'] = $walletInfo->net;
          $transfer['type'] = static::$MODEL::TYPE_AUTO;
          $transfer['created_at'] = now();
          $transfer['updated_at'] = now();
          $rows[$index] = $transfer;
      }

      // insertOrIgnore: rows that collide with an existing record are skipped, not errors.
      return (new (static::$MODEL))->insertOrIgnore($rows);
  }
  /**
   * Sync one member's USDT recharges and try to confirm every pending automatic one.
   *
   * @param mixed $memberId  Member to process.
   * @param bool  $syncChain Whether to fetch new on-chain recharges before confirming.
   * @param bool  $force     Whether to bypass the per-address sync throttle.
   * @return array On failure: success=false, message, member_id. On success: success=true,
   *               member_id, address, synced, checked, confirmed, remaining_pending,
   *               confirmed_txids.
   */
  public static function syncAndConfirmMemberRecharge($memberId, $syncChain = true, $force = false)
  {
      $wallet = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
      if (empty($wallet) || empty($wallet->address)) {
          return [
              'success' => false,
              'message' => '未找到该用户的USDT钱包地址',
              'member_id' => $memberId,
          ];
      }

      $synced = $syncChain
          ? self::syncUsdtRechargeRecords($memberId, $wallet, $force)
          : 0;

      // Same filter is used for the work list and for the "still pending" count afterwards.
      $modelClass = self::model();
      $pendingWhere = self::getWhere([
          'member_id' => $memberId,
          'status' => $modelClass::STATUS_STAY,
          'type' => $modelClass::TYPE_AUTO,
      ]);

      $pendingList = $modelClass::where($pendingWhere)->orderBy('id')->get();

      $confirmedTxids = [];
      foreach ($pendingList as $pending) {
          self::handleRechargeConfirmation($pending->txid);

          // Re-read the row to see whether the confirmation attempt flipped its status.
          $fresh = self::findOne(['id' => $pending->id]);
          if ($fresh && intval($fresh->status) === $modelClass::STATUS_SUCCESS) {
              $confirmedTxids[] = $fresh->txid;
          }
      }

      $remainingPending = $modelClass::where($pendingWhere)->count();

      return [
          'success' => true,
          'member_id' => $memberId,
          'address' => $wallet->address,
          'synced' => $synced,
          'checked' => count($pendingList),
          'confirmed' => count($confirmedTxids),
          'remaining_pending' => $remainingPending,
          'confirmed_txids' => $confirmedTxids,
      ];
  }
  /**
   * Add (or refresh) a member in the cached USDT recharge sync queue.
   *
   * An existing entry keeps its last_synced_at; updated_at and expires_at are renewed.
   *
   * @param mixed $memberId Member to enqueue; empty values are ignored.
   * @return void
   */
  public static function markUsdtRechargePending($memberId)
  {
      if (empty($memberId)) {
          return;
      }

      $queue = self::getPendingSyncQueue();
      $slot = strval($memberId);
      $timestamp = time();
      $previousSync = $queue[$slot]['last_synced_at'] ?? 0;

      $queue[$slot] = [
          'member_id' => $memberId,
          'last_synced_at' => $previousSync,
          'updated_at' => $timestamp,
          'expires_at' => $timestamp + self::PENDING_SYNC_TTL,
      ];

      self::putPendingSyncQueue($queue);
  }
  /**
   * Work through the cached sync queue, least recently synced members first.
   *
   * Recently active members are enqueued first (rate-limited by
   * markRecentActiveMembersPending). Entries synced less than ADDRESS_SYNC_COOLDOWN
   * seconds ago are skipped; entries without a USDT wallet address are dropped.
   *
   * @param int $limit Maximum number of members to process in this run.
   * @return array Keys: synced (sum of sync results), checked (members processed),
   *               queued (entries left in the queue).
   */
  public static function syncPendingUsdtRechargeRecords($limit = self::SYNC_BATCH_SIZE)
  {
      self::markRecentActiveMembersPending();

      $queue = self::getPendingSyncQueue();
      uasort(
          $queue,
          static fn ($left, $right) => ($left['last_synced_at'] ?? 0) <=> ($right['last_synced_at'] ?? 0)
      );

      $synced = 0;
      $checked = 0;
      $startedAt = time();

      foreach ($queue as $slot => $entry) {
          if ($checked >= $limit) {
              break;
          }

          $coolingDown = !empty($entry['last_synced_at'])
              && $startedAt - $entry['last_synced_at'] < self::ADDRESS_SYNC_COOLDOWN;
          if ($coolingDown) {
              continue;
          }

          $wallet = WalletService::findOne(['member_id' => $entry['member_id'] ?? 0, 'coin' => 'USDT']);
          $hasAddress = !empty($wallet) && !empty($wallet->address);
          if (!$hasAddress) {
              // Nothing to sync for this member: remove the entry from the queue.
              unset($queue[$slot]);
              continue;
          }

          $synced += self::syncUsdtRechargeRecords($wallet->member_id, $wallet);
          $checked++;
          $queue[$slot]['last_synced_at'] = time();
          $queue[$slot]['updated_at'] = time();
      }

      self::putPendingSyncQueue($queue);

      return [
          'synced' => $synced,
          'checked' => $checked,
          'queued' => count($queue),
      ];
  }
  /**
   * Backward-compatible entry point kept for older callers.
   *
   * Delegates to the queue-based sync instead of scanning every wallet.
   *
   * @return array Result of syncPendingUsdtRechargeRecords().
   */
  public static function syncAllUsdtRechargeRecords()
  {
      $summary = self::syncPendingUsdtRechargeRecords();

      return $summary;
  }
  /**
   * Decide whether an address may be synced now, and arm its cooldown if so.
   *
   * Returns false while the address has a cooldown entry in the cache, or when the
   * shared TronGrid lock key cannot be added (it already exists). On success the
   * address cooldown entry is written for ADDRESS_SYNC_COOLDOWN seconds.
   *
   * @param mixed $address Wallet address to check.
   * @return bool True when the caller may proceed with the sync.
   */
  private static function canSyncAddress($address)
  {
      $cooldownKey = self::ADDRESS_SYNC_CACHE_PREFIX . $address;

      // Order matters: the shared lock is only taken when the address is not cooling down.
      $blocked = Cache::has($cooldownKey)
          || !Cache::add(self::TRONGRID_SYNC_LOCK_KEY, time(), self::TRONGRID_SYNC_LOCK_SECONDS);
      if ($blocked) {
          return false;
      }

      Cache::put($cooldownKey, time(), self::ADDRESS_SYNC_COOLDOWN);

      return true;
  }
  /**
   * Load the sync queue from the cache, discarding malformed and expired entries.
   *
   * @return array Queue entries keyed as stored; empty when the cached value is not an array.
   */
  private static function getPendingSyncQueue()
  {
      $stored = Cache::get(self::PENDING_SYNC_CACHE_KEY, []);
      if (!is_array($stored)) {
          return [];
      }

      $now = time();

      // array_filter keeps the original keys, so entries stay addressable by member key.
      return array_filter($stored, static function ($entry) use ($now) {
          return is_array($entry)
              && !empty($entry['member_id'])
              && !(($entry['expires_at'] ?? 0) < $now);
      });
  }
  /**
   * Persist the sync queue to the cache for PENDING_SYNC_TTL seconds.
   *
   * @param array $queue Queue entries to store.
   * @return void
   */
  private static function putPendingSyncQueue(array $queue)
  {
      Cache::put(
          self::PENDING_SYNC_CACHE_KEY,
          $queue,
          self::PENDING_SYNC_TTL
      );
  }
  /**
   * Enqueue recently active members for USDT recharge syncing.
   *
   * Runs at most once per RECENT_ACTIVE_ENQUEUE_INTERVAL seconds (guarded by a cache
   * marker). Picks up to RECENT_ACTIVE_LIMIT users whose last_active_time falls within
   * the last RECENT_ACTIVE_MINUTES minutes, most recently active first.
   *
   * @return void
   */
  private static function markRecentActiveMembersPending()
  {
      $alreadyRanRecently = Cache::has(self::RECENT_ACTIVE_SYNC_CACHE_KEY);
      if ($alreadyRanRecently) {
          return;
      }
      Cache::put(self::RECENT_ACTIVE_SYNC_CACHE_KEY, time(), self::RECENT_ACTIVE_ENQUEUE_INTERVAL);

      $windowStart = date('Y-m-d H:i:s', time() - self::RECENT_ACTIVE_MINUTES * 60);

      $memberIds = User::whereNotNull('last_active_time')
          ->where('last_active_time', '>=', $windowStart)
          ->orderByDesc('last_active_time')
          ->limit(self::RECENT_ACTIVE_LIMIT)
          ->pluck('member_id');

      foreach ($memberIds as $id) {
          self::markUsdtRechargePending($id);
      }
  }
  /**
   * Confirm a pending recharge on-chain and, once confirmed, credit the member.
   *
   * Flow: look up the recharge by txid; if it is still pending, ask TronHelper for the
   * transaction's block data; when the confirmation count reaches
   * TronHelper::CONFIRMED_NUMBER, mark the row successful, credit the converted
   * amount and write a balance log (all in one DB transaction), then create a collect
   * task and notify the member.
   *
   * Nothing happens when the txid is empty, the record is missing or no longer
   * pending, the lookup is unsuccessful, or there are not enough confirmations yet.
   *
   * @param mixed $txid Transaction hash of the recharge to confirm.
   * @return void
   */
  public static function handleRechargeConfirmation($txid)
  {
      // getWhere() drops empty filters, so an empty txid would make findOne() return an
      // arbitrary row and the status update below hit every pending recharge.
      if (empty($txid)) {
          return;
      }

      $info = self::findOne(['txid' => $txid]);
      if ($info === null || $info['status'] != static::$MODEL::STATUS_STAY) {
          return;
      }

      $result = TronHelper::getTransactionConfirmations($txid);
      if (empty($result['success'])) {
          return;
      }

      $confirmations = $result['latest_block'] - $result['block_number'];
      if ($confirmations < TronHelper::CONFIRMED_NUMBER) {
          return;
      }

      // Exchange rate; falls back to 1 when the config row or its value is missing.
      $rate = Config::where('field', 'exchange_rate_rmb')->first()->val ?? 1;
      if (!is_numeric($rate)) {
          // Refuse to convert with a garbage rate; the recharge stays pending for a retry.
          Log::warning('充值确认失败:汇率配置无效', [
              'txid' => $txid,
              'rate' => $rate,
          ]);
          return;
      }

      // bcmul() only accepts plain decimal strings. A float can stringify in exponent
      // notation (e.g. 1.0E-5), so anything that is not already a plain decimal is
      // rendered with a fixed number of decimals instead.
      $toDecimal = static function ($value): string {
          $text = (string) $value;
          if (preg_match('/^-?\d+(\.\d+)?$/D', $text) === 1) {
              return $text;
          }
          return number_format((float) $value, 10, '.', '');
      };

      // Converted amount is computed before any write, so a failure here changes nothing.
      $rateAmount = bcmul($toDecimal($info->amount), $toDecimal($rate), 10);

      $data = [
          'block_height' => $result['block_number'],
          'confirmations' => $confirmations,
          'exchange_rate' => $rate,
          'status' => static::$MODEL::STATUS_SUCCESS,
      ];
      // The "still pending" condition makes the update a claim: only one caller can flip
      // the row, so the balance is credited at most once.
      $where = self::getWhere(['txid' => $txid, 'status' => static::$MODEL::STATUS_STAY]);
      $modelClass = static::$MODEL;

      // Status change, balance change and balance log succeed or fail together.
      // NOTE(review): DB::transaction() covers the default connection only — confirm
      // the models involved do not use a different one.
      $credited = DB::transaction(static function () use ($modelClass, $where, $data, $info, $rateAmount) {
          if (!$modelClass::where($where)->update($data)) {
              return false;
          }

          $balanceData = WalletService::updateBalance($info->member_id, $rateAmount);
          BalanceLogService::addLog(
              $info->member_id,
              $rateAmount,
              $balanceData['before_balance'],
              $balanceData['after_balance'],
              '充值',
              $info->id,
              ''
          );

          return true;
      });

      if (!$credited) {
          return;
      }

      CollectService::createCollect($info->to_address, $info->coin, $info->net);

      $amount = floatval($info->amount);
      $text = "充值结果通知\n";
      $text .= "充值数量:{$amount} USDT\n";
      $text .= "充值地址:{$info->to_address}\n";
      $text .= "汇率:1 USDT = {$rate} RMB\n";
      $text .= "折合金额:" . number_format((float) $rateAmount, 2) . " RMB\n";
      $text .= "状态:成功\n";
      TopUpService::notifyTransferSuccess($info->member_id, $text);
  }
  /**
   * Try to confirm every pending automatic recharge.
   *
   * Each record is handled independently: a failure on one record is logged and the
   * loop moves on, so one bad transaction cannot block the rest of the batch.
   *
   * @return void
   */
  public static function syncRechargeStay()
  {
      $pending = self::findAll([
          'status' => static::$MODEL::STATUS_STAY,
          'type' => static::$MODEL::TYPE_AUTO,
      ]);

      foreach ($pending as $recharge) {
          try {
              self::handleRechargeConfirmation($recharge->txid);
          } catch (\Throwable $e) {
              Log::warning('确认USDT充值记录失败', [
                  'recharge_id' => $recharge->id,
                  'txid' => $recharge->txid,
                  'error' => $e->getMessage(),
              ]);
          }
      }
  }
  390. }