| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792 | /** *  A **FallbackProvider** provides resilience, security and performance *  in a way that is customizable and configurable. * *  @_section: api/providers/fallback-provider:Fallback Provider [about-fallback-provider] */import {    assert, assertArgument, getBigInt, getNumber, isError} from "../utils/index.js";import { AbstractProvider } from "./abstract-provider.js";import { Network } from "./network.js"import type { PerformActionRequest } from "./abstract-provider.js";import type { Networkish } from "./network.js"const BN_1 = BigInt("1");const BN_2 = BigInt("2");function shuffle<T = any>(array: Array<T>): void {    for (let i = array.length - 1; i > 0; i--) {        const j = Math.floor(Math.random() * (i + 1));        const tmp = array[i];        array[i] = array[j];        array[j] = tmp;    }}function stall(duration: number): Promise<void> {    return new Promise((resolve) => { setTimeout(resolve, duration); });}function getTime(): number { return (new Date()).getTime(); }function stringify(value: any): string {    return JSON.stringify(value, (key, value) => {        if (typeof(value) === "bigint") {            return { type: "bigint", value: value.toString() };        }        return value;    });}/** *  A configuration entry for how to use a [[Provider]]. */export interface FallbackProviderConfig {    /**     *  The provider.     */    provider: AbstractProvider;    /**     *  The amount of time to wait before kicking off the next provider.     *     *  Any providers that have not responded can still respond and be     *  counted, but this ensures new providers start.     */    stallTimeout?: number;    /**     *  The priority. Lower priority providers are dispatched first.     */    priority?: number;    /**     *  The amount of weight a provider is given against the quorum.     */    weight?: number;};const defaultConfig = { stallTimeout: 400, priority: 1, weight: 1 };// We track a bunch of extra stuff that might help debug problems or// optimize infrastructure later on./** *  The statistics and state maintained for a [[Provider]]. */export interface FallbackProviderState extends Required<FallbackProviderConfig> {    /**     *  The most recent blockNumber this provider has reported (-2 if none).     */    blockNumber: number;    /**     *  The number of total requests ever sent to this provider.     */    requests: number;    /**     *  The number of responses that errored.     */    errorResponses: number;    /**     *  The number of responses that occured after the result resolved.     */    lateResponses: number;    /**     *  How many times syncing was required to catch up the expected block.     */    outOfSync: number;    /**     *  The number of requests which reported unsupported operation.     */    unsupportedEvents: number;    /**     *  A rolling average (5% current duration) for response time.     */    rollingDuration: number;    /**     *  The ratio of quorum-agreed results to total.     */    score: number;}interface Config extends FallbackProviderState {    _updateNumber: null | Promise<any>;    _network: null | Network;    _totalTime: number;    _lastFatalError: null | Error;    _lastFatalErrorTimestamp: number;}const defaultState = {    blockNumber: -2, requests: 0, lateResponses: 0, errorResponses: 0,    outOfSync: -1, unsupportedEvents: 0, rollingDuration: 0, score: 0,    _network: null, _updateNumber: null, _totalTime: 0,    _lastFatalError: null, _lastFatalErrorTimestamp: 0};async function waitForSync(config: Config, blockNumber: number): Promise<void> {    while (config.blockNumber < 0 || config.blockNumber < blockNumber) {        if (!config._updateNumber) {            config._updateNumber = (async () => {                try {                    const blockNumber = await config.provider.getBlockNumber();                    if (blockNumber > config.blockNumber) {                        config.blockNumber = blockNumber;                    }                } catch (error: any) {                    config.blockNumber = -2;                    config._lastFatalError = error;                    config._lastFatalErrorTimestamp = getTime();                }                config._updateNumber = null;            })();        }        await config._updateNumber;        config.outOfSync++;        if (config._lastFatalError) { break; }    }}/** *  Additional options to configure a [[FallbackProvider]]. */export type FallbackProviderOptions = {    // How many providers must agree on a value before reporting    // back the response    quorum?: number;    // How many providers must have reported the same event    // for it to be emitted (currently unimplmented)    eventQuorum?: number;    // How many providers to dispatch each event to simultaneously.    // Set this to 0 to use getLog polling, which implies eventQuorum    // is equal to quorum. (currently unimplemented)    eventWorkers?: number;    cacheTimeout?: number;    pollingInterval?: number;};type RunnerResult = { result: any } | { error: Error };type RunnerState = {    config: Config;    staller: null | Promise<void>;    didBump: boolean;    perform: null | Promise<any>;    result: null | RunnerResult;}function _normalize(value: any): string {    if (value == null) { return "null"; }    if (Array.isArray(value)) {        return "[" + (value.map(_normalize)).join(",") + "]";    }    if (typeof(value) === "object" && typeof(value.toJSON) === "function") {        return _normalize(value.toJSON());    }    switch (typeof(value)) {        case "boolean": case "symbol":            return value.toString();        case "bigint": case "number":            return BigInt(value).toString();        case "string":            return JSON.stringify(value);        case "object": {            const keys = Object.keys(value);            keys.sort();            return "{" + keys.map((k) => `${ JSON.stringify(k) }:${ _normalize(value[k]) }`).join(",") + "}";        }    }    console.log("Could not serialize", value);    throw new Error("Hmm...");}function normalizeResult(value: RunnerResult): { tag: string, value: any } {    if ("error" in value) {        const error = value.error;        return { tag: _normalize(error), value: error };    }    const result = value.result;    return { tag: _normalize(result), value: result };}type TallyResult = {    tag: string;    value: any;    weight: number;};// This strategy picks the highest weight result, as long as the weight is// equal to or greater than quorumfunction checkQuorum(quorum: number, results: Array<TallyResult>): any | Error {    const tally: Map<string, { value: any, weight: number }> = new Map();    for (const { value, tag, weight } of results) {        const t = tally.get(tag) || { value, weight: 0 };        t.weight += weight;        tally.set(tag, t);    }    let best: null | { value: any, weight: number } = null;    for (const r of tally.values()) {        if (r.weight >= quorum && (!best || r.weight > best.weight)) {            best = r;        }    }    if (best) { return best.value; }    return undefined;}function getMedian(quorum: number, results: Array<TallyResult>): undefined | bigint | Error {    let resultWeight = 0;    const errorMap: Map<string, { weight: number, value: Error }> = new Map();    let bestError: null | { weight: number, value: Error } = null;    const values: Array<bigint> = [ ];    for (const { value, tag, weight } of results) {        if (value instanceof Error) {            const e = errorMap.get(tag) || { value, weight: 0 };            e.weight += weight;            errorMap.set(tag, e);            if (bestError == null || e.weight > bestError.weight) { bestError = e; }        } else {            values.push(BigInt(value));            resultWeight += weight;        }    }    if (resultWeight < quorum) {        // We have quorum for an error        if (bestError && bestError.weight >= quorum) { return bestError.value; }        // We do not have quorum for a result        return undefined;    }    // Get the sorted values    values.sort((a, b) => ((a < b) ? -1: (b > a) ? 1: 0));    const mid = Math.floor(values.length / 2);    // Odd-length; take the middle value    if (values.length % 2) { return values[mid]; }    // Even length; take the ceiling of the mean of the center two values    return (values[mid - 1] + values[mid] + BN_1) / BN_2;}function getAnyResult(quorum: number, results: Array<TallyResult>): undefined | any | Error {    // If any value or error meets quorum, that is our preferred result    const result = checkQuorum(quorum, results);    if (result !== undefined) { return result; }    // Otherwise, do we have any result?    for (const r of results) {        if (r.value) { return r.value; }    }    // Nope!    return undefined;}function getFuzzyMode(quorum: number, results: Array<TallyResult>): undefined | number {    if (quorum === 1) { return getNumber(<bigint>getMedian(quorum, results), "%internal"); }    const tally: Map<number, { result: number, weight: number }> = new Map();    const add = (result: number, weight: number) => {        const t = tally.get(result) || { result, weight: 0 };        t.weight += weight;        tally.set(result, t);    };    for (const { weight, value } of results) {        const r = getNumber(value);        add(r - 1, weight);        add(r, weight);        add(r + 1, weight);    }    let bestWeight = 0;    let bestResult: undefined | number = undefined;    for (const { weight, result } of tally.values()) {        // Use this result, if this result meets quorum and has either:        // - a better weight        // - or equal weight, but the result is larger        if (weight >= quorum && (weight > bestWeight || (bestResult != null && weight === bestWeight && result > bestResult))) {            bestWeight = weight;            bestResult = result;        }    }    return bestResult;}/** *  A **FallbackProvider** manages several [[Providers]] providing *  resilience by switching between slow or misbehaving nodes, security *  by requiring multiple backends to aggree and performance by allowing *  faster backends to respond earlier. * */export class FallbackProvider extends AbstractProvider {    /**     *  The number of backends that must agree on a value before it is     *  accpeted.     */    readonly quorum: number;    /**     *  @_ignore:     */    readonly eventQuorum: number;    /**     *  @_ignore:     */    readonly eventWorkers: number;    readonly #configs: Array<Config>;    #height: number;    #initialSyncPromise: null | Promise<void>;    /**     *  Creates a new **FallbackProvider** with %%providers%% connected to     *  %%network%%.     *     *  If a [[Provider]] is included in %%providers%%, defaults are used     *  for the configuration.     */    constructor(providers: Array<AbstractProvider | FallbackProviderConfig>, network?: Networkish, options?: FallbackProviderOptions) {        super(network, options);        this.#configs = providers.map((p) => {            if (p instanceof AbstractProvider) {                return Object.assign({ provider: p }, defaultConfig, defaultState );            } else {                return Object.assign({ }, defaultConfig, p, defaultState );            }        });        this.#height = -2;        this.#initialSyncPromise = null;        if (options && options.quorum != null) {            this.quorum = options.quorum;        } else {            this.quorum = Math.ceil(this.#configs.reduce((accum, config) => {                accum += config.weight;                return accum;            }, 0) / 2);        }        this.eventQuorum = 1;        this.eventWorkers = 1;        assertArgument(this.quorum <= this.#configs.reduce((a, c) => (a + c.weight), 0),            "quorum exceed provider weight", "quorum", this.quorum);    }    get providerConfigs(): Array<FallbackProviderState> {        return this.#configs.map((c) => {            const result: any = Object.assign({ }, c);            for (const key in result) {                if (key[0] === "_") { delete result[key]; }            }            return result;        });    }    async _detectNetwork(): Promise<Network> {        return Network.from(getBigInt(await this._perform({ method: "chainId" })));    }    // @TODO: Add support to select providers to be the event subscriber    //_getSubscriber(sub: Subscription): Subscriber {    //    throw new Error("@TODO");    //}    /**     *  Transforms a %%req%% into the correct method call on %%provider%%.     */    async _translatePerform(provider: AbstractProvider, req: PerformActionRequest): Promise<any> {        switch (req.method) {            case "broadcastTransaction":                return await provider.broadcastTransaction(req.signedTransaction);            case "call":                return await provider.call(Object.assign({ }, req.transaction, { blockTag: req.blockTag }));            case "chainId":                return (await provider.getNetwork()).chainId;            case "estimateGas":                return await provider.estimateGas(req.transaction);            case "getBalance":                return await provider.getBalance(req.address, req.blockTag);            case "getBlock": {                const block = ("blockHash" in req) ? req.blockHash: req.blockTag;                return await provider.getBlock(block, req.includeTransactions);            }            case "getBlockNumber":                return await provider.getBlockNumber();            case "getCode":                return await provider.getCode(req.address, req.blockTag);            case "getGasPrice":                return (await provider.getFeeData()).gasPrice;            case "getPriorityFee":                return (await provider.getFeeData()).maxPriorityFeePerGas;            case "getLogs":                return await provider.getLogs(req.filter);            case "getStorage":                return await provider.getStorage(req.address, req.position, req.blockTag);            case "getTransaction":                return await provider.getTransaction(req.hash);            case "getTransactionCount":                return await provider.getTransactionCount(req.address, req.blockTag);            case "getTransactionReceipt":                return await provider.getTransactionReceipt(req.hash);            case "getTransactionResult":                return await provider.getTransactionResult(req.hash);        }    }    // Grab the next (random) config that is not already part of    // the running set    #getNextConfig(running: Set<RunnerState>): null | Config {        // @TODO: Maybe do a check here to favour (heavily) providers that        //        do not require waitForSync and disfavour providers that        //        seem down-ish or are behaving slowly        const configs = Array.from(running).map((r) => r.config)        // Shuffle the states, sorted by priority        const allConfigs = this.#configs.slice();        shuffle(allConfigs);        allConfigs.sort((a, b) => (a.priority - b.priority));        for (const config of allConfigs) {            if (config._lastFatalError) { continue; }            if (configs.indexOf(config) === -1) { return config; }        }        return null;    }    // Adds a new runner (if available) to running.    #addRunner(running: Set<RunnerState>, req: PerformActionRequest): null | RunnerState {        const config = this.#getNextConfig(running);        // No runners available        if (config == null) { return null; }        // Create a new runner        const runner: RunnerState = {            config, result: null, didBump: false,            perform: null, staller: null        };        const now = getTime();        // Start performing this operation        runner.perform = (async () => {            try {                config.requests++;                const result = await this._translatePerform(config.provider, req);                runner.result = { result };            } catch (error: any) {                config.errorResponses++;                runner.result = { error };            }            const dt = (getTime() - now);            config._totalTime += dt;            config.rollingDuration = 0.95 * config.rollingDuration + 0.05 * dt;            runner.perform = null;        })();        // Start a staller; when this times out, it's time to force        // kicking off another runner because we are taking too long        runner.staller = (async () => {            await stall(config.stallTimeout);            runner.staller = null;        })();        running.add(runner);        return runner;    }    // Initializes the blockNumber and network for each runner and    // blocks until initialized    async #initialSync(): Promise<void> {        let initialSync = this.#initialSyncPromise;        if (!initialSync) {            const promises: Array<Promise<any>> = [ ];            this.#configs.forEach((config) => {                promises.push((async () => {                    await waitForSync(config, 0);                    if (!config._lastFatalError) {                        config._network = await config.provider.getNetwork();                    }                })());            });            this.#initialSyncPromise = initialSync = (async () => {                // Wait for all providers to have a block number and network                await Promise.all(promises);                // Check all the networks match                let chainId: null | bigint = null;                for (const config of this.#configs) {                    if (config._lastFatalError) { continue; }                    const network = <Network>(config._network);                    if (chainId == null) {                        chainId = network.chainId;                    } else if (network.chainId !== chainId) {                        assert(false, "cannot mix providers on different networks", "UNSUPPORTED_OPERATION", {                            operation: "new FallbackProvider"                        });                    }                }            })();        }        await initialSync    }    async #checkQuorum(running: Set<RunnerState>, req: PerformActionRequest): Promise<any> {        // Get all the result objects        const results: Array<TallyResult> = [ ];        for (const runner of running) {            if (runner.result != null) {                const { tag, value } = normalizeResult(runner.result);                results.push({ tag, value, weight: runner.config.weight });            }        }        // Are there enough results to event meet quorum?        if (results.reduce((a, r) => (a + r.weight), 0) < this.quorum) {            return undefined;        }        switch (req.method) {            case "getBlockNumber": {                // We need to get the bootstrap block height                if (this.#height === -2) {                    this.#height = Math.ceil(getNumber(<bigint>getMedian(this.quorum, this.#configs.filter((c) => (!c._lastFatalError)).map((c) => ({                        value: c.blockNumber,                        tag: getNumber(c.blockNumber).toString(),                        weight: c.weight                    })))));                }                // Find the mode across all the providers, allowing for                // a little drift between block heights                const mode = getFuzzyMode(this.quorum, results);                if (mode === undefined) { return undefined; }                if (mode > this.#height) { this.#height = mode; }                return this.#height;            }            case "getGasPrice":            case "getPriorityFee":            case "estimateGas":                return getMedian(this.quorum, results);            case "getBlock":                // Pending blocks are in the mempool and already                // quite untrustworthy; just grab anything                if ("blockTag" in req && req.blockTag === "pending") {                    return getAnyResult(this.quorum, results);                }                return checkQuorum(this.quorum, results);            case "call":            case "chainId":            case "getBalance":            case "getTransactionCount":            case "getCode":            case "getStorage":            case "getTransaction":            case "getTransactionReceipt":            case "getLogs":                return checkQuorum(this.quorum, results);            case "broadcastTransaction":                return getAnyResult(this.quorum, results);        }        assert(false, "unsupported method", "UNSUPPORTED_OPERATION", {            operation: `_perform(${ stringify((<any>req).method) })`        });    }    async #waitForQuorum(running: Set<RunnerState>, req: PerformActionRequest): Promise<any> {        if (running.size === 0) { throw new Error("no runners?!"); }        // Any promises that are interesting to watch for; an expired stall        // or a successful perform        const interesting: Array<Promise<void>> = [ ];        let newRunners = 0;        for (const runner of running) {            // No responses, yet; keep an eye on it            if (runner.perform) {                interesting.push(runner.perform);            }            // Still stalling...            if (runner.staller) {                interesting.push(runner.staller);                continue;            }            // This runner has already triggered another runner            if (runner.didBump) { continue; }            // Got a response (result or error) or stalled; kick off another runner            runner.didBump = true;            newRunners++;        }        // Check if we have reached quorum on a result (or error)        const value = await this.#checkQuorum(running, req);        if (value !== undefined) {            if (value instanceof Error) { throw value; }            return value;        }        // Add any new runners, because a staller timed out or a result        // or error response came in.        for (let i = 0; i < newRunners; i++) {            this.#addRunner(running, req);        }        // All providers have returned, and we have no result        assert(interesting.length > 0, "quorum not met", "SERVER_ERROR", {            request: "%sub-requests",            info: { request: req, results: Array.from(running).map((r) => stringify(r.result)) }        });        // Wait for someone to either complete its perform or stall out        await Promise.race(interesting);        // This is recursive, but at worst case the depth is 2x the        // number of providers (each has a perform and a staller)        return await this.#waitForQuorum(running, req);    }    async _perform<T = any>(req: PerformActionRequest): Promise<T> {        // Broadcasting a transaction is rare (ish) and already incurs        // a cost on the user, so spamming is safe-ish. Just send it to        // every backend.        if (req.method === "broadcastTransaction") {            // Once any broadcast provides a positive result, use it. No            // need to wait for anyone else            const results: Array<null | TallyResult> = this.#configs.map((c) => null);            const broadcasts = this.#configs.map(async ({ provider, weight }, index) => {                try {                    const result = await provider._perform(req);                    results[index] = Object.assign(normalizeResult({ result }), { weight });                } catch (error: any) {                    results[index] = Object.assign(normalizeResult({ error }), { weight });                }            });            // As each promise finishes...            while (true) {                // Check for a valid broadcast result                const done = <Array<any>>results.filter((r) => (r != null));                for (const { value } of done) {                    if (!(value instanceof Error)) { return value; }                }                // Check for a legit broadcast error (one which we cannot                // recover from; some nodes may return the following red                // herring events:                // - alredy seend (UNKNOWN_ERROR)                // - NONCE_EXPIRED                // - REPLACEMENT_UNDERPRICED                const result = checkQuorum(this.quorum, <Array<any>>results.filter((r) => (r != null)));                if (isError(result, "INSUFFICIENT_FUNDS")) {                    throw result;                }                // Kick off the next provider (if any)                const waiting = broadcasts.filter((b, i) => (results[i] == null));                if (waiting.length === 0) { break; }                await Promise.race(waiting);            }            // Use standard quorum results; any result was returned above,            // so this will find any error that met quorum if any            const result = getAnyResult(this.quorum, <Array<any>>results);            assert(result !== undefined, "problem multi-broadcasting", "SERVER_ERROR", {                request: "%sub-requests",                info: { request: req, results: results.map(stringify) }            })            if (result instanceof Error) { throw result; }            return result;        }        await this.#initialSync();        // Bootstrap enough runners to meet quorum        const running: Set<RunnerState> = new Set();        let inflightQuorum = 0;        while (true) {            const runner = this.#addRunner(running, req);            if (runner == null) { break; }            inflightQuorum += runner.config.weight;            if (inflightQuorum >= this.quorum) { break; }        }        const result = await this.#waitForQuorum(running, req);        // Track requests sent to a provider that are still        // outstanding after quorum has been otherwise found        for (const runner of running) {            if (runner.perform && runner.result == null) {                runner.config.lateResponses++;            }        }        return result;    }    async destroy(): Promise<void> {        for (const { provider } of this.#configs) {            provider.destroy();        }        super.destroy();    }}
 |