first(); } /** * @description: 查询所有数据 * @param array $search * @return \Illuminate\Database\Eloquent\Collection */ public static function findAll(array $search = []) { return static::$MODEL::where(self::getWhere($search))->get(); } /** * @description: 分页查询 * @param array $search * @return array */ public static function paginate(array $search = []) { $limit = isset($search['limit']) ? $search['limit'] : 15; $paginator = static::$MODEL::where(self::getWhere($search)) ->with(['member']) ->orderBy("created_at", 'desc') ->paginate($limit); $list = $paginator->items(); $totalAmount = 0; $totalSuccess = 0; $totalFail = 0; foreach ($list as $item) { $item['amount'] = floatval($item['amount']); $totalAmount += $item['amount']; if ($item['status'] == 1) $totalSuccess += $item['amount']; if ($item['status'] == 2) $totalFail += $item['amount']; } return [ 'total' => $paginator->total(), 'total_amount' => $totalAmount, 'total_success' => $totalSuccess, 'total_fail' => $totalFail, 'data' => $list]; } /** * @description: 同步会员的USDT充值记录 * @param {*} $memberId * @return {*} */ public static function syncUsdtRechargeRecords($memberId, $walletInfo = null, $force = false) { $walletInfo = $walletInfo ?: WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']); if (empty($walletInfo) || empty($walletInfo->address)) { return 0; } if (!$force && !self::canSyncAddress($walletInfo->address)) { return 0; } try { $data = TronHelper::getTrc20UsdtRecharges($walletInfo->address); } catch (\Throwable $e) { Log::warning('同步USDT充值记录失败', [ 'member_id' => $memberId, 'address' => $walletInfo->address, 'error' => $e->getMessage(), ]); return 0; } if (empty($data)) { return 0; } foreach ($data as $k => $v) { $v['member_id'] = $walletInfo->member_id ?: $memberId; $v['net'] = $walletInfo->net; $v['type'] = static::$MODEL::TYPE_AUTO; $v['created_at'] = now(); $v['updated_at'] = now(); $data[$k] = $v; } $m = new (static::$MODEL); $result = $m->insertOrIgnore($data); return $result; } /** * @description: 强制同步并确认单个会员的USDT充值记录 * @param {*} $memberId * @param bool $syncChain 是否先拉链上新充值 * @param bool $force 是否忽略地址同步节流 * @return array */ public static function syncAndConfirmMemberRecharge($memberId, $syncChain = true, $force = false) { $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']); if (empty($walletInfo) || empty($walletInfo->address)) { return [ 'success' => false, 'message' => '未找到该用户的USDT钱包地址', 'member_id' => $memberId, ]; } $synced = 0; if ($syncChain) { $synced = self::syncUsdtRechargeRecords($memberId, $walletInfo, $force); } $pendingList = self::model()::where(self::getWhere([ 'member_id' => $memberId, 'status' => self::model()::STATUS_STAY, 'type' => self::model()::TYPE_AUTO, ]))->orderBy('id')->get(); $checked = 0; $confirmed = 0; $confirmedTxids = []; foreach ($pendingList as $item) { $checked++; self::handleRechargeConfirmation($item->txid); $fresh = self::findOne(['id' => $item->id]); if ($fresh && intval($fresh->status) === self::model()::STATUS_SUCCESS) { $confirmed++; $confirmedTxids[] = $fresh->txid; } } $remainingPending = self::model()::where(self::getWhere([ 'member_id' => $memberId, 'status' => self::model()::STATUS_STAY, 'type' => self::model()::TYPE_AUTO, ]))->count(); $collectResult = self::ensureCollectRecordsForMember($memberId); return [ 'success' => true, 'member_id' => $memberId, 'address' => $walletInfo->address, 'synced' => $synced, 'checked' => $checked, 'confirmed' => $confirmed, 'remaining_pending' => $remainingPending, 'confirmed_txids' => $confirmedTxids, 'collect_ensured' => $collectResult['ensured'], 'collect_addresses' => $collectResult['addresses'], ]; } /** * @description: 为已确认充值补建待归集记录 * @param {*} $memberId * @return array */ public static function ensureCollectRecordsForMember($memberId) { $list = self::model()::where(self::getWhere([ 'member_id' => $memberId, 'status' => self::model()::STATUS_SUCCESS, 'type' => self::model()::TYPE_AUTO, ]))->orderByDesc('id')->get(); $ensured = 0; $addresses = []; $seen = []; foreach ($list as $item) { $address = $item->to_address; if (empty($address) || isset($seen[$address])) { continue; } $seen[$address] = true; $hasOpenCollect = Collect::where('from_address', $address) ->whereIn('status', [Collect::STATUS_STAY, Collect::STATUS_START]) ->exists(); if ($hasOpenCollect) { continue; } CollectService::createCollect($address, $item->coin, $item->net); $ensured++; $addresses[] = $address; } return [ 'ensured' => $ensured, 'addresses' => $addresses, ]; } /** * @description: 标记用户进入USDT充值同步队列 * @param {*} $memberId * @return void */ public static function markUsdtRechargePending($memberId) { if (empty($memberId)) { return; } $queue = self::getPendingSyncQueue(); $key = strval($memberId); $now = time(); $queue[$key] = [ 'member_id' => $memberId, 'last_synced_at' => $queue[$key]['last_synced_at'] ?? 0, 'updated_at' => $now, 'expires_at' => $now + self::PENDING_SYNC_TTL, ]; self::putPendingSyncQueue($queue); } /** * @description: 同步待处理队列中的USDT充值记录 * @param int $limit * @return array */ public static function syncPendingUsdtRechargeRecords($limit = self::SYNC_BATCH_SIZE) { self::markRecentActiveMembersPending(); $queue = self::getPendingSyncQueue(); uasort($queue, function ($a, $b) { return ($a['last_synced_at'] ?? 0) <=> ($b['last_synced_at'] ?? 0); }); $synced = 0; $checked = 0; $now = time(); foreach ($queue as $key => $item) { if ($checked >= $limit) { break; } if (!empty($item['last_synced_at']) && $now - $item['last_synced_at'] < self::ADDRESS_SYNC_COOLDOWN) { continue; } $walletInfo = WalletService::findOne(['member_id' => $item['member_id'] ?? 0, 'coin' => 'USDT']); if (empty($walletInfo) || empty($walletInfo->address)) { unset($queue[$key]); continue; } $synced += self::syncUsdtRechargeRecords($walletInfo->member_id, $walletInfo); $checked++; $queue[$key]['last_synced_at'] = time(); $queue[$key]['updated_at'] = time(); } self::putPendingSyncQueue($queue); return [ 'synced' => $synced, 'checked' => $checked, 'queued' => count($queue), ]; } /** * @description: 兼容旧调用,避免再次全量扫描钱包 * @return array */ public static function syncAllUsdtRechargeRecords() { return self::syncPendingUsdtRechargeRecords(); } private static function canSyncAddress($address) { $addressKey = self::ADDRESS_SYNC_CACHE_PREFIX . $address; if (Cache::has($addressKey)) { return false; } if (!Cache::add(self::TRONGRID_SYNC_LOCK_KEY, time(), self::TRONGRID_SYNC_LOCK_SECONDS)) { return false; } Cache::put($addressKey, time(), self::ADDRESS_SYNC_COOLDOWN); return true; } private static function getPendingSyncQueue() { $queue = Cache::get(self::PENDING_SYNC_CACHE_KEY, []); if (!is_array($queue)) { return []; } $now = time(); foreach ($queue as $key => $item) { if (!is_array($item) || empty($item['member_id']) || ($item['expires_at'] ?? 0) < $now) { unset($queue[$key]); } } return $queue; } private static function putPendingSyncQueue(array $queue) { Cache::put(self::PENDING_SYNC_CACHE_KEY, $queue, self::PENDING_SYNC_TTL); } private static function markRecentActiveMembersPending() { if (Cache::has(self::RECENT_ACTIVE_SYNC_CACHE_KEY)) { return; } Cache::put(self::RECENT_ACTIVE_SYNC_CACHE_KEY, time(), self::RECENT_ACTIVE_ENQUEUE_INTERVAL); $since = date('Y-m-d H:i:s', time() - self::RECENT_ACTIVE_MINUTES * 60); $members = User::whereNotNull('last_active_time') ->where('last_active_time', '>=', $since) ->orderByDesc('last_active_time') ->limit(self::RECENT_ACTIVE_LIMIT) ->pluck('member_id'); foreach ($members as $memberId) { self::markUsdtRechargePending($memberId); } } /** * @description: 充值确认 * @param {*} $txid * @return {*} */ public static function handleRechargeConfirmation($txid) { $info = self::findOne(['txid' => $txid]); // 获取充值的信息 // 汇率 $rate = Config::where('field', 'exchange_rate_rmb')->first()->val ?? 1; // 待处理进行充值 if ($info['status'] == static::$MODEL::STATUS_STAY) { $result = TronHelper::getTransactionConfirmations($txid); if ($result['success']) { $data = []; $data['block_height'] = $result['block_number']; $data['confirmations'] = $result['latest_block'] - $result['block_number']; $data['exchange_rate'] = $rate; if ($data['confirmations'] >= TronHelper::CONFIRMED_NUMBER) { $data['status'] = static::$MODEL::STATUS_SUCCESS; $where = self::getWhere(['txid' => $txid, 'status' => static::$MODEL::STATUS_STAY]); $recharge = static::$MODEL::where($where)->update($data); $amount = floatval($info->amount); $rate_amount = bcmul($amount, $rate, 10); // 汇率转换后分数 // 更新成功,变动可用余额 if ($recharge) { $balanceData = WalletService::updateBalance($info->member_id, $rate_amount); BalanceLogService::addLog($info->member_id, $rate_amount, $balanceData['before_balance'], $balanceData['after_balance'], '充值', $info->id, ''); CollectService::createCollect($info->to_address, $info->coin, $info->net); $amount = floatval($info->amount); $text = "充值结果通知\n"; $text .= "充值数量:{$amount} USDT\n"; $text .= "充值地址:{$info->to_address}\n"; $text .= "汇率:1 USDT = {$rate} RMB\n"; $text .= "折合金额:" . number_format($rate_amount, 2) . " RMB\n"; $text .= "状态:成功\n"; TopUpService::notifyTransferSuccess($info->member_id, $text); } } } } } /** * @description: 同步待处理的充值记录 * @return {*} */ public static function syncRechargeStay() { $list = self::findAll(['status' => static::$MODEL::STATUS_STAY, 'type' => static::$MODEL::TYPE_AUTO]); foreach ($list as $k => $v) { self::handleRechargeConfirmation($v->txid); } } }