RechargeService.php 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. <?php
  2. namespace App\Services;
  3. use App\Services\BaseService;
  4. use App\Models\Collect;
  5. use App\Models\Recharge;
  6. use App\Models\User;
  7. use Illuminate\Support\Facades\DB;
  8. use Illuminate\Support\Collection;
  9. use Illuminate\Support\Facades\Cache;
  10. use Illuminate\Support\Facades\Log;
  11. use App\Services\WalletService;
  12. use App\Services\CollectService;
  13. use App\Services\UserService;
  14. use App\Services\BalanceLogService;
  15. use App\Helpers\TronHelper;
  16. use App\Models\Config;
  17. /**
  18. * 用户充值记录
  19. */
  20. class RechargeService extends BaseService
  21. {
  22. public static string $MODEL = Recharge::class;
  23. const PENDING_SYNC_CACHE_KEY = 'usdt_recharge_pending_sync_members';
  24. const RECENT_ACTIVE_SYNC_CACHE_KEY = 'usdt_recharge_recent_active_synced_at';
  25. const TRONGRID_SYNC_LOCK_KEY = 'usdt_recharge_trongrid_sync_lock';
  26. const ADDRESS_SYNC_CACHE_PREFIX = 'usdt_recharge_address_synced_at:';
  27. const PENDING_SYNC_TTL = 7200;
  28. const ADDRESS_SYNC_COOLDOWN = 60;
  29. const TRONGRID_SYNC_LOCK_SECONDS = 2;
  30. const SYNC_BATCH_SIZE = 1;
  31. const RECENT_ACTIVE_MINUTES = 30;
  32. const RECENT_ACTIVE_LIMIT = 20;
  33. const RECENT_ACTIVE_ENQUEUE_INTERVAL = 300;
  34. /**
  35. * @description: 模型
  36. * @return {string}
  37. */
  38. public static function model(): string
  39. {
  40. return Recharge::class;
  41. }
  42. /**
  43. * @description: 枚举
  44. * @return {*}
  45. */
  46. public static function enum(): string
  47. {
  48. return '';
  49. }
  50. /**
  51. * @description: 获取查询条件
  52. * @param {array} $search 查询内容
  53. * @return {array}
  54. */
  55. public static function getWhere(array $search = []): array
  56. {
  57. $where = [];
  58. if (isset($search['coin']) && !empty($search['coin'])) {
  59. $where[] = ['coin', '=', $search['coin']];
  60. }
  61. if (isset($search['net']) && !empty($search['net'])) {
  62. $where[] = ['net', '=', $search['net']];
  63. }
  64. if (isset($search['to_address']) && !empty($search['to_address'])) {
  65. $where[] = ['to_address', '=', $search['to_address']];
  66. }
  67. if (isset($search['id']) && !empty($search['id'])) {
  68. $where[] = ['id', '=', $search['id']];
  69. }
  70. if (isset($search['member_id']) && !empty($search['member_id'])) {
  71. $where[] = ['member_id', '=', $search['member_id']];
  72. }
  73. if (isset($search['txid']) && !empty($search['txid'])) {
  74. $where[] = ['txid', '=', $search['txid']];
  75. }
  76. if (isset($search['status']) && $search['status'] != '') {
  77. $where[] = ['status', '=', $search['status']];
  78. }
  79. if (isset($search['type']) && $search['type'] != '') {
  80. $where[] = ['type', '=', $search['type']];
  81. }
  82. return $where;
  83. }
  84. /**
  85. * @description: 查询单条数据
  86. * @param array $search
  87. * @return \App\Models\Coin|null
  88. */
  89. public static function findOne(array $search): ?Recharge
  90. {
  91. return static::$MODEL::where(self::getWhere($search))->first();
  92. }
  93. /**
  94. * @description: 查询所有数据
  95. * @param array $search
  96. * @return \Illuminate\Database\Eloquent\Collection
  97. */
  98. public static function findAll(array $search = [])
  99. {
  100. return static::$MODEL::where(self::getWhere($search))->get();
  101. }
  102. /**
  103. * @description: 分页查询
  104. * @param array $search
  105. * @return array
  106. */
  107. public static function paginate(array $search = [])
  108. {
  109. $limit = isset($search['limit']) ? $search['limit'] : 15;
  110. $first_name = $search['first_name'] ?? '';
  111. $paginator = static::$MODEL::where(self::getWhere($search))
  112. ->with(['member'])
  113. ->when($first_name, function ($query) use ($first_name) {
  114. $query->whereHas('member', function ($q) use ($first_name) {
  115. $q->where('first_name', 'like', "%{$first_name}%");
  116. });
  117. })
  118. ->orderBy("created_at", 'desc')
  119. ->paginate($limit);
  120. $list = $paginator->items();
  121. $totalAmount = 0;
  122. $totalSuccess = 0;
  123. $totalFail = 0;
  124. foreach ($list as $item) {
  125. $item['amount'] = floatval($item['amount']);
  126. $totalAmount += $item['amount'];
  127. if ($item['status'] == 1) $totalSuccess += $item['amount'];
  128. if ($item['status'] == 2) $totalFail += $item['amount'];
  129. }
  130. return [
  131. 'total' => $paginator->total(),
  132. 'total_amount' => $totalAmount,
  133. 'total_success' => $totalSuccess,
  134. 'total_fail' => $totalFail,
  135. 'data' => $list];
  136. }
  137. /**
  138. * @description: 同步会员的USDT充值记录
  139. * @param {*} $memberId
  140. * @return {*}
  141. */
  142. public static function syncUsdtRechargeRecords($memberId, $walletInfo = null, $force = false)
  143. {
  144. $walletInfo = $walletInfo ?: WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
  145. if (empty($walletInfo) || empty($walletInfo->address)) {
  146. return 0;
  147. }
  148. if (!$force && !self::canSyncAddress($walletInfo->address)) {
  149. return 0;
  150. }
  151. try {
  152. $data = TronHelper::getTrc20UsdtRecharges($walletInfo->address);
  153. } catch (\Throwable $e) {
  154. Log::warning('同步USDT充值记录失败', [
  155. 'member_id' => $memberId,
  156. 'address' => $walletInfo->address,
  157. 'error' => $e->getMessage(),
  158. ]);
  159. return 0;
  160. }
  161. if (empty($data)) {
  162. return 0;
  163. }
  164. foreach ($data as $k => $v) {
  165. $v['member_id'] = $walletInfo->member_id ?: $memberId;
  166. $v['net'] = $walletInfo->net;
  167. $v['type'] = static::$MODEL::TYPE_AUTO;
  168. $v['created_at'] = now();
  169. $v['updated_at'] = now();
  170. $data[$k] = $v;
  171. }
  172. $m = new (static::$MODEL);
  173. $result = $m->insertOrIgnore($data);
  174. return $result;
  175. }
  176. /**
  177. * @description: 强制同步并确认单个会员的USDT充值记录
  178. * @param {*} $memberId
  179. * @param bool $syncChain 是否先拉链上新充值
  180. * @param bool $force 是否忽略地址同步节流
  181. * @return array
  182. */
  183. public static function syncAndConfirmMemberRecharge($memberId, $syncChain = true, $force = false)
  184. {
  185. $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
  186. if (empty($walletInfo) || empty($walletInfo->address)) {
  187. return [
  188. 'success' => false,
  189. 'message' => '未找到该用户的USDT钱包地址',
  190. 'member_id' => $memberId,
  191. ];
  192. }
  193. $synced = 0;
  194. if ($syncChain) {
  195. $synced = self::syncUsdtRechargeRecords($memberId, $walletInfo, $force);
  196. }
  197. $pendingList = self::model()::where(self::getWhere([
  198. 'member_id' => $memberId,
  199. 'status' => self::model()::STATUS_STAY,
  200. 'type' => self::model()::TYPE_AUTO,
  201. ]))->orderBy('id')->get();
  202. $checked = 0;
  203. $confirmed = 0;
  204. $confirmedTxids = [];
  205. foreach ($pendingList as $item) {
  206. $checked++;
  207. self::handleRechargeConfirmation($item->txid);
  208. $fresh = self::findOne(['id' => $item->id]);
  209. if ($fresh && intval($fresh->status) === self::model()::STATUS_SUCCESS) {
  210. $confirmed++;
  211. $confirmedTxids[] = $fresh->txid;
  212. }
  213. }
  214. $remainingPending = self::model()::where(self::getWhere([
  215. 'member_id' => $memberId,
  216. 'status' => self::model()::STATUS_STAY,
  217. 'type' => self::model()::TYPE_AUTO,
  218. ]))->count();
  219. $collectResult = self::ensureCollectRecordsForMember($memberId);
  220. return [
  221. 'success' => true,
  222. 'member_id' => $memberId,
  223. 'address' => $walletInfo->address,
  224. 'synced' => $synced,
  225. 'checked' => $checked,
  226. 'confirmed' => $confirmed,
  227. 'remaining_pending' => $remainingPending,
  228. 'confirmed_txids' => $confirmedTxids,
  229. 'collect_ensured' => $collectResult['ensured'],
  230. 'collect_addresses' => $collectResult['addresses'],
  231. ];
  232. }
  233. /**
  234. * @description: 为已确认充值补建待归集记录
  235. * @param {*} $memberId
  236. * @return array
  237. */
  238. public static function ensureCollectRecordsForMember($memberId)
  239. {
  240. $list = self::model()::where(self::getWhere([
  241. 'member_id' => $memberId,
  242. 'status' => self::model()::STATUS_SUCCESS,
  243. 'type' => self::model()::TYPE_AUTO,
  244. ]))->orderByDesc('id')->get();
  245. $ensured = 0;
  246. $addresses = [];
  247. $seen = [];
  248. foreach ($list as $item) {
  249. $address = $item->to_address;
  250. if (empty($address) || isset($seen[$address])) {
  251. continue;
  252. }
  253. $seen[$address] = true;
  254. $hasOpenCollect = Collect::where('from_address', $address)
  255. ->whereIn('status', [Collect::STATUS_STAY, Collect::STATUS_START])
  256. ->exists();
  257. if ($hasOpenCollect) {
  258. continue;
  259. }
  260. CollectService::createCollect($address, $item->coin, $item->net);
  261. $ensured++;
  262. $addresses[] = $address;
  263. }
  264. return [
  265. 'ensured' => $ensured,
  266. 'addresses' => $addresses,
  267. ];
  268. }
  269. /**
  270. * @description: 标记用户进入USDT充值同步队列
  271. * @param {*} $memberId
  272. * @return void
  273. */
  274. public static function markUsdtRechargePending($memberId)
  275. {
  276. if (empty($memberId)) {
  277. return;
  278. }
  279. $queue = self::getPendingSyncQueue();
  280. $key = strval($memberId);
  281. $now = time();
  282. $queue[$key] = [
  283. 'member_id' => $memberId,
  284. 'last_synced_at' => $queue[$key]['last_synced_at'] ?? 0,
  285. 'updated_at' => $now,
  286. 'expires_at' => $now + self::PENDING_SYNC_TTL,
  287. ];
  288. self::putPendingSyncQueue($queue);
  289. }
  290. /**
  291. * @description: 同步待处理队列中的USDT充值记录
  292. * @param int $limit
  293. * @return array
  294. */
  295. public static function syncPendingUsdtRechargeRecords($limit = self::SYNC_BATCH_SIZE)
  296. {
  297. self::markRecentActiveMembersPending();
  298. $queue = self::getPendingSyncQueue();
  299. uasort($queue, function ($a, $b) {
  300. return ($a['last_synced_at'] ?? 0) <=> ($b['last_synced_at'] ?? 0);
  301. });
  302. $synced = 0;
  303. $checked = 0;
  304. $now = time();
  305. foreach ($queue as $key => $item) {
  306. if ($checked >= $limit) {
  307. break;
  308. }
  309. if (!empty($item['last_synced_at']) && $now - $item['last_synced_at'] < self::ADDRESS_SYNC_COOLDOWN) {
  310. continue;
  311. }
  312. $walletInfo = WalletService::findOne(['member_id' => $item['member_id'] ?? 0, 'coin' => 'USDT']);
  313. if (empty($walletInfo) || empty($walletInfo->address)) {
  314. unset($queue[$key]);
  315. continue;
  316. }
  317. $synced += self::syncUsdtRechargeRecords($walletInfo->member_id, $walletInfo);
  318. $checked++;
  319. $queue[$key]['last_synced_at'] = time();
  320. $queue[$key]['updated_at'] = time();
  321. }
  322. self::putPendingSyncQueue($queue);
  323. return [
  324. 'synced' => $synced,
  325. 'checked' => $checked,
  326. 'queued' => count($queue),
  327. ];
  328. }
  329. /**
  330. * @description: 兼容旧调用,避免再次全量扫描钱包
  331. * @return array
  332. */
  333. public static function syncAllUsdtRechargeRecords()
  334. {
  335. return self::syncPendingUsdtRechargeRecords();
  336. }
  337. private static function canSyncAddress($address)
  338. {
  339. $addressKey = self::ADDRESS_SYNC_CACHE_PREFIX . $address;
  340. if (Cache::has($addressKey)) {
  341. return false;
  342. }
  343. if (!Cache::add(self::TRONGRID_SYNC_LOCK_KEY, time(), self::TRONGRID_SYNC_LOCK_SECONDS)) {
  344. return false;
  345. }
  346. Cache::put($addressKey, time(), self::ADDRESS_SYNC_COOLDOWN);
  347. return true;
  348. }
  349. private static function getPendingSyncQueue()
  350. {
  351. $queue = Cache::get(self::PENDING_SYNC_CACHE_KEY, []);
  352. if (!is_array($queue)) {
  353. return [];
  354. }
  355. $now = time();
  356. foreach ($queue as $key => $item) {
  357. if (!is_array($item) || empty($item['member_id']) || ($item['expires_at'] ?? 0) < $now) {
  358. unset($queue[$key]);
  359. }
  360. }
  361. return $queue;
  362. }
  363. private static function putPendingSyncQueue(array $queue)
  364. {
  365. Cache::put(self::PENDING_SYNC_CACHE_KEY, $queue, self::PENDING_SYNC_TTL);
  366. }
  367. private static function markRecentActiveMembersPending()
  368. {
  369. if (Cache::has(self::RECENT_ACTIVE_SYNC_CACHE_KEY)) {
  370. return;
  371. }
  372. Cache::put(self::RECENT_ACTIVE_SYNC_CACHE_KEY, time(), self::RECENT_ACTIVE_ENQUEUE_INTERVAL);
  373. $since = date('Y-m-d H:i:s', time() - self::RECENT_ACTIVE_MINUTES * 60);
  374. $members = User::whereNotNull('last_active_time')
  375. ->where('last_active_time', '>=', $since)
  376. ->orderByDesc('last_active_time')
  377. ->limit(self::RECENT_ACTIVE_LIMIT)
  378. ->pluck('member_id');
  379. foreach ($members as $memberId) {
  380. self::markUsdtRechargePending($memberId);
  381. }
  382. }
  383. /**
  384. * @description: 充值确认
  385. * @param {*} $txid
  386. * @return {*}
  387. */
  388. public static function handleRechargeConfirmation($txid)
  389. {
  390. $info = self::findOne(['txid' => $txid]); // 获取充值的信息
  391. // 汇率
  392. $rate = Config::where('field', 'exchange_rate_rmb')->first()->val ?? 1;
  393. // 待处理进行充值
  394. if ($info['status'] == static::$MODEL::STATUS_STAY) {
  395. $result = TronHelper::getTransactionConfirmations($txid);
  396. if ($result['success']) {
  397. $data = [];
  398. $data['block_height'] = $result['block_number'];
  399. $data['confirmations'] = $result['latest_block'] - $result['block_number'];
  400. $data['exchange_rate'] = $rate;
  401. if ($data['confirmations'] >= TronHelper::CONFIRMED_NUMBER) {
  402. $data['status'] = static::$MODEL::STATUS_SUCCESS;
  403. $where = self::getWhere(['txid' => $txid, 'status' => static::$MODEL::STATUS_STAY]);
  404. $recharge = static::$MODEL::where($where)->update($data);
  405. $amount = floatval($info->amount);
  406. $rate_amount = bcmul($amount, $rate, 10); // 汇率转换后分数
  407. // 更新成功,变动可用余额
  408. if ($recharge) {
  409. $balanceData = WalletService::updateBalance($info->member_id, $rate_amount);
  410. BalanceLogService::addLog($info->member_id, $rate_amount, $balanceData['before_balance'], $balanceData['after_balance'], '充值', $info->id, '');
  411. CollectService::createCollect($info->to_address, $info->coin, $info->net);
  412. $amount = floatval($info->amount);
  413. $text = "充值结果通知\n";
  414. $text .= "充值数量:{$amount} USDT\n";
  415. $text .= "充值地址:{$info->to_address}\n";
  416. $text .= "汇率:1 USDT = {$rate} RMB\n";
  417. $text .= "折合金额:" . number_format($rate_amount, 2) . " RMB\n";
  418. $text .= "状态:成功\n";
  419. TopUpService::notifyTransferSuccess($info->member_id, $text);
  420. }
  421. }
  422. }
  423. }
  424. }
  425. /**
  426. * @description: 同步待处理的充值记录
  427. * @return {*}
  428. */
  429. public static function syncRechargeStay()
  430. {
  431. $list = self::findAll(['status' => static::$MODEL::STATUS_STAY, 'type' => static::$MODEL::TYPE_AUTO]);
  432. foreach ($list as $k => $v) {
  433. self::handleRechargeConfirmation($v->txid);
  434. }
  435. }
  436. }