| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448 |
- <?php
- namespace App\Services;
- use App\Services\BaseService;
- use App\Models\Recharge;
- use App\Models\User;
- use Illuminate\Support\Facades\DB;
- use Illuminate\Support\Collection;
- use Illuminate\Support\Facades\Cache;
- use Illuminate\Support\Facades\Log;
- use App\Services\WalletService;
- use App\Services\CollectService;
- use App\Services\UserService;
- use App\Services\BalanceLogService;
- use App\Helpers\TronHelper;
- use App\Models\Config;
- /**
- * 用户充值记录
- */
- class RechargeService extends BaseService
- {
/** Eloquent model class this service reads and writes. */
public static string $MODEL = Recharge::class;

/** Cache key under which the queue of members awaiting a USDT recharge sync is stored. */
public const PENDING_SYNC_CACHE_KEY = 'usdt_recharge_pending_sync_members';

/** Cache key used to throttle enqueueing of recently active members. */
public const RECENT_ACTIVE_SYNC_CACHE_KEY = 'usdt_recharge_recent_active_synced_at';

/** Cache key of the global lock taken before an address is allowed to sync. */
public const TRONGRID_SYNC_LOCK_KEY = 'usdt_recharge_trongrid_sync_lock';

/** Prefix of the per-address cooldown cache key (the wallet address is appended). */
public const ADDRESS_SYNC_CACHE_PREFIX = 'usdt_recharge_address_synced_at:';

/** Lifetime in seconds of a queue entry and of the cached queue itself. */
public const PENDING_SYNC_TTL = 7200;

/** Minimum number of seconds between two syncs of the same address / queue entry. */
public const ADDRESS_SYNC_COOLDOWN = 60;

/** Lifetime in seconds of the global sync lock. */
public const TRONGRID_SYNC_LOCK_SECONDS = 2;

/** Default number of queue entries processed per sync run. */
public const SYNC_BATCH_SIZE = 1;

/** Size, in minutes, of the "recently active" window. */
public const RECENT_ACTIVE_MINUTES = 30;

/** Maximum number of recently active members enqueued per pass. */
public const RECENT_ACTIVE_LIMIT = 20;

/** Minimum number of seconds between two enqueue passes for recently active members. */
public const RECENT_ACTIVE_ENQUEUE_INTERVAL = 300;
/**
 * Name of the Eloquent model class handled by this service.
 *
 * @return string Fully qualified class name of the Recharge model.
 */
public static function model(): string
{
    return Recharge::class;
}
/**
 * Enum class associated with this service.
 *
 * @return string Always the empty string: this service declares no enum.
 */
public static function enum(): string
{
    return '';
}
/**
 * Translate a search array into a list of where-conditions.
 *
 * coin, net, to_address, id, member_id and txid are only applied when they
 * hold a non-empty value. status and type are applied whenever they are set
 * and not loosely equal to the empty string.
 *
 * @param array $search Search criteria keyed by column name.
 * @return array List of [column, '=', value] triples, in a fixed column order.
 */
public static function getWhere(array $search = []): array
{
    $conditions = [];

    // Columns filtered only when the supplied value is non-empty.
    foreach (['coin', 'net', 'to_address', 'id', 'member_id', 'txid'] as $column) {
        if (!empty($search[$column])) {
            $conditions[] = [$column, '=', $search[$column]];
        }
    }

    // Columns filtered on any set value other than '' (loose comparison).
    foreach (['status', 'type'] as $column) {
        if (isset($search[$column]) && $search[$column] != '') {
            $conditions[] = [$column, '=', $search[$column]];
        }
    }

    return $conditions;
}
/**
 * Fetch the first recharge record matching the search criteria.
 *
 * NOTE(review): criteria that getWhere() discards (e.g. an empty txid) yield
 * an empty condition list, so the lookup is presumably unfiltered in that
 * case - callers should validate their input.
 *
 * @param array $search Search criteria, see getWhere().
 * @return Recharge|null The first matching record, or null when none matches.
 */
public static function findOne(array $search): ?Recharge
{
    $conditions = self::getWhere($search);

    return static::$MODEL::where($conditions)->first();
}
/**
 * Fetch every recharge record matching the search criteria.
 *
 * @param array $search Search criteria, see getWhere().
 * @return \Illuminate\Database\Eloquent\Collection
 */
public static function findAll(array $search = [])
{
    $conditions = self::getWhere($search);

    return static::$MODEL::where($conditions)->get();
}
/**
 * Paginated listing of recharge records, newest first, with the member relation loaded.
 *
 * The amount totals cover only the rows of the returned page, not the whole
 * result set.
 *
 * @param array $search Search criteria (see getWhere()) plus an optional 'limit' page size (default 15).
 * @return array{total: mixed, total_amount: int|float, total_success: int|float, total_fail: int|float, data: array}
 */
public static function paginate(array $search = [])
{
    $perPage = $search['limit'] ?? 15;

    $query = static::$MODEL::where(self::getWhere($search));
    $query = $query->with(['member']);
    $query = $query->orderBy("created_at", 'desc');
    $page = $query->paginate($perPage);

    $rows = $page->items();
    $sums = ['all' => 0, 'success' => 0, 'fail' => 0];

    foreach ($rows as $row) {
        // Normalise the amount in place so the response carries a float.
        $row['amount'] = floatval($row['amount']);
        $sums['all'] += $row['amount'];

        if ($row['status'] == 1) {
            $sums['success'] += $row['amount'];
        }
        if ($row['status'] == 2) {
            $sums['fail'] += $row['amount'];
        }
    }

    return [
        'total' => $page->total(),
        'total_amount' => $sums['all'],
        'total_success' => $sums['success'],
        'total_fail' => $sums['fail'],
        'data' => $rows,
    ];
}
/**
 * Pull the USDT recharges of a member's wallet address and store the new ones.
 *
 * Returns 0 without inserting anything when the member has no USDT wallet
 * address, when the address is throttled (unless $force is set), when the
 * lookup throws, or when nothing is returned.
 *
 * @param mixed      $memberId   Member whose wallet is synced.
 * @param mixed|null $walletInfo Already loaded wallet record; looked up via WalletService when falsy.
 * @param bool       $force      Skip the per-address throttle check.
 * @return mixed 0 on the early exits above, otherwise the result of insertOrIgnore()
 *               (presumably the number of inserted rows - confirm).
 */
public static function syncUsdtRechargeRecords($memberId, $walletInfo = null, $force = false)
{
    if (!$walletInfo) {
        $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
    }
    if (empty($walletInfo) || empty($walletInfo->address)) {
        return 0;
    }

    $throttled = !$force && !self::canSyncAddress($walletInfo->address);
    if ($throttled) {
        return 0;
    }

    try {
        $transfers = TronHelper::getTrc20UsdtRecharges($walletInfo->address);
    } catch (\Throwable $e) {
        // Best effort: a failed lookup is logged and reported as "nothing synced".
        Log::warning('同步USDT充值记录失败', [
            'member_id' => $memberId,
            'address' => $walletInfo->address,
            'error' => $e->getMessage(),
        ]);
        return 0;
    }

    if (empty($transfers)) {
        return 0;
    }

    $modelClass = static::$MODEL;

    // Stamp every fetched row with its owner, network, type and timestamps.
    foreach ($transfers as $index => $row) {
        $row['member_id'] = $walletInfo->member_id ?: $memberId;
        $row['net'] = $walletInfo->net;
        $row['type'] = $modelClass::TYPE_AUTO;
        $row['created_at'] = now();
        $row['updated_at'] = now();
        $transfers[$index] = $row;
    }

    return (new $modelClass)->insertOrIgnore($transfers);
}
/**
 * Sync one member's USDT recharges and try to confirm every pending one.
 *
 * @param mixed $memberId  Member to process.
 * @param bool  $syncChain Whether to pull new recharges for the wallet first.
 * @param bool  $force     Whether to bypass the per-address sync throttle.
 * @return array On a missing wallet: success=false, message, member_id. Otherwise:
 *               success, member_id, address, synced, checked, confirmed,
 *               remaining_pending and confirmed_txids.
 */
public static function syncAndConfirmMemberRecharge($memberId, $syncChain = true, $force = false)
{
    $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
    if (empty($walletInfo) || empty($walletInfo->address)) {
        return [
            'success' => false,
            'message' => '未找到该用户的USDT钱包地址',
            'member_id' => $memberId,
        ];
    }

    $synced = $syncChain
        ? self::syncUsdtRechargeRecords($memberId, $walletInfo, $force)
        : 0;

    $modelClass = self::model();

    // Query for this member's automatic recharges that are still pending.
    $pendingQuery = static fn () => $modelClass::where(self::getWhere([
        'member_id' => $memberId,
        'status' => $modelClass::STATUS_STAY,
        'type' => $modelClass::TYPE_AUTO,
    ]));

    $checked = 0;
    $confirmedTxids = [];

    foreach ($pendingQuery()->orderBy('id')->get() as $pending) {
        $checked++;
        self::handleRechargeConfirmation($pending->txid);

        // Reload the row to see whether the confirmation went through.
        $fresh = self::findOne(['id' => $pending->id]);
        if ($fresh && intval($fresh->status) === $modelClass::STATUS_SUCCESS) {
            $confirmedTxids[] = $fresh->txid;
        }
    }

    return [
        'success' => true,
        'member_id' => $memberId,
        'address' => $walletInfo->address,
        'synced' => $synced,
        'checked' => $checked,
        'confirmed' => count($confirmedTxids),
        'remaining_pending' => $pendingQuery()->count(),
        'confirmed_txids' => $confirmedTxids,
    ];
}
/**
 * Put a member into the USDT recharge sync queue, or refresh its entry.
 *
 * An existing entry keeps its last_synced_at value; its updated_at and
 * expires_at are renewed. Empty member ids are ignored.
 *
 * @param mixed $memberId Member to enqueue.
 * @return void
 */
public static function markUsdtRechargePending($memberId)
{
    if (empty($memberId)) {
        return;
    }

    $queue = self::getPendingSyncQueue();
    $slot = strval($memberId);
    $timestamp = time();
    $previousSync = $queue[$slot]['last_synced_at'] ?? 0;

    $queue[$slot] = [
        'member_id' => $memberId,
        'last_synced_at' => $previousSync,
        'updated_at' => $timestamp,
        'expires_at' => $timestamp + self::PENDING_SYNC_TTL,
    ];

    self::putPendingSyncQueue($queue);
}
/**
 * Sync USDT recharges for members waiting in the pending queue.
 *
 * Recently active members are enqueued first. Entries are then processed
 * least-recently-synced first, skipping entries still inside the cooldown and
 * dropping entries whose member has no USDT wallet address, until $limit
 * entries have been synced.
 *
 * The queue is re-read before it is written back and only this run's changes
 * (removals and sync timestamps) are applied to it. Writing back the snapshot
 * taken before the slow chain lookups would discard members that were
 * enqueued in the meantime.
 *
 * @param int $limit Maximum number of queue entries to sync in this run.
 * @return array{synced: int, checked: int, queued: int}
 */
public static function syncPendingUsdtRechargeRecords($limit = self::SYNC_BATCH_SIZE)
{
    self::markRecentActiveMembersPending();

    $queue = self::getPendingSyncQueue();
    uasort(
        $queue,
        static fn ($a, $b) => ($a['last_synced_at'] ?? 0) <=> ($b['last_synced_at'] ?? 0)
    );

    $synced = 0;
    $checked = 0;
    $now = time();
    $dropped = [];   // queue keys to remove
    $touched = [];   // queue key => timestamp of the sync attempt

    foreach ($queue as $key => $item) {
        if ($checked >= $limit) {
            break;
        }
        if (!empty($item['last_synced_at']) && $now - $item['last_synced_at'] < self::ADDRESS_SYNC_COOLDOWN) {
            continue;
        }

        $walletInfo = WalletService::findOne(['member_id' => $item['member_id'] ?? 0, 'coin' => 'USDT']);
        if (empty($walletInfo) || empty($walletInfo->address)) {
            $dropped[] = $key;
            continue;
        }

        $synced += (int) self::syncUsdtRechargeRecords($walletInfo->member_id, $walletInfo);
        $checked++;
        $touched[$key] = time();
    }

    // Apply this run's changes on top of the current queue rather than the stale snapshot.
    $latest = self::getPendingSyncQueue();
    foreach ($dropped as $key) {
        unset($latest[$key]);
    }
    foreach ($touched as $key => $syncedAt) {
        if (isset($latest[$key])) {
            $latest[$key]['last_synced_at'] = $syncedAt;
            $latest[$key]['updated_at'] = $syncedAt;
        }
    }
    self::putPendingSyncQueue($latest);

    return [
        'synced' => $synced,
        'checked' => $checked,
        'queued' => count($latest),
    ];
}
/**
 * Legacy entry point kept for existing callers.
 *
 * Delegates to the queue-based sync with the default batch size instead of
 * scanning every wallet.
 *
 * @return array Result of syncPendingUsdtRechargeRecords().
 */
public static function syncAllUsdtRechargeRecords()
{
    return self::syncPendingUsdtRechargeRecords(self::SYNC_BATCH_SIZE);
}
/**
 * Decide whether an address may be synced now, and reserve the slot if so.
 *
 * Refuses when the address is still in its cooldown or when the global sync
 * lock cannot be acquired. On success the address cooldown is started.
 *
 * @param mixed $address Wallet address to check.
 * @return bool True when the caller may sync the address.
 */
private static function canSyncAddress($address)
{
    $cooldownKey = self::ADDRESS_SYNC_CACHE_PREFIX . $address;

    // Short-circuit keeps the global lock untouched while the address is cooling down.
    $blocked = Cache::has($cooldownKey)
        || !Cache::add(self::TRONGRID_SYNC_LOCK_KEY, time(), self::TRONGRID_SYNC_LOCK_SECONDS);
    if ($blocked) {
        return false;
    }

    Cache::put($cooldownKey, time(), self::ADDRESS_SYNC_COOLDOWN);

    return true;
}
/**
 * Load the pending sync queue from the cache, without invalid or expired entries.
 *
 * @return array Queue entries keyed as stored; empty when nothing usable is cached.
 */
private static function getPendingSyncQueue()
{
    $cached = Cache::get(self::PENDING_SYNC_CACHE_KEY, []);
    if (!is_array($cached)) {
        return [];
    }

    $now = time();

    // Keep only well-formed entries that have a member id and have not expired.
    return array_filter($cached, static function ($entry) use ($now) {
        return is_array($entry)
            && !empty($entry['member_id'])
            && ($entry['expires_at'] ?? 0) >= $now;
    });
}
/**
 * Store the pending sync queue in the cache for PENDING_SYNC_TTL.
 *
 * @param array $queue Queue entries to persist.
 * @return void
 */
private static function putPendingSyncQueue(array $queue)
{
    $ttl = self::PENDING_SYNC_TTL;

    Cache::put(self::PENDING_SYNC_CACHE_KEY, $queue, $ttl);
}
/**
 * Enqueue the most recently active members for a USDT recharge sync.
 *
 * Runs at most once per RECENT_ACTIVE_ENQUEUE_INTERVAL. The throttle marker
 * is claimed with Cache::add(), which checks and sets in a single step, so
 * two callers arriving together cannot both pass the throttle (the same
 * approach canSyncAddress() uses for its lock).
 *
 * @return void
 */
private static function markRecentActiveMembersPending()
{
    // add() returns false when the marker already exists, i.e. a pass ran recently.
    if (!Cache::add(self::RECENT_ACTIVE_SYNC_CACHE_KEY, time(), self::RECENT_ACTIVE_ENQUEUE_INTERVAL)) {
        return;
    }

    $since = date('Y-m-d H:i:s', time() - self::RECENT_ACTIVE_MINUTES * 60);

    $memberIds = User::whereNotNull('last_active_time')
        ->where('last_active_time', '>=', $since)
        ->orderByDesc('last_active_time')
        ->limit(self::RECENT_ACTIVE_LIMIT)
        ->pluck('member_id');

    foreach ($memberIds as $memberId) {
        self::markUsdtRechargePending($memberId);
    }
}
/**
 * Confirm a pending recharge once its transaction has enough confirmations.
 *
 * Does nothing when the txid is empty or unknown, when the record is not
 * pending, when the confirmation lookup fails, or when the confirmation count
 * is below TronHelper::CONFIRMED_NUMBER.
 *
 * Otherwise the record is marked successful and the member's balance is
 * credited with amount * exchange rate. The status update, the balance change
 * and the balance log run in one database transaction, so a failure while
 * crediting rolls the status back and the recharge stays pending for a retry.
 * The collect task and the user notification are issued after the commit.
 *
 * @param mixed $txid Transaction id of the recharge to confirm.
 * @return void
 */
public static function handleRechargeConfirmation($txid)
{
    // getWhere() drops empty filters, so an empty txid would load an unrelated row.
    if (empty($txid)) {
        return;
    }

    $info = self::findOne(['txid' => $txid]);
    if ($info === null || (int) $info->status !== (int) static::$MODEL::STATUS_STAY) {
        return;
    }

    $result = TronHelper::getTransactionConfirmations($txid);
    if (empty($result['success'])) {
        return;
    }

    $confirmations = $result['latest_block'] - $result['block_number'];
    if ($confirmations < TronHelper::CONFIRMED_NUMBER) {
        return;
    }

    // Exchange rate; falls back to 1 when the config row is missing.
    $rate = Config::where('field', 'exchange_rate_rmb')->first()->val ?? 1;

    // bcmath needs plain decimal strings: a float cast may produce scientific notation.
    $toDecimal = static fn ($value): string => (is_string($value) && preg_match('/^-?\d+(\.\d+)?$/', $value) === 1)
        ? $value
        : number_format((float) $value, 10, '.', '');

    // Computed before any write so an invalid rate cannot leave a half-processed recharge.
    $rateAmount = bcmul($toDecimal($info->amount), $toDecimal($rate), 10);

    $data = [
        'block_height' => $result['block_number'],
        'confirmations' => $confirmations,
        'exchange_rate' => $rate,
        'status' => static::$MODEL::STATUS_SUCCESS,
    ];

    // The pending-status condition is what prevents crediting the same recharge twice,
    // so it is spelled out here instead of relying on getWhere() keeping it.
    $where = [
        ['txid', '=', $txid],
        ['status', '=', static::$MODEL::STATUS_STAY],
    ];

    $credited = DB::transaction(function () use ($where, $data, $info, $rateAmount) {
        $updated = static::$MODEL::where($where)->update($data);
        if (!$updated) {
            // Someone else already processed this recharge.
            return false;
        }

        $balanceData = WalletService::updateBalance($info->member_id, $rateAmount);
        BalanceLogService::addLog(
            $info->member_id,
            $rateAmount,
            $balanceData['before_balance'],
            $balanceData['after_balance'],
            '充值',
            $info->id,
            ''
        );

        return true;
    });

    if (!$credited) {
        return;
    }

    CollectService::createCollect($info->to_address, $info->coin, $info->net);

    $amount = floatval($info->amount);
    $text = "充值结果通知\n";
    $text .= "充值数量:{$amount} USDT\n";
    $text .= "充值地址:{$info->to_address}\n";
    $text .= "汇率:1 USDT = {$rate} RMB\n";
    $text .= "折合金额:" . number_format((float) $rateAmount, 2) . " RMB\n";
    $text .= "状态:成功\n";
    TopUpService::notifyTransferSuccess($info->member_id, $text);
}
/**
 * Run the confirmation check for every pending automatic recharge.
 *
 * @return void
 */
public static function syncRechargeStay()
{
    $modelClass = static::$MODEL;

    $pending = self::findAll([
        'status' => $modelClass::STATUS_STAY,
        'type' => $modelClass::TYPE_AUTO,
    ]);

    foreach ($pending as $recharge) {
        self::handleRechargeConfirmation($recharge->txid);
    }
}
- }
|