provider-fallback.js 23 KB


  1. /**
  2. * A **FallbackProvider** provides resilience, security and performance
  3. * in a way that is customizable and configurable.
  4. *
  5. * @_section: api/providers/fallback-provider:Fallback Provider [about-fallback-provider]
  6. */
  7. import { assert, assertArgument, getBigInt, getNumber, isError } from "../utils/index.js";
  8. import { AbstractProvider } from "./abstract-provider.js";
  9. import { Network } from "./network.js";
  10. const BN_1 = BigInt("1");
  11. const BN_2 = BigInt("2");
  12. function shuffle(array) {
  13. for (let i = array.length - 1; i > 0; i--) {
  14. const j = Math.floor(Math.random() * (i + 1));
  15. const tmp = array[i];
  16. array[i] = array[j];
  17. array[j] = tmp;
  18. }
  19. }
  20. function stall(duration) {
  21. return new Promise((resolve) => { setTimeout(resolve, duration); });
  22. }
  23. function getTime() { return (new Date()).getTime(); }
  24. function stringify(value) {
  25. return JSON.stringify(value, (key, value) => {
  26. if (typeof (value) === "bigint") {
  27. return { type: "bigint", value: value.toString() };
  28. }
  29. return value;
  30. });
  31. }
  32. ;
  33. const defaultConfig = { stallTimeout: 400, priority: 1, weight: 1 };
  34. const defaultState = {
  35. blockNumber: -2, requests: 0, lateResponses: 0, errorResponses: 0,
  36. outOfSync: -1, unsupportedEvents: 0, rollingDuration: 0, score: 0,
  37. _network: null, _updateNumber: null, _totalTime: 0,
  38. _lastFatalError: null, _lastFatalErrorTimestamp: 0
  39. };
  40. async function waitForSync(config, blockNumber) {
  41. while (config.blockNumber < 0 || config.blockNumber < blockNumber) {
  42. if (!config._updateNumber) {
  43. config._updateNumber = (async () => {
  44. try {
  45. const blockNumber = await config.provider.getBlockNumber();
  46. if (blockNumber > config.blockNumber) {
  47. config.blockNumber = blockNumber;
  48. }
  49. }
  50. catch (error) {
  51. config.blockNumber = -2;
  52. config._lastFatalError = error;
  53. config._lastFatalErrorTimestamp = getTime();
  54. }
  55. config._updateNumber = null;
  56. })();
  57. }
  58. await config._updateNumber;
  59. config.outOfSync++;
  60. if (config._lastFatalError) {
  61. break;
  62. }
  63. }
  64. }
  65. function _normalize(value) {
  66. if (value == null) {
  67. return "null";
  68. }
  69. if (Array.isArray(value)) {
  70. return "[" + (value.map(_normalize)).join(",") + "]";
  71. }
  72. if (typeof (value) === "object" && typeof (value.toJSON) === "function") {
  73. return _normalize(value.toJSON());
  74. }
  75. switch (typeof (value)) {
  76. case "boolean":
  77. case "symbol":
  78. return value.toString();
  79. case "bigint":
  80. case "number":
  81. return BigInt(value).toString();
  82. case "string":
  83. return JSON.stringify(value);
  84. case "object": {
  85. const keys = Object.keys(value);
  86. keys.sort();
  87. return "{" + keys.map((k) => `${JSON.stringify(k)}:${_normalize(value[k])}`).join(",") + "}";
  88. }
  89. }
  90. console.log("Could not serialize", value);
  91. throw new Error("Hmm...");
  92. }
  93. function normalizeResult(value) {
  94. if ("error" in value) {
  95. const error = value.error;
  96. return { tag: _normalize(error), value: error };
  97. }
  98. const result = value.result;
  99. return { tag: _normalize(result), value: result };
  100. }
  101. // This strategy picks the highest weight result, as long as the weight is
  102. // equal to or greater than quorum
  103. function checkQuorum(quorum, results) {
  104. const tally = new Map();
  105. for (const { value, tag, weight } of results) {
  106. const t = tally.get(tag) || { value, weight: 0 };
  107. t.weight += weight;
  108. tally.set(tag, t);
  109. }
  110. let best = null;
  111. for (const r of tally.values()) {
  112. if (r.weight >= quorum && (!best || r.weight > best.weight)) {
  113. best = r;
  114. }
  115. }
  116. if (best) {
  117. return best.value;
  118. }
  119. return undefined;
  120. }
  121. function getMedian(quorum, results) {
  122. let resultWeight = 0;
  123. const errorMap = new Map();
  124. let bestError = null;
  125. const values = [];
  126. for (const { value, tag, weight } of results) {
  127. if (value instanceof Error) {
  128. const e = errorMap.get(tag) || { value, weight: 0 };
  129. e.weight += weight;
  130. errorMap.set(tag, e);
  131. if (bestError == null || e.weight > bestError.weight) {
  132. bestError = e;
  133. }
  134. }
  135. else {
  136. values.push(BigInt(value));
  137. resultWeight += weight;
  138. }
  139. }
  140. if (resultWeight < quorum) {
  141. // We have quorum for an error
  142. if (bestError && bestError.weight >= quorum) {
  143. return bestError.value;
  144. }
  145. // We do not have quorum for a result
  146. return undefined;
  147. }
  148. // Get the sorted values
  149. values.sort((a, b) => ((a < b) ? -1 : (b > a) ? 1 : 0));
  150. const mid = Math.floor(values.length / 2);
  151. // Odd-length; take the middle value
  152. if (values.length % 2) {
  153. return values[mid];
  154. }
  155. // Even length; take the ceiling of the mean of the center two values
  156. return (values[mid - 1] + values[mid] + BN_1) / BN_2;
  157. }
  158. function getAnyResult(quorum, results) {
  159. // If any value or error meets quorum, that is our preferred result
  160. const result = checkQuorum(quorum, results);
  161. if (result !== undefined) {
  162. return result;
  163. }
  164. // Otherwise, do we have any result?
  165. for (const r of results) {
  166. if (r.value) {
  167. return r.value;
  168. }
  169. }
  170. // Nope!
  171. return undefined;
  172. }
  173. function getFuzzyMode(quorum, results) {
  174. if (quorum === 1) {
  175. return getNumber(getMedian(quorum, results), "%internal");
  176. }
  177. const tally = new Map();
  178. const add = (result, weight) => {
  179. const t = tally.get(result) || { result, weight: 0 };
  180. t.weight += weight;
  181. tally.set(result, t);
  182. };
  183. for (const { weight, value } of results) {
  184. const r = getNumber(value);
  185. add(r - 1, weight);
  186. add(r, weight);
  187. add(r + 1, weight);
  188. }
  189. let bestWeight = 0;
  190. let bestResult = undefined;
  191. for (const { weight, result } of tally.values()) {
  192. // Use this result, if this result meets quorum and has either:
  193. // - a better weight
  194. // - or equal weight, but the result is larger
  195. if (weight >= quorum && (weight > bestWeight || (bestResult != null && weight === bestWeight && result > bestResult))) {
  196. bestWeight = weight;
  197. bestResult = result;
  198. }
  199. }
  200. return bestResult;
  201. }
  202. /**
  203. * A **FallbackProvider** manages several [[Providers]] providing
  204. * resilience by switching between slow or misbehaving nodes, security
  205. * by requiring multiple backends to aggree and performance by allowing
  206. * faster backends to respond earlier.
  207. *
  208. */
  209. export class FallbackProvider extends AbstractProvider {
  210. /**
  211. * The number of backends that must agree on a value before it is
  212. * accpeted.
  213. */
  214. quorum;
  215. /**
  216. * @_ignore:
  217. */
  218. eventQuorum;
  219. /**
  220. * @_ignore:
  221. */
  222. eventWorkers;
  223. #configs;
  224. #height;
  225. #initialSyncPromise;
  226. /**
  227. * Creates a new **FallbackProvider** with %%providers%% connected to
  228. * %%network%%.
  229. *
  230. * If a [[Provider]] is included in %%providers%%, defaults are used
  231. * for the configuration.
  232. */
  233. constructor(providers, network, options) {
  234. super(network, options);
  235. this.#configs = providers.map((p) => {
  236. if (p instanceof AbstractProvider) {
  237. return Object.assign({ provider: p }, defaultConfig, defaultState);
  238. }
  239. else {
  240. return Object.assign({}, defaultConfig, p, defaultState);
  241. }
  242. });
  243. this.#height = -2;
  244. this.#initialSyncPromise = null;
  245. if (options && options.quorum != null) {
  246. this.quorum = options.quorum;
  247. }
  248. else {
  249. this.quorum = Math.ceil(this.#configs.reduce((accum, config) => {
  250. accum += config.weight;
  251. return accum;
  252. }, 0) / 2);
  253. }
  254. this.eventQuorum = 1;
  255. this.eventWorkers = 1;
  256. assertArgument(this.quorum <= this.#configs.reduce((a, c) => (a + c.weight), 0), "quorum exceed provider weight", "quorum", this.quorum);
  257. }
  258. get providerConfigs() {
  259. return this.#configs.map((c) => {
  260. const result = Object.assign({}, c);
  261. for (const key in result) {
  262. if (key[0] === "_") {
  263. delete result[key];
  264. }
  265. }
  266. return result;
  267. });
  268. }
  269. async _detectNetwork() {
  270. return Network.from(getBigInt(await this._perform({ method: "chainId" })));
  271. }
  272. // @TODO: Add support to select providers to be the event subscriber
  273. //_getSubscriber(sub: Subscription): Subscriber {
  274. // throw new Error("@TODO");
  275. //}
  276. /**
  277. * Transforms a %%req%% into the correct method call on %%provider%%.
  278. */
  279. async _translatePerform(provider, req) {
  280. switch (req.method) {
  281. case "broadcastTransaction":
  282. return await provider.broadcastTransaction(req.signedTransaction);
  283. case "call":
  284. return await provider.call(Object.assign({}, req.transaction, { blockTag: req.blockTag }));
  285. case "chainId":
  286. return (await provider.getNetwork()).chainId;
  287. case "estimateGas":
  288. return await provider.estimateGas(req.transaction);
  289. case "getBalance":
  290. return await provider.getBalance(req.address, req.blockTag);
  291. case "getBlock": {
  292. const block = ("blockHash" in req) ? req.blockHash : req.blockTag;
  293. return await provider.getBlock(block, req.includeTransactions);
  294. }
  295. case "getBlockNumber":
  296. return await provider.getBlockNumber();
  297. case "getCode":
  298. return await provider.getCode(req.address, req.blockTag);
  299. case "getGasPrice":
  300. return (await provider.getFeeData()).gasPrice;
  301. case "getPriorityFee":
  302. return (await provider.getFeeData()).maxPriorityFeePerGas;
  303. case "getLogs":
  304. return await provider.getLogs(req.filter);
  305. case "getStorage":
  306. return await provider.getStorage(req.address, req.position, req.blockTag);
  307. case "getTransaction":
  308. return await provider.getTransaction(req.hash);
  309. case "getTransactionCount":
  310. return await provider.getTransactionCount(req.address, req.blockTag);
  311. case "getTransactionReceipt":
  312. return await provider.getTransactionReceipt(req.hash);
  313. case "getTransactionResult":
  314. return await provider.getTransactionResult(req.hash);
  315. }
  316. }
  317. // Grab the next (random) config that is not already part of
  318. // the running set
  319. #getNextConfig(running) {
  320. // @TODO: Maybe do a check here to favour (heavily) providers that
  321. // do not require waitForSync and disfavour providers that
  322. // seem down-ish or are behaving slowly
  323. const configs = Array.from(running).map((r) => r.config);
  324. // Shuffle the states, sorted by priority
  325. const allConfigs = this.#configs.slice();
  326. shuffle(allConfigs);
  327. allConfigs.sort((a, b) => (a.priority - b.priority));
  328. for (const config of allConfigs) {
  329. if (config._lastFatalError) {
  330. continue;
  331. }
  332. if (configs.indexOf(config) === -1) {
  333. return config;
  334. }
  335. }
  336. return null;
  337. }
  338. // Adds a new runner (if available) to running.
  339. #addRunner(running, req) {
  340. const config = this.#getNextConfig(running);
  341. // No runners available
  342. if (config == null) {
  343. return null;
  344. }
  345. // Create a new runner
  346. const runner = {
  347. config, result: null, didBump: false,
  348. perform: null, staller: null
  349. };
  350. const now = getTime();
  351. // Start performing this operation
  352. runner.perform = (async () => {
  353. try {
  354. config.requests++;
  355. const result = await this._translatePerform(config.provider, req);
  356. runner.result = { result };
  357. }
  358. catch (error) {
  359. config.errorResponses++;
  360. runner.result = { error };
  361. }
  362. const dt = (getTime() - now);
  363. config._totalTime += dt;
  364. config.rollingDuration = 0.95 * config.rollingDuration + 0.05 * dt;
  365. runner.perform = null;
  366. })();
  367. // Start a staller; when this times out, it's time to force
  368. // kicking off another runner because we are taking too long
  369. runner.staller = (async () => {
  370. await stall(config.stallTimeout);
  371. runner.staller = null;
  372. })();
  373. running.add(runner);
  374. return runner;
  375. }
  376. // Initializes the blockNumber and network for each runner and
  377. // blocks until initialized
  378. async #initialSync() {
  379. let initialSync = this.#initialSyncPromise;
  380. if (!initialSync) {
  381. const promises = [];
  382. this.#configs.forEach((config) => {
  383. promises.push((async () => {
  384. await waitForSync(config, 0);
  385. if (!config._lastFatalError) {
  386. config._network = await config.provider.getNetwork();
  387. }
  388. })());
  389. });
  390. this.#initialSyncPromise = initialSync = (async () => {
  391. // Wait for all providers to have a block number and network
  392. await Promise.all(promises);
  393. // Check all the networks match
  394. let chainId = null;
  395. for (const config of this.#configs) {
  396. if (config._lastFatalError) {
  397. continue;
  398. }
  399. const network = (config._network);
  400. if (chainId == null) {
  401. chainId = network.chainId;
  402. }
  403. else if (network.chainId !== chainId) {
  404. assert(false, "cannot mix providers on different networks", "UNSUPPORTED_OPERATION", {
  405. operation: "new FallbackProvider"
  406. });
  407. }
  408. }
  409. })();
  410. }
  411. await initialSync;
  412. }
  413. async #checkQuorum(running, req) {
  414. // Get all the result objects
  415. const results = [];
  416. for (const runner of running) {
  417. if (runner.result != null) {
  418. const { tag, value } = normalizeResult(runner.result);
  419. results.push({ tag, value, weight: runner.config.weight });
  420. }
  421. }
  422. // Are there enough results to event meet quorum?
  423. if (results.reduce((a, r) => (a + r.weight), 0) < this.quorum) {
  424. return undefined;
  425. }
  426. switch (req.method) {
  427. case "getBlockNumber": {
  428. // We need to get the bootstrap block height
  429. if (this.#height === -2) {
  430. this.#height = Math.ceil(getNumber(getMedian(this.quorum, this.#configs.filter((c) => (!c._lastFatalError)).map((c) => ({
  431. value: c.blockNumber,
  432. tag: getNumber(c.blockNumber).toString(),
  433. weight: c.weight
  434. })))));
  435. }
  436. // Find the mode across all the providers, allowing for
  437. // a little drift between block heights
  438. const mode = getFuzzyMode(this.quorum, results);
  439. if (mode === undefined) {
  440. return undefined;
  441. }
  442. if (mode > this.#height) {
  443. this.#height = mode;
  444. }
  445. return this.#height;
  446. }
  447. case "getGasPrice":
  448. case "getPriorityFee":
  449. case "estimateGas":
  450. return getMedian(this.quorum, results);
  451. case "getBlock":
  452. // Pending blocks are in the mempool and already
  453. // quite untrustworthy; just grab anything
  454. if ("blockTag" in req && req.blockTag === "pending") {
  455. return getAnyResult(this.quorum, results);
  456. }
  457. return checkQuorum(this.quorum, results);
  458. case "call":
  459. case "chainId":
  460. case "getBalance":
  461. case "getTransactionCount":
  462. case "getCode":
  463. case "getStorage":
  464. case "getTransaction":
  465. case "getTransactionReceipt":
  466. case "getLogs":
  467. return checkQuorum(this.quorum, results);
  468. case "broadcastTransaction":
  469. return getAnyResult(this.quorum, results);
  470. }
  471. assert(false, "unsupported method", "UNSUPPORTED_OPERATION", {
  472. operation: `_perform(${stringify(req.method)})`
  473. });
  474. }
  475. async #waitForQuorum(running, req) {
  476. if (running.size === 0) {
  477. throw new Error("no runners?!");
  478. }
  479. // Any promises that are interesting to watch for; an expired stall
  480. // or a successful perform
  481. const interesting = [];
  482. let newRunners = 0;
  483. for (const runner of running) {
  484. // No responses, yet; keep an eye on it
  485. if (runner.perform) {
  486. interesting.push(runner.perform);
  487. }
  488. // Still stalling...
  489. if (runner.staller) {
  490. interesting.push(runner.staller);
  491. continue;
  492. }
  493. // This runner has already triggered another runner
  494. if (runner.didBump) {
  495. continue;
  496. }
  497. // Got a response (result or error) or stalled; kick off another runner
  498. runner.didBump = true;
  499. newRunners++;
  500. }
  501. // Check if we have reached quorum on a result (or error)
  502. const value = await this.#checkQuorum(running, req);
  503. if (value !== undefined) {
  504. if (value instanceof Error) {
  505. throw value;
  506. }
  507. return value;
  508. }
  509. // Add any new runners, because a staller timed out or a result
  510. // or error response came in.
  511. for (let i = 0; i < newRunners; i++) {
  512. this.#addRunner(running, req);
  513. }
  514. // All providers have returned, and we have no result
  515. assert(interesting.length > 0, "quorum not met", "SERVER_ERROR", {
  516. request: "%sub-requests",
  517. info: { request: req, results: Array.from(running).map((r) => stringify(r.result)) }
  518. });
  519. // Wait for someone to either complete its perform or stall out
  520. await Promise.race(interesting);
  521. // This is recursive, but at worst case the depth is 2x the
  522. // number of providers (each has a perform and a staller)
  523. return await this.#waitForQuorum(running, req);
  524. }
  525. async _perform(req) {
  526. // Broadcasting a transaction is rare (ish) and already incurs
  527. // a cost on the user, so spamming is safe-ish. Just send it to
  528. // every backend.
  529. if (req.method === "broadcastTransaction") {
  530. // Once any broadcast provides a positive result, use it. No
  531. // need to wait for anyone else
  532. const results = this.#configs.map((c) => null);
  533. const broadcasts = this.#configs.map(async ({ provider, weight }, index) => {
  534. try {
  535. const result = await provider._perform(req);
  536. results[index] = Object.assign(normalizeResult({ result }), { weight });
  537. }
  538. catch (error) {
  539. results[index] = Object.assign(normalizeResult({ error }), { weight });
  540. }
  541. });
  542. // As each promise finishes...
  543. while (true) {
  544. // Check for a valid broadcast result
  545. const done = results.filter((r) => (r != null));
  546. for (const { value } of done) {
  547. if (!(value instanceof Error)) {
  548. return value;
  549. }
  550. }
  551. // Check for a legit broadcast error (one which we cannot
  552. // recover from; some nodes may return the following red
  553. // herring events:
  554. // - alredy seend (UNKNOWN_ERROR)
  555. // - NONCE_EXPIRED
  556. // - REPLACEMENT_UNDERPRICED
  557. const result = checkQuorum(this.quorum, results.filter((r) => (r != null)));
  558. if (isError(result, "INSUFFICIENT_FUNDS")) {
  559. throw result;
  560. }
  561. // Kick off the next provider (if any)
  562. const waiting = broadcasts.filter((b, i) => (results[i] == null));
  563. if (waiting.length === 0) {
  564. break;
  565. }
  566. await Promise.race(waiting);
  567. }
  568. // Use standard quorum results; any result was returned above,
  569. // so this will find any error that met quorum if any
  570. const result = getAnyResult(this.quorum, results);
  571. assert(result !== undefined, "problem multi-broadcasting", "SERVER_ERROR", {
  572. request: "%sub-requests",
  573. info: { request: req, results: results.map(stringify) }
  574. });
  575. if (result instanceof Error) {
  576. throw result;
  577. }
  578. return result;
  579. }
  580. await this.#initialSync();
  581. // Bootstrap enough runners to meet quorum
  582. const running = new Set();
  583. let inflightQuorum = 0;
  584. while (true) {
  585. const runner = this.#addRunner(running, req);
  586. if (runner == null) {
  587. break;
  588. }
  589. inflightQuorum += runner.config.weight;
  590. if (inflightQuorum >= this.quorum) {
  591. break;
  592. }
  593. }
  594. const result = await this.#waitForQuorum(running, req);
  595. // Track requests sent to a provider that are still
  596. // outstanding after quorum has been otherwise found
  597. for (const runner of running) {
  598. if (runner.perform && runner.result == null) {
  599. runner.config.lateResponses++;
  600. }
  601. }
  602. return result;
  603. }
  604. async destroy() {
  605. for (const { provider } of this.#configs) {
  606. provider.destroy();
  607. }
  608. super.destroy();
  609. }
  610. }
  611. //# sourceMappingURL=provider-fallback.js.map