|
|
@@ -5,9 +5,11 @@ namespace App\Services;
|
|
|
|
|
|
use App\Services\BaseService;
|
|
|
use App\Models\Recharge;
|
|
|
+use App\Models\User;
|
|
|
use Illuminate\Support\Facades\DB;
|
|
|
use Illuminate\Support\Collection;
|
|
|
use Illuminate\Support\Facades\Cache;
|
|
|
+use Illuminate\Support\Facades\Log;
|
|
|
use App\Services\WalletService;
|
|
|
use App\Services\CollectService;
|
|
|
use App\Services\UserService;
|
|
|
@@ -21,6 +23,17 @@ use App\Models\Config;
|
|
|
class RechargeService extends BaseService
|
|
|
{
|
|
|
public static string $MODEL = Recharge::class;
|
|
|
+ const PENDING_SYNC_CACHE_KEY = 'usdt_recharge_pending_sync_members';
|
|
|
+ const RECENT_ACTIVE_SYNC_CACHE_KEY = 'usdt_recharge_recent_active_synced_at';
|
|
|
+ const TRONGRID_SYNC_LOCK_KEY = 'usdt_recharge_trongrid_sync_lock';
|
|
|
+ const ADDRESS_SYNC_CACHE_PREFIX = 'usdt_recharge_address_synced_at:';
|
|
|
+ const PENDING_SYNC_TTL = 7200;
|
|
|
+ const ADDRESS_SYNC_COOLDOWN = 60;
|
|
|
+ const TRONGRID_SYNC_LOCK_SECONDS = 2;
|
|
|
+ const SYNC_BATCH_SIZE = 1;
|
|
|
+ const RECENT_ACTIVE_MINUTES = 30;
|
|
|
+ const RECENT_ACTIVE_LIMIT = 20;
|
|
|
+ const RECENT_ACTIVE_ENQUEUE_INTERVAL = 300;
|
|
|
/**
|
|
|
* @description: 模型
|
|
|
* @return {string}
|
|
|
@@ -132,14 +145,28 @@ class RechargeService extends BaseService
|
|
|
* @param {*} $memberId
|
|
|
* @return {*}
|
|
|
*/
|
|
|
- public static function syncUsdtRechargeRecords($memberId, $walletInfo = null)
|
|
|
+ public static function syncUsdtRechargeRecords($memberId, $walletInfo = null, $force = false)
|
|
|
{
|
|
|
$walletInfo = $walletInfo ?: WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
|
|
|
if (empty($walletInfo) || empty($walletInfo->address)) {
|
|
|
return 0;
|
|
|
}
|
|
|
|
|
|
- $data = TronHelper::getTrc20UsdtRecharges($walletInfo->address);
|
|
|
+ if (!$force && !self::canSyncAddress($walletInfo->address)) {
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
+ try {
|
|
|
+ $data = TronHelper::getTrc20UsdtRecharges($walletInfo->address);
|
|
|
+ } catch (\Throwable $e) {
|
|
|
+ Log::warning('同步USDT充值记录失败', [
|
|
|
+ 'member_id' => $memberId,
|
|
|
+ 'address' => $walletInfo->address,
|
|
|
+ 'error' => $e->getMessage(),
|
|
|
+ ]);
|
|
|
+ return 0;
|
|
|
+ }
|
|
|
+
|
|
|
if (empty($data)) {
|
|
|
return 0;
|
|
|
}
|
|
|
@@ -158,23 +185,201 @@ class RechargeService extends BaseService
|
|
|
}
|
|
|
|
|
|
/**
|
|
|
- * @description: 同步所有USDT钱包的新充值记录
|
|
|
- * @return int
|
|
|
+ * @description: 强制同步并确认单个会员的USDT充值记录
|
|
|
+ * @param {*} $memberId
|
|
|
+ * @param bool $syncChain 是否先拉链上新充值
|
|
|
+ * @param bool $force 是否忽略地址同步节流
|
|
|
+ * @return array
|
|
|
*/
|
|
|
- public static function syncAllUsdtRechargeRecords()
|
|
|
+ public static function syncAndConfirmMemberRecharge($memberId, $syncChain = true, $force = false)
|
|
|
{
|
|
|
- $wallets = WalletService::findAll(['coin' => 'USDT']);
|
|
|
- $total = 0;
|
|
|
+ $walletInfo = WalletService::findOne(['member_id' => $memberId, 'coin' => 'USDT']);
|
|
|
+ if (empty($walletInfo) || empty($walletInfo->address)) {
|
|
|
+ return [
|
|
|
+ 'success' => false,
|
|
|
+ 'message' => '未找到该用户的USDT钱包地址',
|
|
|
+ 'member_id' => $memberId,
|
|
|
+ ];
|
|
|
+ }
|
|
|
+
|
|
|
+ $synced = 0;
|
|
|
+ if ($syncChain) {
|
|
|
+ $synced = self::syncUsdtRechargeRecords($memberId, $walletInfo, $force);
|
|
|
+ }
|
|
|
+
|
|
|
+ $pendingList = self::model()::where(self::getWhere([
|
|
|
+ 'member_id' => $memberId,
|
|
|
+ 'status' => self::model()::STATUS_STAY,
|
|
|
+ 'type' => self::model()::TYPE_AUTO,
|
|
|
+ ]))->orderBy('id')->get();
|
|
|
+
|
|
|
+ $checked = 0;
|
|
|
+ $confirmed = 0;
|
|
|
+ $confirmedTxids = [];
|
|
|
+
|
|
|
+ foreach ($pendingList as $item) {
|
|
|
+ $checked++;
|
|
|
+ self::handleRechargeConfirmation($item->txid);
|
|
|
+ $fresh = self::findOne(['id' => $item->id]);
|
|
|
+ if ($fresh && intval($fresh->status) === self::model()::STATUS_SUCCESS) {
|
|
|
+ $confirmed++;
|
|
|
+ $confirmedTxids[] = $fresh->txid;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ $remainingPending = self::model()::where(self::getWhere([
|
|
|
+ 'member_id' => $memberId,
|
|
|
+ 'status' => self::model()::STATUS_STAY,
|
|
|
+ 'type' => self::model()::TYPE_AUTO,
|
|
|
+ ]))->count();
|
|
|
+
|
|
|
+ return [
|
|
|
+ 'success' => true,
|
|
|
+ 'member_id' => $memberId,
|
|
|
+ 'address' => $walletInfo->address,
|
|
|
+ 'synced' => $synced,
|
|
|
+ 'checked' => $checked,
|
|
|
+ 'confirmed' => $confirmed,
|
|
|
+ 'remaining_pending' => $remainingPending,
|
|
|
+ 'confirmed_txids' => $confirmedTxids,
|
|
|
+ ];
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 标记用户进入USDT充值同步队列
|
|
|
+ * @param {*} $memberId
|
|
|
+ * @return void
|
|
|
+ */
|
|
|
+ public static function markUsdtRechargePending($memberId)
|
|
|
+ {
|
|
|
+ if (empty($memberId)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ $queue = self::getPendingSyncQueue();
|
|
|
+ $key = strval($memberId);
|
|
|
+ $now = time();
|
|
|
+ $queue[$key] = [
|
|
|
+ 'member_id' => $memberId,
|
|
|
+ 'last_synced_at' => $queue[$key]['last_synced_at'] ?? 0,
|
|
|
+ 'updated_at' => $now,
|
|
|
+ 'expires_at' => $now + self::PENDING_SYNC_TTL,
|
|
|
+ ];
|
|
|
+ self::putPendingSyncQueue($queue);
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 同步待处理队列中的USDT充值记录
|
|
|
+ * @param int $limit
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ public static function syncPendingUsdtRechargeRecords($limit = self::SYNC_BATCH_SIZE)
|
|
|
+ {
|
|
|
+ self::markRecentActiveMembersPending();
|
|
|
+
|
|
|
+ $queue = self::getPendingSyncQueue();
|
|
|
+ uasort($queue, function ($a, $b) {
|
|
|
+ return ($a['last_synced_at'] ?? 0) <=> ($b['last_synced_at'] ?? 0);
|
|
|
+ });
|
|
|
+
|
|
|
+ $synced = 0;
|
|
|
+ $checked = 0;
|
|
|
+ $now = time();
|
|
|
+
|
|
|
+ foreach ($queue as $key => $item) {
|
|
|
+ if ($checked >= $limit) {
|
|
|
+ break;
|
|
|
+ }
|
|
|
|
|
|
- foreach ($wallets as $walletInfo) {
|
|
|
- if (empty($walletInfo->member_id) || empty($walletInfo->address)) {
|
|
|
+ if (!empty($item['last_synced_at']) && $now - $item['last_synced_at'] < self::ADDRESS_SYNC_COOLDOWN) {
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- $total += self::syncUsdtRechargeRecords($walletInfo->member_id, $walletInfo);
|
|
|
+ $walletInfo = WalletService::findOne(['member_id' => $item['member_id'] ?? 0, 'coin' => 'USDT']);
|
|
|
+ if (empty($walletInfo) || empty($walletInfo->address)) {
|
|
|
+ unset($queue[$key]);
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ $synced += self::syncUsdtRechargeRecords($walletInfo->member_id, $walletInfo);
|
|
|
+ $checked++;
|
|
|
+ $queue[$key]['last_synced_at'] = time();
|
|
|
+ $queue[$key]['updated_at'] = time();
|
|
|
+ }
|
|
|
+
|
|
|
+ self::putPendingSyncQueue($queue);
|
|
|
+
|
|
|
+ return [
|
|
|
+ 'synced' => $synced,
|
|
|
+ 'checked' => $checked,
|
|
|
+ 'queued' => count($queue),
|
|
|
+ ];
|
|
|
+ }
|
|
|
+
|
|
|
+ /**
|
|
|
+ * @description: 兼容旧调用,避免再次全量扫描钱包
|
|
|
+ * @return array
|
|
|
+ */
|
|
|
+ public static function syncAllUsdtRechargeRecords()
|
|
|
+ {
|
|
|
+ return self::syncPendingUsdtRechargeRecords();
|
|
|
+ }
|
|
|
+
|
|
|
+ private static function canSyncAddress($address)
|
|
|
+ {
|
|
|
+ $addressKey = self::ADDRESS_SYNC_CACHE_PREFIX . $address;
|
|
|
+ if (Cache::has($addressKey)) {
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (!Cache::add(self::TRONGRID_SYNC_LOCK_KEY, time(), self::TRONGRID_SYNC_LOCK_SECONDS)) {
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
- return $total;
|
|
|
+ Cache::put($addressKey, time(), self::ADDRESS_SYNC_COOLDOWN);
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static function getPendingSyncQueue()
|
|
|
+ {
|
|
|
+ $queue = Cache::get(self::PENDING_SYNC_CACHE_KEY, []);
|
|
|
+ if (!is_array($queue)) {
|
|
|
+ return [];
|
|
|
+ }
|
|
|
+
|
|
|
+ $now = time();
|
|
|
+ foreach ($queue as $key => $item) {
|
|
|
+ if (!is_array($item) || empty($item['member_id']) || ($item['expires_at'] ?? 0) < $now) {
|
|
|
+ unset($queue[$key]);
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ return $queue;
|
|
|
+ }
|
|
|
+
|
|
|
+ private static function putPendingSyncQueue(array $queue)
|
|
|
+ {
|
|
|
+ Cache::put(self::PENDING_SYNC_CACHE_KEY, $queue, self::PENDING_SYNC_TTL);
|
|
|
+ }
|
|
|
+
|
|
|
+ private static function markRecentActiveMembersPending()
|
|
|
+ {
|
|
|
+ if (Cache::has(self::RECENT_ACTIVE_SYNC_CACHE_KEY)) {
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ Cache::put(self::RECENT_ACTIVE_SYNC_CACHE_KEY, time(), self::RECENT_ACTIVE_ENQUEUE_INTERVAL);
|
|
|
+
|
|
|
+ $since = date('Y-m-d H:i:s', time() - self::RECENT_ACTIVE_MINUTES * 60);
|
|
|
+ $members = User::whereNotNull('last_active_time')
|
|
|
+ ->where('last_active_time', '>=', $since)
|
|
|
+ ->orderByDesc('last_active_time')
|
|
|
+ ->limit(self::RECENT_ACTIVE_LIMIT)
|
|
|
+ ->pluck('member_id');
|
|
|
+
|
|
|
+ foreach ($members as $memberId) {
|
|
|
+ self::markUsdtRechargePending($memberId);
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
/**
|