| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611 | /** *  A **FallbackProvider** provides resilience, security and performance *  in a way that is customizable and configurable. * *  @_section: api/providers/fallback-provider:Fallback Provider [about-fallback-provider] */import { assert, assertArgument, getBigInt, getNumber, isError } from "../utils/index.js";import { AbstractProvider } from "./abstract-provider.js";import { Network } from "./network.js";const BN_1 = BigInt("1");const BN_2 = BigInt("2");function shuffle(array) {    for (let i = array.length - 1; i > 0; i--) {        const j = Math.floor(Math.random() * (i + 1));        const tmp = array[i];        array[i] = array[j];        array[j] = tmp;    }}function stall(duration) {    return new Promise((resolve) => { setTimeout(resolve, duration); });}function getTime() { return (new Date()).getTime(); }function stringify(value) {    return JSON.stringify(value, (key, value) => {        if (typeof (value) === "bigint") {            return { type: "bigint", value: value.toString() };        }        return value;    });};const defaultConfig = { stallTimeout: 400, priority: 1, weight: 1 };const defaultState = {    blockNumber: -2, requests: 0, lateResponses: 0, errorResponses: 0,    outOfSync: -1, unsupportedEvents: 0, rollingDuration: 0, score: 0,    _network: null, _updateNumber: null, _totalTime: 0,    _lastFatalError: null, _lastFatalErrorTimestamp: 0};async function waitForSync(config, blockNumber) {    while (config.blockNumber < 0 || config.blockNumber < blockNumber) {        if (!config._updateNumber) {            config._updateNumber = (async () => {                try {                    const blockNumber = await config.provider.getBlockNumber();                    if (blockNumber > config.blockNumber) {                        config.blockNumber = blockNumber;                    }                }                catch (error) {                    config.blockNumber = -2;                    config._lastFatalError = error;                    config._lastFatalErrorTimestamp = getTime();                }                config._updateNumber = null;            })();        }        await config._updateNumber;        config.outOfSync++;        if (config._lastFatalError) {            break;        }    }}function _normalize(value) {    if (value == null) {        return "null";    }    if (Array.isArray(value)) {        return "[" + (value.map(_normalize)).join(",") + "]";    }    if (typeof (value) === "object" && typeof (value.toJSON) === "function") {        return _normalize(value.toJSON());    }    switch (typeof (value)) {        case "boolean":        case "symbol":            return value.toString();        case "bigint":        case "number":            return BigInt(value).toString();        case "string":            return JSON.stringify(value);        case "object": {            const keys = Object.keys(value);            keys.sort();            return "{" + keys.map((k) => `${JSON.stringify(k)}:${_normalize(value[k])}`).join(",") + "}";        }    }    console.log("Could not serialize", value);    throw new Error("Hmm...");}function normalizeResult(value) {    if ("error" in value) {        const error = value.error;        return { tag: _normalize(error), value: error };    }    const result = value.result;    return { tag: _normalize(result), value: result };}// This strategy picks the highest weight result, as long as the weight is// equal to or greater than quorumfunction checkQuorum(quorum, results) {    const tally = new Map();    for (const { value, tag, weight } of results) {        const t = tally.get(tag) || { value, weight: 0 };        t.weight += weight;        tally.set(tag, t);    }    let best = null;    for (const r of tally.values()) {        if (r.weight >= quorum && (!best || r.weight > best.weight)) {            best = r;        }    }    if (best) {        return best.value;    }    return undefined;}function getMedian(quorum, results) {    let resultWeight = 0;    const errorMap = new Map();    let bestError = null;    const values = [];    for (const { value, tag, weight } of results) {        if (value instanceof Error) {            const e = errorMap.get(tag) || { value, weight: 0 };            e.weight += weight;            errorMap.set(tag, e);            if (bestError == null || e.weight > bestError.weight) {                bestError = e;            }        }        else {            values.push(BigInt(value));            resultWeight += weight;        }    }    if (resultWeight < quorum) {        // We have quorum for an error        if (bestError && bestError.weight >= quorum) {            return bestError.value;        }        // We do not have quorum for a result        return undefined;    }    // Get the sorted values    values.sort((a, b) => ((a < b) ? -1 : (b > a) ? 1 : 0));    const mid = Math.floor(values.length / 2);    // Odd-length; take the middle value    if (values.length % 2) {        return values[mid];    }    // Even length; take the ceiling of the mean of the center two values    return (values[mid - 1] + values[mid] + BN_1) / BN_2;}function getAnyResult(quorum, results) {    // If any value or error meets quorum, that is our preferred result    const result = checkQuorum(quorum, results);    if (result !== undefined) {        return result;    }    // Otherwise, do we have any result?    for (const r of results) {        if (r.value) {            return r.value;        }    }    // Nope!    return undefined;}function getFuzzyMode(quorum, results) {    if (quorum === 1) {        return getNumber(getMedian(quorum, results), "%internal");    }    const tally = new Map();    const add = (result, weight) => {        const t = tally.get(result) || { result, weight: 0 };        t.weight += weight;        tally.set(result, t);    };    for (const { weight, value } of results) {        const r = getNumber(value);        add(r - 1, weight);        add(r, weight);        add(r + 1, weight);    }    let bestWeight = 0;    let bestResult = undefined;    for (const { weight, result } of tally.values()) {        // Use this result, if this result meets quorum and has either:        // - a better weight        // - or equal weight, but the result is larger        if (weight >= quorum && (weight > bestWeight || (bestResult != null && weight === bestWeight && result > bestResult))) {            bestWeight = weight;            bestResult = result;        }    }    return bestResult;}/** *  A **FallbackProvider** manages several [[Providers]] providing *  resilience by switching between slow or misbehaving nodes, security *  by requiring multiple backends to aggree and performance by allowing *  faster backends to respond earlier. * */export class FallbackProvider extends AbstractProvider {    /**     *  The number of backends that must agree on a value before it is     *  accpeted.     */    quorum;    /**     *  @_ignore:     */    eventQuorum;    /**     *  @_ignore:     */    eventWorkers;    #configs;    #height;    #initialSyncPromise;    /**     *  Creates a new **FallbackProvider** with %%providers%% connected to     *  %%network%%.     *     *  If a [[Provider]] is included in %%providers%%, defaults are used     *  for the configuration.     */    constructor(providers, network, options) {        super(network, options);        this.#configs = providers.map((p) => {            if (p instanceof AbstractProvider) {                return Object.assign({ provider: p }, defaultConfig, defaultState);            }            else {                return Object.assign({}, defaultConfig, p, defaultState);            }        });        this.#height = -2;        this.#initialSyncPromise = null;        if (options && options.quorum != null) {            this.quorum = options.quorum;        }        else {            this.quorum = Math.ceil(this.#configs.reduce((accum, config) => {                accum += config.weight;                return accum;            }, 0) / 2);        }        this.eventQuorum = 1;        this.eventWorkers = 1;        assertArgument(this.quorum <= this.#configs.reduce((a, c) => (a + c.weight), 0), "quorum exceed provider weight", "quorum", this.quorum);    }    get providerConfigs() {        return this.#configs.map((c) => {            const result = Object.assign({}, c);            for (const key in result) {                if (key[0] === "_") {                    delete result[key];                }            }            return result;        });    }    async _detectNetwork() {        return Network.from(getBigInt(await this._perform({ method: "chainId" })));    }    // @TODO: Add support to select providers to be the event subscriber    //_getSubscriber(sub: Subscription): Subscriber {    //    throw new Error("@TODO");    //}    /**     *  Transforms a %%req%% into the correct method call on %%provider%%.     */    async _translatePerform(provider, req) {        switch (req.method) {            case "broadcastTransaction":                return await provider.broadcastTransaction(req.signedTransaction);            case "call":                return await provider.call(Object.assign({}, req.transaction, { blockTag: req.blockTag }));            case "chainId":                return (await provider.getNetwork()).chainId;            case "estimateGas":                return await provider.estimateGas(req.transaction);            case "getBalance":                return await provider.getBalance(req.address, req.blockTag);            case "getBlock": {                const block = ("blockHash" in req) ? req.blockHash : req.blockTag;                return await provider.getBlock(block, req.includeTransactions);            }            case "getBlockNumber":                return await provider.getBlockNumber();            case "getCode":                return await provider.getCode(req.address, req.blockTag);            case "getGasPrice":                return (await provider.getFeeData()).gasPrice;            case "getPriorityFee":                return (await provider.getFeeData()).maxPriorityFeePerGas;            case "getLogs":                return await provider.getLogs(req.filter);            case "getStorage":                return await provider.getStorage(req.address, req.position, req.blockTag);            case "getTransaction":                return await provider.getTransaction(req.hash);            case "getTransactionCount":                return await provider.getTransactionCount(req.address, req.blockTag);            case "getTransactionReceipt":                return await provider.getTransactionReceipt(req.hash);            case "getTransactionResult":                return await provider.getTransactionResult(req.hash);        }    }    // Grab the next (random) config that is not already part of    // the running set    #getNextConfig(running) {        // @TODO: Maybe do a check here to favour (heavily) providers that        //        do not require waitForSync and disfavour providers that        //        seem down-ish or are behaving slowly        const configs = Array.from(running).map((r) => r.config);        // Shuffle the states, sorted by priority        const allConfigs = this.#configs.slice();        shuffle(allConfigs);        allConfigs.sort((a, b) => (a.priority - b.priority));        for (const config of allConfigs) {            if (config._lastFatalError) {                continue;            }            if (configs.indexOf(config) === -1) {                return config;            }        }        return null;    }    // Adds a new runner (if available) to running.    #addRunner(running, req) {        const config = this.#getNextConfig(running);        // No runners available        if (config == null) {            return null;        }        // Create a new runner        const runner = {            config, result: null, didBump: false,            perform: null, staller: null        };        const now = getTime();        // Start performing this operation        runner.perform = (async () => {            try {                config.requests++;                const result = await this._translatePerform(config.provider, req);                runner.result = { result };            }            catch (error) {                config.errorResponses++;                runner.result = { error };            }            const dt = (getTime() - now);            config._totalTime += dt;            config.rollingDuration = 0.95 * config.rollingDuration + 0.05 * dt;            runner.perform = null;        })();        // Start a staller; when this times out, it's time to force        // kicking off another runner because we are taking too long        runner.staller = (async () => {            await stall(config.stallTimeout);            runner.staller = null;        })();        running.add(runner);        return runner;    }    // Initializes the blockNumber and network for each runner and    // blocks until initialized    async #initialSync() {        let initialSync = this.#initialSyncPromise;        if (!initialSync) {            const promises = [];            this.#configs.forEach((config) => {                promises.push((async () => {                    await waitForSync(config, 0);                    if (!config._lastFatalError) {                        config._network = await config.provider.getNetwork();                    }                })());            });            this.#initialSyncPromise = initialSync = (async () => {                // Wait for all providers to have a block number and network                await Promise.all(promises);                // Check all the networks match                let chainId = null;                for (const config of this.#configs) {                    if (config._lastFatalError) {                        continue;                    }                    const network = (config._network);                    if (chainId == null) {                        chainId = network.chainId;                    }                    else if (network.chainId !== chainId) {                        assert(false, "cannot mix providers on different networks", "UNSUPPORTED_OPERATION", {                            operation: "new FallbackProvider"                        });                    }                }            })();        }        await initialSync;    }    async #checkQuorum(running, req) {        // Get all the result objects        const results = [];        for (const runner of running) {            if (runner.result != null) {                const { tag, value } = normalizeResult(runner.result);                results.push({ tag, value, weight: runner.config.weight });            }        }        // Are there enough results to event meet quorum?        if (results.reduce((a, r) => (a + r.weight), 0) < this.quorum) {            return undefined;        }        switch (req.method) {            case "getBlockNumber": {                // We need to get the bootstrap block height                if (this.#height === -2) {                    this.#height = Math.ceil(getNumber(getMedian(this.quorum, this.#configs.filter((c) => (!c._lastFatalError)).map((c) => ({                        value: c.blockNumber,                        tag: getNumber(c.blockNumber).toString(),                        weight: c.weight                    })))));                }                // Find the mode across all the providers, allowing for                // a little drift between block heights                const mode = getFuzzyMode(this.quorum, results);                if (mode === undefined) {                    return undefined;                }                if (mode > this.#height) {                    this.#height = mode;                }                return this.#height;            }            case "getGasPrice":            case "getPriorityFee":            case "estimateGas":                return getMedian(this.quorum, results);            case "getBlock":                // Pending blocks are in the mempool and already                // quite untrustworthy; just grab anything                if ("blockTag" in req && req.blockTag === "pending") {                    return getAnyResult(this.quorum, results);                }                return checkQuorum(this.quorum, results);            case "call":            case "chainId":            case "getBalance":            case "getTransactionCount":            case "getCode":            case "getStorage":            case "getTransaction":            case "getTransactionReceipt":            case "getLogs":                return checkQuorum(this.quorum, results);            case "broadcastTransaction":                return getAnyResult(this.quorum, results);        }        assert(false, "unsupported method", "UNSUPPORTED_OPERATION", {            operation: `_perform(${stringify(req.method)})`        });    }    async #waitForQuorum(running, req) {        if (running.size === 0) {            throw new Error("no runners?!");        }        // Any promises that are interesting to watch for; an expired stall        // or a successful perform        const interesting = [];        let newRunners = 0;        for (const runner of running) {            // No responses, yet; keep an eye on it            if (runner.perform) {                interesting.push(runner.perform);            }            // Still stalling...            if (runner.staller) {                interesting.push(runner.staller);                continue;            }            // This runner has already triggered another runner            if (runner.didBump) {                continue;            }            // Got a response (result or error) or stalled; kick off another runner            runner.didBump = true;            newRunners++;        }        // Check if we have reached quorum on a result (or error)        const value = await this.#checkQuorum(running, req);        if (value !== undefined) {            if (value instanceof Error) {                throw value;            }            return value;        }        // Add any new runners, because a staller timed out or a result        // or error response came in.        for (let i = 0; i < newRunners; i++) {            this.#addRunner(running, req);        }        // All providers have returned, and we have no result        assert(interesting.length > 0, "quorum not met", "SERVER_ERROR", {            request: "%sub-requests",            info: { request: req, results: Array.from(running).map((r) => stringify(r.result)) }        });        // Wait for someone to either complete its perform or stall out        await Promise.race(interesting);        // This is recursive, but at worst case the depth is 2x the        // number of providers (each has a perform and a staller)        return await this.#waitForQuorum(running, req);    }    async _perform(req) {        // Broadcasting a transaction is rare (ish) and already incurs        // a cost on the user, so spamming is safe-ish. Just send it to        // every backend.        if (req.method === "broadcastTransaction") {            // Once any broadcast provides a positive result, use it. No            // need to wait for anyone else            const results = this.#configs.map((c) => null);            const broadcasts = this.#configs.map(async ({ provider, weight }, index) => {                try {                    const result = await provider._perform(req);                    results[index] = Object.assign(normalizeResult({ result }), { weight });                }                catch (error) {                    results[index] = Object.assign(normalizeResult({ error }), { weight });                }            });            // As each promise finishes...            while (true) {                // Check for a valid broadcast result                const done = results.filter((r) => (r != null));                for (const { value } of done) {                    if (!(value instanceof Error)) {                        return value;                    }                }                // Check for a legit broadcast error (one which we cannot                // recover from; some nodes may return the following red                // herring events:                // - alredy seend (UNKNOWN_ERROR)                // - NONCE_EXPIRED                // - REPLACEMENT_UNDERPRICED                const result = checkQuorum(this.quorum, results.filter((r) => (r != null)));                if (isError(result, "INSUFFICIENT_FUNDS")) {                    throw result;                }                // Kick off the next provider (if any)                const waiting = broadcasts.filter((b, i) => (results[i] == null));                if (waiting.length === 0) {                    break;                }                await Promise.race(waiting);            }            // Use standard quorum results; any result was returned above,            // so this will find any error that met quorum if any            const result = getAnyResult(this.quorum, results);            assert(result !== undefined, "problem multi-broadcasting", "SERVER_ERROR", {                request: "%sub-requests",                info: { request: req, results: results.map(stringify) }            });            if (result instanceof Error) {                throw result;            }            return result;        }        await this.#initialSync();        // Bootstrap enough runners to meet quorum        const running = new Set();        let inflightQuorum = 0;        while (true) {            const runner = this.#addRunner(running, req);            if (runner == null) {                break;            }            inflightQuorum += runner.config.weight;            if (inflightQuorum >= this.quorum) {                break;            }        }        const result = await this.#waitForQuorum(running, req);        // Track requests sent to a provider that are still        // outstanding after quorum has been otherwise found        for (const runner of running) {            if (runner.perform && runner.result == null) {                runner.config.lateResponses++;            }        }        return result;    }    async destroy() {        for (const { provider } of this.#configs) {            provider.destroy();        }        super.destroy();    }}//# sourceMappingURL=provider-fallback.js.map
 |