RechargeService.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497
  1. <?php
  2. namespace App\Services;
  3. use App\Services\BaseService;
  4. use App\Models\Collect;
  5. use App\Models\Recharge;
  6. use App\Models\User;
  7. use Illuminate\Support\Facades\DB;
  8. use Illuminate\Support\Collection;
  9. use Illuminate\Support\Facades\Cache;
  10. use Illuminate\Support\Facades\Log;
  11. use App\Services\WalletService;
  12. use App\Services\CollectService;
  13. use App\Services\UserService;
  14. use App\Services\BalanceLogService;
  15. use App\Helpers\TronHelper;
  16. use App\Models\Config;
  17. /**
  18. * 用户充值记录
  19. */
  20. class RechargeService extends BaseService
  21. {
  /** Model class this service queries; static::$MODEL is used for late static binding. */
  public static string $MODEL = Recharge::class;

  // Cache key holding the queue of members waiting for a USDT recharge sync.
  public const PENDING_SYNC_CACHE_KEY = 'usdt_recharge_pending_sync_members';
  // Cache key that throttles how often recently active members are enqueued.
  public const RECENT_ACTIVE_SYNC_CACHE_KEY = 'usdt_recharge_recent_active_synced_at';
  // Cache key used as a short-lived global lock around chain lookups.
  public const TRONGRID_SYNC_LOCK_KEY = 'usdt_recharge_trongrid_sync_lock';
  // Prefix of the per-address cooldown cache key (the address is appended).
  public const ADDRESS_SYNC_CACHE_PREFIX = 'usdt_recharge_address_synced_at:';
  // Lifetime of a queue entry and of the cached queue itself, in seconds.
  public const PENDING_SYNC_TTL = 7200;
  // Minimum number of seconds between two syncs of the same address / queue entry.
  public const ADDRESS_SYNC_COOLDOWN = 60;
  // Lifetime of the global sync lock, in seconds.
  public const TRONGRID_SYNC_LOCK_SECONDS = 2;
  // Default number of queue entries processed per syncPendingUsdtRechargeRecords() call.
  public const SYNC_BATCH_SIZE = 1;
  // Look-back window, in minutes, for treating a user as recently active.
  public const RECENT_ACTIVE_MINUTES = 30;
  // Maximum number of recently active users enqueued per pass.
  public const RECENT_ACTIVE_LIMIT = 20;
  // Minimum number of seconds between two "enqueue recently active users" passes.
  public const RECENT_ACTIVE_ENQUEUE_INTERVAL = 300;
  /**
   * @description: Fully qualified class name of the model handled by this service.
   * @return string Always Recharge::class.
   */
  public static function model(): string
  {
      return Recharge::class;
  }
  /**
   * @description: Enum class name associated with this service.
   * @return string Empty string; this service does not name an enum class.
   */
  public static function enum(): string
  {
      return '';
  }
  /**
   * @description: Translate a search array into a list of [column, '=', value] conditions.
   * @param array $search Search input; recognised keys are coin, net, to_address, id,
   *                      member_id, txid, status and type. Unknown keys are ignored.
   * @return array Conditions in the fixed order listed above, one entry per accepted key.
   */
  public static function getWhere(array $search = []): array
  {
      // These filters are applied only when the supplied value is non-empty.
      $nonEmptyFields = ['coin', 'net', 'to_address', 'id', 'member_id', 'txid'];
      // These filters are compared loosely against '' so that values such as '0' are kept.
      $looseFields = ['status', 'type'];

      $conditions = [];
      foreach ($nonEmptyFields as $field) {
          if (!empty($search[$field])) {
              $conditions[] = [$field, '=', $search[$field]];
          }
      }
      foreach ($looseFields as $field) {
          if (isset($search[$field]) && $search[$field] != '') {
              $conditions[] = [$field, '=', $search[$field]];
          }
      }

      return $conditions;
  }
  /**
   * @description: Fetch the first recharge record matching the search filters.
   * @param array $search Filters understood by getWhere().
   * @return \App\Models\Recharge|null The first match, or null when nothing matches.
   */
  public static function findOne(array $search): ?Recharge
  {
      $conditions = self::getWhere($search);
      $query = static::$MODEL::where($conditions);

      return $query->first();
  }
  /**
   * @description: Fetch every recharge record matching the search filters.
   * @param array $search Filters understood by getWhere().
   * @return \Illuminate\Database\Eloquent\Collection
   */
  public static function findAll(array $search = [])
  {
      $conditions = self::getWhere($search);
      $query = static::$MODEL::where($conditions);

      return $query->get();
  }
  /**
   * @description: Paginated listing, newest first, with the member relation loaded.
   *               The three amount totals cover only the rows of the returned page.
   * @param array $search Filters understood by getWhere(), plus an optional 'limit'
   *                      (page size, defaults to 15).
   * @return array Keys: total, total_amount, total_success, total_fail, data.
   */
  public static function paginate(array $search = [])
  {
      $perPage = $search['limit'] ?? 15;

      $page = static::$MODEL::where(self::getWhere($search))
          ->with(['member'])
          ->orderBy("created_at", 'desc')
          ->paginate($perPage);

      $rows = $page->items();

      $sumAll = 0;
      $sumSuccess = 0;
      $sumFail = 0;
      foreach ($rows as $row) {
          // Normalise the amount to a float on the row itself before summing.
          $row['amount'] = floatval($row['amount']);
          $sumAll += $row['amount'];
          if ($row['status'] == 1) {
              $sumSuccess += $row['amount'];
          }
          if ($row['status'] == 2) {
              $sumFail += $row['amount'];
          }
      }

      return [
          'total' => $page->total(),
          'total_amount' => $sumAll,
          'total_success' => $sumSuccess,
          'total_fail' => $sumFail,
          'data' => $rows,
      ];
  }
  /**
   * @description: Pull the USDT recharges reported by TronHelper for a member's wallet
   *               address and insert them as automatic recharge rows (rows that already
   *               exist are ignored by insertOrIgnore).
   * @param mixed $memberId   Member whose wallet should be synced.
   * @param mixed $walletInfo Optional pre-loaded wallet; looked up by member when falsy.
   * @param bool  $force      When true, the per-address throttle is skipped.
   * @return mixed 0 when nothing was synced (no wallet, throttled, lookup failed, or no
   *               data); otherwise the result of insertOrIgnore().
   */
  public static function syncUsdtRechargeRecords($memberId, $walletInfo = null, $force = false)
  {
      if (!$walletInfo) {
          $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
      }
      if (empty($walletInfo) || empty($walletInfo->address)) {
          return 0;
      }

      // canSyncAddress() is only consulted when the caller did not force the sync.
      $throttled = !$force && !self::canSyncAddress($walletInfo->address);
      if ($throttled) {
          return 0;
      }

      try {
          $transfers = TronHelper::getTrc20UsdtRecharges($walletInfo->address);
      } catch (\Throwable $e) {
          // A failed chain lookup is logged and reported as "nothing synced".
          Log::warning('同步USDT充值记录失败', [
              'member_id' => $memberId,
              'address' => $walletInfo->address,
              'error' => $e->getMessage(),
          ]);
          return 0;
      }

      if (empty($transfers)) {
          return 0;
      }

      // Decorate every transfer with the owner, network, type and timestamps.
      $rows = [];
      foreach ($transfers as $index => $transfer) {
          $transfer['member_id'] = $walletInfo->member_id ?: $memberId;
          $transfer['net'] = $walletInfo->net;
          $transfer['type'] = static::$MODEL::TYPE_AUTO;
          $transfer['created_at'] = now();
          $transfer['updated_at'] = now();
          $rows[$index] = $transfer;
      }

      return (new (static::$MODEL))->insertOrIgnore($rows);
  }
  /**
   * @description: Sync one member's USDT recharges, run confirmation on every pending
   *               automatic recharge, then make sure collect records exist.
   * @param mixed $memberId  Member to process.
   * @param bool  $syncChain Whether to pull new recharges from the chain first.
   * @param bool  $force     Whether to bypass the per-address sync throttle.
   * @return array On a missing wallet: success=false, message, member_id. Otherwise:
   *               success, member_id, address, synced, checked, confirmed,
   *               remaining_pending, confirmed_txids, collect_ensured, collect_addresses.
   */
  public static function syncAndConfirmMemberRecharge($memberId, $syncChain = true, $force = false)
  {
      $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
      if (empty($walletInfo) || empty($walletInfo->address)) {
          return [
              'success' => false,
              'message' => '未找到该用户的USDT钱包地址',
              'member_id' => $memberId,
          ];
      }

      $synced = $syncChain
          ? self::syncUsdtRechargeRecords($memberId, $walletInfo, $force)
          : 0;

      $model = self::model();
      // Filter shared by the "before" listing and the "after" count.
      $pendingWhere = self::getWhere([
          'member_id' => $memberId,
          'status' => $model::STATUS_STAY,
          'type' => $model::TYPE_AUTO,
      ]);

      $pendingList = $model::where($pendingWhere)->orderBy('id')->get();

      $checked = 0;
      $confirmedTxids = [];
      foreach ($pendingList as $pending) {
          $checked++;
          self::handleRechargeConfirmation($pending->txid);
          // Re-read the row to see whether the confirmation flipped it to success.
          $reloaded = self::findOne(['id' => $pending->id]);
          if ($reloaded && intval($reloaded->status) === $model::STATUS_SUCCESS) {
              $confirmedTxids[] = $reloaded->txid;
          }
      }

      $remainingPending = $model::where($pendingWhere)->count();
      $collectResult = self::ensureCollectRecordsForMember($memberId);

      return [
          'success' => true,
          'member_id' => $memberId,
          'address' => $walletInfo->address,
          'synced' => $synced,
          'checked' => $checked,
          'confirmed' => count($confirmedTxids),
          'remaining_pending' => $remainingPending,
          'confirmed_txids' => $confirmedTxids,
          'collect_ensured' => $collectResult['ensured'],
          'collect_addresses' => $collectResult['addresses'],
      ];
  }
  /**
   * @description: For each distinct deposit address among the member's successful
   *               automatic recharges, create a collect record unless one is already
   *               open (status STAY or START) for that address.
   * @param mixed $memberId Member to inspect.
   * @return array 'ensured' => number of collect records created,
   *               'addresses' => the addresses they were created for.
   */
  public static function ensureCollectRecordsForMember($memberId)
  {
      $model = self::model();
      $confirmed = $model::where(self::getWhere([
          'member_id' => $memberId,
          'status' => $model::STATUS_SUCCESS,
          'type' => $model::TYPE_AUTO,
      ]))->orderByDesc('id')->get();

      $visited = [];
      $created = [];
      foreach ($confirmed as $recharge) {
          $address = $recharge->to_address;

          // Handle each non-empty address once; the newest recharge (highest id) wins.
          $firstVisit = !empty($address) && !isset($visited[$address]);
          if (!$firstVisit) {
              continue;
          }
          $visited[$address] = true;

          $alreadyOpen = Collect::where('from_address', $address)
              ->whereIn('status', [Collect::STATUS_STAY, Collect::STATUS_START])
              ->exists();

          if (!$alreadyOpen) {
              CollectService::createCollect($address, $recharge->coin, $recharge->net);
              $created[] = $address;
          }
      }

      return [
          'ensured' => count($created),
          'addresses' => $created,
      ];
  }
  /**
   * @description: Add a member to (or refresh the member in) the cached USDT recharge
   *               sync queue. An existing last_synced_at value is kept.
   * @param mixed $memberId Member to enqueue; empty values are ignored.
   * @return void
   */
  public static function markUsdtRechargePending($memberId)
  {
      if (empty($memberId)) {
          return;
      }

      $pending = self::getPendingSyncQueue();
      $slot = strval($memberId);
      $timestamp = time();
      $previousSync = $pending[$slot]['last_synced_at'] ?? 0;

      $pending[$slot] = [
          'member_id' => $memberId,
          'last_synced_at' => $previousSync,
          'updated_at' => $timestamp,
          'expires_at' => $timestamp + self::PENDING_SYNC_TTL,
      ];

      self::putPendingSyncQueue($pending);
  }
  /**
   * @description: Process the cached sync queue, least recently synced entries first,
   *               until $limit entries have been attempted.
   * @param int $limit Maximum number of queue entries to attempt in this call.
   * @return array 'synced' => summed results of syncUsdtRechargeRecords(),
   *               'checked' => entries attempted, 'queued' => entries left in the queue.
   */
  public static function syncPendingUsdtRechargeRecords($limit = self::SYNC_BATCH_SIZE)
  {
      self::markRecentActiveMembersPending();

      $queue = self::getPendingSyncQueue();
      uasort(
          $queue,
          fn ($left, $right) => ($left['last_synced_at'] ?? 0) <=> ($right['last_synced_at'] ?? 0)
      );

      $insertedTotal = 0;
      $attempted = 0;
      $startedAt = time();

      foreach ($queue as $slot => $entry) {
          if ($attempted >= $limit) {
              break;
          }

          // Entries synced within the cooldown window are left for a later run.
          $lastSync = $entry['last_synced_at'] ?? 0;
          if (!empty($lastSync) && $startedAt - $lastSync < self::ADDRESS_SYNC_COOLDOWN) {
              continue;
          }

          $walletInfo = WalletService::findOne(['member_id' => $entry['member_id'] ?? 0, 'coin' => 'USDT']);
          if (empty($walletInfo) || empty($walletInfo->address)) {
              // No usable wallet: the entry is dropped from the queue.
              unset($queue[$slot]);
              continue;
          }

          $insertedTotal += self::syncUsdtRechargeRecords($walletInfo->member_id, $walletInfo);
          $attempted++;

          $entry['last_synced_at'] = time();
          $entry['updated_at'] = time();
          $queue[$slot] = $entry;
      }

      self::putPendingSyncQueue($queue);

      return [
          'synced' => $insertedTotal,
          'checked' => $attempted,
          'queued' => count($queue),
      ];
  }
  /**
   * @description: Backward-compatible entry point kept for older callers. It delegates
   *               to the queue-based sync instead of scanning every wallet.
   * @return array Same shape as syncPendingUsdtRechargeRecords().
   */
  public static function syncAllUsdtRechargeRecords()
  {
      $summary = self::syncPendingUsdtRechargeRecords();

      return $summary;
  }
  /**
   * @description: Decide whether an address may be synced right now. The address must
   *               not be on cooldown and the short-lived global lock must be obtainable.
   *               On success the address is put on cooldown.
   * @param mixed $address Wallet address to check.
   * @return bool True when the caller may proceed with the sync.
   */
  private static function canSyncAddress($address)
  {
      $cooldownKey = self::ADDRESS_SYNC_CACHE_PREFIX . $address;

      $onCooldown = Cache::has($cooldownKey);
      if ($onCooldown) {
          return false;
      }

      // Cache::add only succeeds when the lock key is not already present.
      $lockAcquired = Cache::add(self::TRONGRID_SYNC_LOCK_KEY, time(), self::TRONGRID_SYNC_LOCK_SECONDS);
      if ($lockAcquired) {
          Cache::put($cooldownKey, time(), self::ADDRESS_SYNC_COOLDOWN);
          return true;
      }

      return false;
  }
  /**
   * @description: Read the sync queue from the cache and drop entries that are
   *               malformed, have no member_id, or have expired.
   * @return array Surviving entries, keyed exactly as they were stored.
   */
  private static function getPendingSyncQueue()
  {
      $cached = Cache::get(self::PENDING_SYNC_CACHE_KEY, []);
      if (!is_array($cached)) {
          return [];
      }

      $now = time();

      // array_filter keeps the original keys, so entries stay addressable by member.
      return array_filter($cached, static function ($entry) use ($now) {
          return is_array($entry)
              && !empty($entry['member_id'])
              && !(($entry['expires_at'] ?? 0) < $now);
      });
  }
  /**
   * @description: Store the sync queue in the cache for PENDING_SYNC_TTL seconds.
   * @param array $queue Queue entries to store.
   * @return void
   */
  private static function putPendingSyncQueue(array $queue)
  {
      $lifetime = self::PENDING_SYNC_TTL;
      Cache::put(self::PENDING_SYNC_CACHE_KEY, $queue, $lifetime);
  }
  /**
   * @description: Enqueue the most recently active users for a recharge sync. The pass
   *               runs at most once per RECENT_ACTIVE_ENQUEUE_INTERVAL seconds.
   * @return void
   */
  private static function markRecentActiveMembersPending()
  {
      // The marker key is still cached: a pass ran recently, so skip this one.
      if (Cache::has(self::RECENT_ACTIVE_SYNC_CACHE_KEY)) {
          return;
      }
      Cache::put(self::RECENT_ACTIVE_SYNC_CACHE_KEY, time(), self::RECENT_ACTIVE_ENQUEUE_INTERVAL);

      $windowSeconds = self::RECENT_ACTIVE_MINUTES * 60;
      $cutoff = date('Y-m-d H:i:s', time() - $windowSeconds);

      User::whereNotNull('last_active_time')
          ->where('last_active_time', '>=', $cutoff)
          ->orderByDesc('last_active_time')
          ->limit(self::RECENT_ACTIVE_LIMIT)
          ->pluck('member_id')
          ->each(function ($memberId) {
              self::markUsdtRechargePending($memberId);
          });
  }
  /**
   * @description: Confirm a pending recharge. When the transaction has reached
   *               TronHelper::CONFIRMED_NUMBER confirmations, the record is marked
   *               successful, the member's balance is credited with the amount
   *               converted at the configured exchange rate, a balance log is written,
   *               a collect record is created and the member is notified.
   *               Nothing happens when the txid is empty, the record does not exist,
   *               the record is no longer pending, the chain lookup is unsuccessful or
   *               there are not enough confirmations yet.
   * @param mixed $txid Transaction id of the recharge to confirm.
   * @return void
   */
  public static function handleRechargeConfirmation($txid)
  {
      // getWhere() silently drops an empty txid filter. Without this guard findOne()
      // would return an arbitrary row and the update below would match every pending
      // recharge.
      if (empty($txid)) {
          return;
      }

      $info = self::findOne(['txid' => $txid]);
      if (empty($info) || $info->status != static::$MODEL::STATUS_STAY) {
          return;
      }

      // Exchange rate; falls back to 1 when the config row or its value is missing.
      $rate = Config::where('field', 'exchange_rate_rmb')->first()->val ?? 1;

      $result = TronHelper::getTransactionConfirmations($txid);
      if (empty($result['success'])) {
          return;
      }

      $confirmations = $result['latest_block'] - $result['block_number'];
      if ($confirmations < TronHelper::CONFIRMED_NUMBER) {
          return;
      }

      $data = [
          'block_height' => $result['block_number'],
          'confirmations' => $confirmations,
          'exchange_rate' => $rate,
          'status' => static::$MODEL::STATUS_SUCCESS,
      ];

      // BCMath only accepts plain decimal strings. A float such as 0.00001 would be
      // stringified as "1.0E-5" and rejected, so plain decimal strings are passed
      // through untouched and everything else is formatted without an exponent.
      $toDecimal = static function ($value): string {
          if (is_string($value) && preg_match('/^-?\d+(\.\d+)?$/', $value) === 1) {
              return $value;
          }
          return number_format(floatval($value), 10, '.', '');
      };
      $rateAmount = bcmul($toDecimal($info->amount), $toDecimal($rate), 10); // amount after conversion

      // The status flip and the balance credit must succeed or fail together.
      // Otherwise a failure after the flip leaves the deposit uncredited, because
      // later runs only look at pending records.
      $credited = DB::transaction(function () use ($txid, $data, $info, $rateAmount) {
          // The status condition makes sure only one caller wins for a given record.
          $updated = static::$MODEL::where('txid', $txid)
              ->where('status', static::$MODEL::STATUS_STAY)
              ->update($data);
          if (!$updated) {
              return false;
          }

          $balanceData = WalletService::updateBalance($info->member_id, $rateAmount);
          BalanceLogService::addLog(
              $info->member_id,
              $rateAmount,
              $balanceData['before_balance'],
              $balanceData['after_balance'],
              '充值',
              $info->id,
              ''
          );

          return true;
      });

      if (!$credited) {
          return;
      }

      CollectService::createCollect($info->to_address, $info->coin, $info->net);

      $amount = floatval($info->amount);
      $text = "充值结果通知\n";
      $text .= "充值数量:{$amount} USDT\n";
      $text .= "充值地址:{$info->to_address}\n";
      $text .= "汇率:1 USDT = {$rate} RMB\n";
      $text .= "折合金额:" . number_format(floatval($rateAmount), 2) . " RMB\n";
      $text .= "状态:成功\n";
      TopUpService::notifyTransferSuccess($info->member_id, $text);
  }
  /**
   * @description: Run the confirmation routine on every pending automatic recharge.
   * @return void
   */
  public static function syncRechargeStay()
  {
      $model = static::$MODEL;
      $pending = self::findAll([
          'status' => $model::STATUS_STAY,
          'type' => $model::TYPE_AUTO,
      ]);

      foreach ($pending as $recharge) {
          self::handleRechargeConfirmation($recharge->txid);
      }
  }
  430. }