// provider-fallback.ts
  /**
   *  A **FallbackProvider** provides resilience, security and performance
   *  in a way that is customizable and configurable.
   *
   *  @_section: api/providers/fallback-provider:Fallback Provider [about-fallback-provider]
   */
  7. import {
  8. assert, assertArgument, getBigInt, getNumber, isError
  9. } from "../utils/index.js";
  10. import { AbstractProvider } from "./abstract-provider.js";
  11. import { Network } from "./network.js"
  12. import type { PerformActionRequest } from "./abstract-provider.js";
  13. import type { Networkish } from "./network.js"
  // Small bigint constants; getMedian uses them to take the ceiling of
  // the mean of the two center values.
  const BN_1 = BigInt("1");
  const BN_2 = BigInt("2");
  /**
   *  Shuffles %%array%% in place.
   *
   *  Walks from the last index down to 1, swapping each element with a
   *  randomly chosen element at or before it (a Fisher-Yates shuffle).
   *  Randomness comes from ``Math.random``.
   */
  function shuffle<T = any>(array: Array<T>): void {
      for (let i = array.length - 1; i > 0; i--) {
          // Pick a random index in [0, i]
          const j = Math.floor(Math.random() * (i + 1));
          const tmp = array[i];
          array[i] = array[j];
          array[j] = tmp;
      }
  }
  /**
   *  Returns a promise that resolves (with no value) once %%duration%%
   *  milliseconds have elapsed, as scheduled by ``setTimeout``.
   */
  function stall(duration: number): Promise<void> {
      return new Promise((resolve) => { setTimeout(resolve, duration); });
  }
  /**
   *  Returns the current time as milliseconds since the Unix epoch.
   */
  function getTime(): number { return Date.now(); }
  /**
   *  Serializes %%value%% with ``JSON.stringify``, replacing any bigint
   *  (which JSON cannot represent natively) with an object of the form
   *  ``{ type: "bigint", value: "<decimal string>" }``.
   */
  function stringify(value: any): string {
      return JSON.stringify(value, (_key, item) => {
          if (typeof(item) === "bigint") {
              return { type: "bigint", value: item.toString() };
          }
          return item;
      });
  }
  /**
   *  A configuration entry for how to use a [[Provider]].
   */
  export interface FallbackProviderConfig {

      /**
       *  The provider.
       */
      provider: AbstractProvider;

      /**
       *  The amount of time (in milliseconds) to wait before kicking off
       *  the next provider. (default: ``400``)
       *
       *  Any providers that have not responded can still respond and be
       *  counted, but this ensures new providers start.
       */
      stallTimeout?: number;

      /**
       *  The priority. Lower priority providers are dispatched first.
       *  (default: ``1``)
       */
      priority?: number;

      /**
       *  The amount of weight a provider is given against the quorum.
       *  (default: ``1``)
       */
      weight?: number;
  }
  // Values used for any optional FallbackProviderConfig field that the
  // caller did not supply.
  const defaultConfig = { stallTimeout: 400, priority: 1, weight: 1 };
  // We track a bunch of extra stuff that might help debug problems or
  // optimize infrastructure later on.

  /**
   *  The statistics and state maintained for a [[Provider]].
   */
  export interface FallbackProviderState extends Required<FallbackProviderConfig> {

      /**
       *  The most recent blockNumber this provider has reported (-2 if none).
       */
      blockNumber: number;

      /**
       *  The number of total requests ever sent to this provider.
       */
      requests: number;

      /**
       *  The number of responses that errored.
       */
      errorResponses: number;

      /**
       *  The number of responses that occurred after the result resolved.
       */
      lateResponses: number;

      /**
       *  How many times syncing was required to catch up the expected block.
       */
      outOfSync: number;

      /**
       *  The number of requests which reported unsupported operation.
       */
      unsupportedEvents: number;

      /**
       *  A rolling average (5% current duration) for response time.
       */
      rollingDuration: number;

      /**
       *  The ratio of quorum-agreed results to total.
       */
      score: number;
  }
  /**
   *  The internal per-provider record: the public state plus private
   *  bookkeeping. Keys starting with an underscore are removed before
   *  the state is exposed through ``providerConfigs``.
   */
  interface Config extends FallbackProviderState {
      // The in-flight block number refresh (if any), so concurrent
      // waiters share a single getBlockNumber call
      _updateNumber: null | Promise<any>;

      // The network reported by the provider during the initial sync
      _network: null | Network;

      // The accumulated duration (in ms) of all requests to this provider
      _totalTime: number;

      // The error (and when it happened) from a failed block number
      // refresh; a provider with this set is no longer dispatched to
      _lastFatalError: null | Error;
      _lastFatalErrorTimestamp: number;
  }
  // The initial statistics and bookkeeping copied into each provider's
  // record when a FallbackProvider is constructed.
  const defaultState = {
      blockNumber: -2, requests: 0, lateResponses: 0, errorResponses: 0,
      outOfSync: -1, unsupportedEvents: 0, rollingDuration: 0, score: 0,
      _network: null, _updateNumber: null, _totalTime: 0,
      _lastFatalError: null, _lastFatalErrorTimestamp: 0
  };
  /**
   *  Resolves once the provider in %%config%% has reported a block number
   *  that is non-negative and at least %%blockNumber%%, or once fetching
   *  the block number has failed.
   *
   *  Concurrent callers share a single in-flight ``getBlockNumber`` call
   *  through ``config._updateNumber``. On failure the block number is
   *  reset to -2 and the error (with a timestamp) is recorded on
   *  %%config%%; this function itself does not reject for that failure.
   *
   *  ``config.outOfSync`` is incremented once per refresh awaited.
   */
  async function waitForSync(config: Config, blockNumber: number): Promise<void> {
      while (config.blockNumber < 0 || config.blockNumber < blockNumber) {

          // Nobody is refreshing the block number yet; start a refresh
          if (!config._updateNumber) {
              config._updateNumber = (async () => {
                  try {
                      const latest = await config.provider.getBlockNumber();

                      // Never move the recorded block number backwards
                      if (latest > config.blockNumber) {
                          config.blockNumber = latest;
                      }
                  } catch (error: any) {
                      config.blockNumber = -2;
                      config._lastFatalError = error;
                      config._lastFatalErrorTimestamp = getTime();
                  }

                  // Allow the next waiter to kick off a fresh refresh
                  config._updateNumber = null;
              })();
          }

          await config._updateNumber;
          config.outOfSync++;

          // The provider failed; give up rather than retrying forever
          if (config._lastFatalError) { break; }
      }
  }
  /**
   *  Additional options to configure a [[FallbackProvider]].
   */
  export type FallbackProviderOptions = {

      /**
       *  How many providers must agree on a value before reporting
       *  back the response.
       */
      quorum?: number;

      /**
       *  How many providers must have reported the same event
       *  for it to be emitted. (currently unimplemented)
       */
      eventQuorum?: number;

      /**
       *  How many providers to dispatch each event to simultaneously.
       *  Set this to 0 to use getLog polling, which implies eventQuorum
       *  is equal to quorum. (currently unimplemented)
       */
      eventWorkers?: number;

      // These are not read in this file; the whole options object is
      // handed to the AbstractProvider constructor
      cacheTimeout?: number;
      pollingInterval?: number;
  };
  // The outcome of a single provider request: either the value it
  // returned or the error it threw.
  type RunnerResult = { result: any } | { error: Error };
  // The state of one request dispatched to one provider.
  type RunnerState = {
      // The provider record this runner is using
      config: Config;

      // Pending until the stall timeout expires, then null
      staller: null | Promise<void>;

      // Whether this runner has already caused another runner to start
      didBump: boolean;

      // Pending while the request is in flight, then null
      perform: null | Promise<any>;

      // The outcome, once the request has completed (null until then)
      result: null | RunnerResult;
  }
  /**
   *  Returns a canonical string for %%value%%, so that equivalent results
   *  from different providers produce the same tag.
   *
   *  - ``null`` and ``undefined`` become ``"null"``
   *  - arrays are normalized element-wise
   *  - objects with a ``toJSON`` method are normalized via its result
   *  - integers (number or bigint) use their decimal form, so ``1`` and
   *    ``1n`` are equal
   *  - plain objects are serialized with their keys sorted
   *
   *  Throws for anything else (e.g. a function).
   */
  function _normalize(value: any): string {
      if (value == null) { return "null"; }

      if (Array.isArray(value)) {
          return "[" + (value.map(_normalize)).join(",") + "]";
      }

      if (typeof(value) === "object" && typeof(value.toJSON) === "function") {
          return _normalize(value.toJSON());
      }

      switch (typeof(value)) {
          case "boolean": case "symbol":
              return value.toString();
          case "number":
              // BigInt() throws a RangeError for non-integers (including
              // NaN and the infinities); use the plain form for those
              if (!Number.isInteger(value)) { return value.toString(); }
              return BigInt(value).toString();
          case "bigint":
              return BigInt(value).toString();
          case "string":
              return JSON.stringify(value);
          case "object": {
              // Sort the keys so property order does not affect the tag
              const keys = Object.keys(value);
              keys.sort();
              return "{" + keys.map((k) => `${ JSON.stringify(k) }:${ _normalize(value[k]) }`).join(",") + "}";
          }
      }

      console.log("Could not serialize", value);
      throw new Error("Hmm...");
  }
  /**
   *  Pairs a runner outcome with a canonical tag. The value is the
   *  error (if the request failed) or the result, and the tag is its
   *  normalized string form, used to group matching outcomes.
   */
  function normalizeResult(value: RunnerResult): { tag: string, value: any } {
      if ("error" in value) {
          const error = value.error;
          return { tag: _normalize(error), value: error };
      }
      const result = value.result;
      return { tag: _normalize(result), value: result };
  }
  // A provider outcome ready to be tallied: its canonical tag, the
  // value (a result or an Error) and the weight of its provider.
  type TallyResult = {
      tag: string;
      value: any;
      weight: number;
  };
  /**
   *  Returns the value (which may be an Error) whose tag has accumulated
   *  the highest total weight, as long as that weight is equal to or
   *  greater than %%quorum%%. Returns undefined if no tag meets quorum.
   *
   *  When several tags tie for the highest weight, the tag that appears
   *  first in %%results%% wins.
   */
  function checkQuorum(quorum: number, results: Array<TallyResult>): any | Error {
      // Sum the weight for each distinct tag, keeping the first value
      // seen for that tag
      const tally: Map<string, { value: any, weight: number }> = new Map();
      for (const { value, tag, weight } of results) {
          const t = tally.get(tag) || { value, weight: 0 };
          t.weight += weight;
          tally.set(tag, t);
      }

      let best: null | { value: any, weight: number } = null;
      for (const r of tally.values()) {
          if (r.weight >= quorum && (!best || r.weight > best.weight)) {
              best = r;
          }
      }

      if (best) { return best.value; }

      return undefined;
  }
  /**
   *  Returns the median of the non-error values in %%results%%.
   *
   *  Weights only count toward meeting %%quorum%%; the median itself is
   *  taken over the values unweighted. For an even number of values the
   *  ceiling of the mean of the two center values is used.
   *
   *  If the non-error values do not meet quorum, the error with the
   *  highest total weight is returned if it meets quorum, otherwise
   *  undefined.
   */
  function getMedian(quorum: number, results: Array<TallyResult>): undefined | bigint | Error {
      let resultWeight = 0;

      const errorMap: Map<string, { weight: number, value: Error }> = new Map();
      let bestError: null | { weight: number, value: Error } = null;

      const values: Array<bigint> = [ ];
      for (const { value, tag, weight } of results) {
          if (value instanceof Error) {
              const e = errorMap.get(tag) || { value, weight: 0 };
              e.weight += weight;
              errorMap.set(tag, e);

              if (bestError == null || e.weight > bestError.weight) { bestError = e; }
          } else {
              values.push(BigInt(value));
              resultWeight += weight;
          }
      }

      if (resultWeight < quorum) {
          // We have quorum for an error
          if (bestError && bestError.weight >= quorum) { return bestError.value; }

          // We do not have quorum for a result
          return undefined;
      }

      // Get the sorted values; the comparator must report "greater" as
      // well as "less", otherwise the order is not well-defined
      values.sort((a, b) => ((a < b) ? -1: (a > b) ? 1: 0));

      const mid = Math.floor(values.length / 2);

      // Odd-length; take the middle value
      if (values.length % 2) { return values[mid]; }

      // Even length; take the ceiling of the mean of the center two values
      return (values[mid - 1] + values[mid] + BN_1) / BN_2;
  }
  /**
   *  Returns the value (or error) that meets %%quorum%% if there is one,
   *  otherwise the first truthy value found in %%results%%, otherwise
   *  undefined.
   *
   *  Falsy values (e.g. ``0``, ``""`` or ``null``) are never picked by
   *  the fallback.
   */
  function getAnyResult(quorum: number, results: Array<TallyResult>): undefined | any | Error {
      // If any value or error meets quorum, that is our preferred result
      const result = checkQuorum(quorum, results);
      if (result !== undefined) { return result; }

      // Otherwise, do we have any result?
      for (const r of results) {
          if (r.value) { return r.value; }
      }

      // Nope!
      return undefined;
  }
  /**
   *  Returns the number that the most weight agrees on, allowing the
   *  values in %%results%% to differ by one.
   *
   *  Each value ``r`` adds its weight to ``r - 1``, ``r`` and ``r + 1``.
   *  The candidate with the highest total weight that meets %%quorum%%
   *  wins; on equal weight the larger candidate wins. Returns undefined
   *  if no candidate meets quorum.
   *
   *  When %%quorum%% is 1 the median is used instead.
   */
  function getFuzzyMode(quorum: number, results: Array<TallyResult>): undefined | number {
      if (quorum === 1) { return getNumber(<bigint>getMedian(quorum, results), "%internal"); }

      const tally: Map<number, { result: number, weight: number }> = new Map();
      const add = (result: number, weight: number) => {
          const t = tally.get(result) || { result, weight: 0 };
          t.weight += weight;
          tally.set(result, t);
      };

      for (const { weight, value } of results) {
          const r = getNumber(value);
          add(r - 1, weight);
          add(r, weight);
          add(r + 1, weight);
      }

      let bestWeight = 0;
      let bestResult: undefined | number = undefined;

      for (const { weight, result } of tally.values()) {
          // Use this result, if this result meets quorum and has either:
          // - a better weight
          // - or equal weight, but the result is larger
          if (weight >= quorum && (weight > bestWeight || (bestResult != null && weight === bestWeight && result > bestResult))) {
              bestWeight = weight;
              bestResult = result;
          }
      }

      return bestResult;
  }
  /**
   *  A **FallbackProvider** manages several [[Providers]] providing
   *  resilience by switching between slow or misbehaving nodes, security
   *  by requiring multiple backends to agree and performance by allowing
   *  faster backends to respond earlier.
   */
  export class FallbackProvider extends AbstractProvider {

      /**
       *  The combined weight of the backends that must agree on a value
       *  before it is accepted. With the default weight of 1 for every
       *  backend, this is the number of backends that must agree.
       */
      readonly quorum: number;

      /**
       *  @_ignore:
       */
      readonly eventQuorum: number;

      /**
       *  @_ignore:
       */
      readonly eventWorkers: number;

      // The per-provider configuration, statistics and bookkeeping
      readonly #configs: Array<Config>;

      // The highest block number returned so far (-2 until bootstrapped)
      #height: number;

      // The shared initial sync, so it is only ever performed once
      #initialSyncPromise: null | Promise<void>;

      /**
       *  Creates a new **FallbackProvider** with %%providers%% connected to
       *  %%network%%.
       *
       *  If a [[Provider]] is included in %%providers%%, defaults are used
       *  for the configuration.
       *
       *  If %%options%% does not specify a quorum, half of the total
       *  provider weight (rounded up) is used. An invalid-argument error
       *  is raised if the quorum exceeds the total provider weight.
       */
      constructor(providers: Array<AbstractProvider | FallbackProviderConfig>, network?: Networkish, options?: FallbackProviderOptions) {
          super(network, options);

          // Each provider gets its own fresh copy of the default state
          this.#configs = providers.map((p) => {
              if (p instanceof AbstractProvider) {
                  return Object.assign({ provider: p }, defaultConfig, defaultState );
              } else {
                  return Object.assign({ }, defaultConfig, p, defaultState );
              }
          });

          this.#height = -2;
          this.#initialSyncPromise = null;

          const totalWeight = this.#configs.reduce((accum, config) => (accum + config.weight), 0);

          if (options && options.quorum != null) {
              this.quorum = options.quorum;
          } else {
              this.quorum = Math.ceil(totalWeight / 2);
          }

          this.eventQuorum = 1;
          this.eventWorkers = 1;

          assertArgument(this.quorum <= totalWeight,
              "quorum exceed provider weight", "quorum", this.quorum);
      }

      /**
       *  A copy of the configuration and statistics for each provider,
       *  with the internal (underscore-prefixed) properties removed.
       */
      get providerConfigs(): Array<FallbackProviderState> {
          return this.#configs.map((c) => {
              const result: any = Object.assign({ }, c);
              for (const key in result) {
                  if (key[0] === "_") { delete result[key]; }
              }
              return result;
          });
      }

      /**
       *  Resolves the network from the chain ID agreed upon by the
       *  backends.
       */
      async _detectNetwork(): Promise<Network> {
          return Network.from(getBigInt(await this._perform({ method: "chainId" })));
      }

      // @TODO: Add support to select providers to be the event subscriber
      //_getSubscriber(sub: Subscription): Subscriber {
      //    throw new Error("@TODO");
      //}

      /**
       *  Transforms a %%req%% into the correct method call on %%provider%%.
       *
       *  Resolves to undefined if the method of %%req%% is not one of
       *  the methods handled here.
       */
      async _translatePerform(provider: AbstractProvider, req: PerformActionRequest): Promise<any> {
          switch (req.method) {
              case "broadcastTransaction":
                  return await provider.broadcastTransaction(req.signedTransaction);
              case "call":
                  return await provider.call(Object.assign({ }, req.transaction, { blockTag: req.blockTag }));
              case "chainId":
                  return (await provider.getNetwork()).chainId;
              case "estimateGas":
                  return await provider.estimateGas(req.transaction);
              case "getBalance":
                  return await provider.getBalance(req.address, req.blockTag);
              case "getBlock": {
                  const block = ("blockHash" in req) ? req.blockHash: req.blockTag;
                  return await provider.getBlock(block, req.includeTransactions);
              }
              case "getBlockNumber":
                  return await provider.getBlockNumber();
              case "getCode":
                  return await provider.getCode(req.address, req.blockTag);
              case "getGasPrice":
                  return (await provider.getFeeData()).gasPrice;
              case "getPriorityFee":
                  return (await provider.getFeeData()).maxPriorityFeePerGas;
              case "getLogs":
                  return await provider.getLogs(req.filter);
              case "getStorage":
                  return await provider.getStorage(req.address, req.position, req.blockTag);
              case "getTransaction":
                  return await provider.getTransaction(req.hash);
              case "getTransactionCount":
                  return await provider.getTransactionCount(req.address, req.blockTag);
              case "getTransactionReceipt":
                  return await provider.getTransactionReceipt(req.hash);
              case "getTransactionResult":
                  return await provider.getTransactionResult(req.hash);
          }
      }

      // Grab the next (random) config that is not already part of
      // the running set
      #getNextConfig(running: Set<RunnerState>): null | Config {
          // @TODO: Maybe do a check here to favour (heavily) providers that
          //        do not require waitForSync and disfavour providers that
          //        seem down-ish or are behaving slowly

          const configs = Array.from(running).map((r) => r.config);

          // Shuffle the states, sorted by priority; the shuffle decides
          // the order among configs which share a priority
          const allConfigs = this.#configs.slice();
          shuffle(allConfigs);
          allConfigs.sort((a, b) => (a.priority - b.priority));

          for (const config of allConfigs) {
              // Skip any provider which has failed fatally
              if (config._lastFatalError) { continue; }
              if (!configs.includes(config)) { return config; }
          }

          return null;
      }

      // Adds a new runner (if available) to running.
      #addRunner(running: Set<RunnerState>, req: PerformActionRequest): null | RunnerState {
          const config = this.#getNextConfig(running);

          // No runners available
          if (config == null) { return null; }

          // Create a new runner
          const runner: RunnerState = {
              config, result: null, didBump: false,
              perform: null, staller: null
          };

          const now = getTime();

          // Start performing this operation
          runner.perform = (async () => {
              try {
                  config.requests++;
                  const result = await this._translatePerform(config.provider, req);
                  runner.result = { result };
              } catch (error: any) {
                  config.errorResponses++;
                  runner.result = { error };
              }

              // Update the timing statistics for this provider
              const dt = (getTime() - now);
              config._totalTime += dt;

              config.rollingDuration = 0.95 * config.rollingDuration + 0.05 * dt;

              runner.perform = null;
          })();

          // Start a staller; when this times out, it's time to force
          // kicking off another runner because we are taking too long
          runner.staller = (async () => {
              await stall(config.stallTimeout);
              runner.staller = null;
          })();

          running.add(runner);
          return runner;
      }

      // Initializes the blockNumber and network for each runner and
      // blocks until initialized
      async #initialSync(): Promise<void> {
          let initialSync = this.#initialSyncPromise;
          if (!initialSync) {
              const promises: Array<Promise<any>> = [ ];
              this.#configs.forEach((config) => {
                  promises.push((async () => {
                      await waitForSync(config, 0);
                      if (!config._lastFatalError) {
                          config._network = await config.provider.getNetwork();
                      }
                  })());
              });

              this.#initialSyncPromise = initialSync = (async () => {
                  // Wait for all providers to have a block number and network
                  await Promise.all(promises);

                  // Check all the networks match
                  let chainId: null | bigint = null;
                  for (const config of this.#configs) {
                      if (config._lastFatalError) { continue; }
                      const network = <Network>(config._network);
                      if (chainId == null) {
                          chainId = network.chainId;
                      } else if (network.chainId !== chainId) {
                          assert(false, "cannot mix providers on different networks", "UNSUPPORTED_OPERATION", {
                              operation: "new FallbackProvider"
                          });
                      }
                  }
              })();
          }

          await initialSync
      }

      // Applies the quorum rule for the method of req to the results
      // collected so far; resolves to undefined if no outcome has been
      // reached yet, otherwise to the agreed value (or Error)
      //
      // NOTE(review): "getTransactionResult" is handled by
      // _translatePerform but has no case below, so it ends up at the
      // "unsupported method" assertion once enough results are in
      async #checkQuorum(running: Set<RunnerState>, req: PerformActionRequest): Promise<any> {
          // Get all the result objects
          const results: Array<TallyResult> = [ ];
          for (const runner of running) {
              if (runner.result != null) {
                  const { tag, value } = normalizeResult(runner.result);
                  results.push({ tag, value, weight: runner.config.weight });
              }
          }

          // Are there enough results to even meet quorum?
          if (results.reduce((a, r) => (a + r.weight), 0) < this.quorum) {
              return undefined;
          }

          switch (req.method) {
              case "getBlockNumber": {
                  // We need to get the bootstrap block height
                  if (this.#height === -2) {
                      this.#height = Math.ceil(getNumber(<bigint>getMedian(this.quorum, this.#configs.filter((c) => (!c._lastFatalError)).map((c) => ({
                          value: c.blockNumber,
                          tag: getNumber(c.blockNumber).toString(),
                          weight: c.weight
                      })))));
                  }

                  // Find the mode across all the providers, allowing for
                  // a little drift between block heights
                  const mode = getFuzzyMode(this.quorum, results);
                  if (mode === undefined) { return undefined; }

                  // The reported height never goes backwards
                  if (mode > this.#height) { this.#height = mode; }
                  return this.#height;
              }

              case "getGasPrice":
              case "getPriorityFee":
              case "estimateGas":
                  return getMedian(this.quorum, results);

              case "getBlock":
                  // Pending blocks are in the mempool and already
                  // quite untrustworthy; just grab anything
                  if ("blockTag" in req && req.blockTag === "pending") {
                      return getAnyResult(this.quorum, results);
                  }
                  return checkQuorum(this.quorum, results);

              case "call":
              case "chainId":
              case "getBalance":
              case "getTransactionCount":
              case "getCode":
              case "getStorage":
              case "getTransaction":
              case "getTransactionReceipt":
              case "getLogs":
                  return checkQuorum(this.quorum, results);

              case "broadcastTransaction":
                  return getAnyResult(this.quorum, results);
          }

          assert(false, "unsupported method", "UNSUPPORTED_OPERATION", {
              operation: `_perform(${ stringify((<any>req).method) })`
          });
      }

      // Waits until the runners reach quorum on a result (which is
      // returned) or on an error (which is thrown), starting additional
      // runners as existing ones respond or stall out
      async #waitForQuorum(running: Set<RunnerState>, req: PerformActionRequest): Promise<any> {
          if (running.size === 0) { throw new Error("no runners?!"); }

          // Any promises that are interesting to watch for; an expired stall
          // or a successful perform
          const interesting: Array<Promise<void>> = [ ];

          let newRunners = 0;
          for (const runner of running) {

              // No responses, yet; keep an eye on it
              if (runner.perform) {
                  interesting.push(runner.perform);
              }

              // Still stalling...
              if (runner.staller) {
                  interesting.push(runner.staller);
                  continue;
              }

              // This runner has already triggered another runner
              if (runner.didBump) { continue; }

              // Got a response (result or error) or stalled; kick off another runner
              runner.didBump = true;
              newRunners++;
          }

          // Check if we have reached quorum on a result (or error)
          const value = await this.#checkQuorum(running, req);
          if (value !== undefined) {
              if (value instanceof Error) { throw value; }
              return value;
          }

          // Add any new runners, because a staller timed out or a result
          // or error response came in.
          for (let i = 0; i < newRunners; i++) {
              this.#addRunner(running, req);
          }

          // All providers have returned, and we have no result
          assert(interesting.length > 0, "quorum not met", "SERVER_ERROR", {
              request: "%sub-requests",
              info: { request: req, results: Array.from(running).map((r) => stringify(r.result)) }
          });

          // Wait for someone to either complete its perform or stall out
          await Promise.race(interesting);

          // This is recursive, but at worst case the depth is 2x the
          // number of providers (each has a perform and a staller)
          return await this.#waitForQuorum(running, req);
      }

      /**
       *  Performs %%req%% against the backends, resolving once the
       *  responses reach quorum on a result and rejecting once they
       *  reach quorum on an error or no quorum can be met.
       *
       *  A ``broadcastTransaction`` request is instead sent to every
       *  backend at once and the first successful response is used.
       */
      async _perform<T = any>(req: PerformActionRequest): Promise<T> {
          // Broadcasting a transaction is rare (ish) and already incurs
          // a cost on the user, so spamming is safe-ish. Just send it to
          // every backend.
          if (req.method === "broadcastTransaction") {
              // Once any broadcast provides a positive result, use it. No
              // need to wait for anyone else
              const results: Array<null | TallyResult> = this.#configs.map((c) => null);
              const broadcasts = this.#configs.map(async ({ provider, weight }, index) => {
                  try {
                      const result = await provider._perform(req);
                      results[index] = Object.assign(normalizeResult({ result }), { weight });
                  } catch (error: any) {
                      results[index] = Object.assign(normalizeResult({ error }), { weight });
                  }
              });

              // As each promise finishes...
              while (true) {
                  // Check for a valid broadcast result
                  const done = <Array<any>>results.filter((r) => (r != null));
                  for (const { value } of done) {
                      if (!(value instanceof Error)) { return value; }
                  }

                  // Check for a legit broadcast error (one which we cannot
                  // recover from; some nodes may return the following red
                  // herring events:
                  // - already seen (UNKNOWN_ERROR)
                  // - NONCE_EXPIRED
                  // - REPLACEMENT_UNDERPRICED
                  const result = checkQuorum(this.quorum, <Array<any>>results.filter((r) => (r != null)));
                  if (isError(result, "INSUFFICIENT_FUNDS")) {
                      throw result;
                  }

                  // Wait for the next outstanding broadcast (if any)
                  const waiting = broadcasts.filter((b, i) => (results[i] == null));
                  if (waiting.length === 0) { break; }
                  await Promise.race(waiting);
              }

              // Use standard quorum results; any result was returned above,
              // so this will find any error that met quorum if any
              const result = getAnyResult(this.quorum, <Array<any>>results);
              assert(result !== undefined, "problem multi-broadcasting", "SERVER_ERROR", {
                  request: "%sub-requests",
                  info: { request: req, results: results.map(stringify) }
              })
              if (result instanceof Error) { throw result; }
              return result;
          }

          await this.#initialSync();

          // Bootstrap enough runners to meet quorum
          const running: Set<RunnerState> = new Set();
          let inflightQuorum = 0;
          while (true) {
              const runner = this.#addRunner(running, req);
              if (runner == null) { break; }
              inflightQuorum += runner.config.weight;
              if (inflightQuorum >= this.quorum) { break; }
          }

          const result = await this.#waitForQuorum(running, req);

          // Track requests sent to a provider that are still
          // outstanding after quorum has been otherwise found
          for (const runner of running) {
              if (runner.perform && runner.result == null) {
                  runner.config.lateResponses++;
              }
          }

          return result;
      }

      /**
       *  Destroys every backend provider and then this provider.
       */
      async destroy(): Promise<void> {
          for (const { provider } of this.#configs) {
              provider.destroy();
          }
          super.destroy();
      }
  }