123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
- import { assert, isHexString } from "../utils/index.js";
/**
 *  Returns a detached deep copy of %%obj%% by serializing it to JSON and
 *  parsing the result back. Only content that survives a JSON round-trip
 *  is preserved in the copy.
 */
function copy(obj) {
    const serialized = JSON.stringify(obj);
    return JSON.parse(serialized);
}
/**
 *  Return the polling subscriber for common events.
 *
 *  The event ``"block"`` is served by a [[PollingBlockSubscriber]]. Any
 *  other event for which ``isHexString(event, 32)`` holds is served by a
 *  [[PollingTransactionSubscriber]] listening for that value. Every other
 *  event is reported through ``assert`` as an ``UNSUPPORTED_OPERATION``.
 *
 *  @_docloc: api/providers/abstract-provider
 */
export function getPollingSubscriber(provider, event) {
    const isBlockEvent = (event === "block");

    // Only inspect the event as a hex string when it is not "block"
    const isHashEvent = !isBlockEvent && isHexString(event, 32);

    if (isBlockEvent) {
        return new PollingBlockSubscriber(provider);
    }

    if (isHashEvent) {
        return new PollingTransactionSubscriber(provider, event);
    }

    assert(false, "unsupported polling event", "UNSUPPORTED_OPERATION", {
        operation: "getPollingSubscriber",
        info: { event }
    });
}
- // @TODO: refactor this
/**
 *  A **PollingBlockSubscriber** polls at a regular interval for a change
 *  in the block number and emits a ``"block"`` event on the provider for
 *  each block after the last one it has seen.
 *
 *  While running, at most one provider timer is kept pending; its id is
 *  tracked so that it can be cancelled when stopping or re-arming.
 *
 *  @_docloc: api/providers/abstract-provider
 */
export class PollingBlockSubscriber {
    #provider;

    // Id of the pending provider timer, or null while stopped
    #poller;

    // Delay handed to the provider's _setTimeout between polling passes
    #interval;

    // The most recent block we have scanned for events. The value -2
    // indicates we still need to fetch an initial block number
    #blockNumber;

    /**
     *  Create a new **PollingBlockSubscriber** attached to %%provider%%.
     */
    constructor(provider) {
        this.#provider = provider;
        this.#poller = null;
        this.#interval = 4000;
        this.#blockNumber = -2;
    }

    /**
     *  The polling interval.
     */
    get pollingInterval() { return this.#interval; }
    set pollingInterval(value) { this.#interval = value; }

    /**
     *  Arm the timer for the next polling pass, first cancelling the one
     *  currently tracked so that two polling chains can never co-exist.
     */
    #schedule() {
        if (this.#poller) {
            // Cancelling a timer that has already fired is expected here
            // (the pass running now may have been started by it)
            this.#provider._clearTimeout(this.#poller);
        }
        this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
    }

    /**
     *  Run one polling pass: fetch the current block number, emit a
     *  ``"block"`` event for each block after the last one seen, then
     *  arm the next pass unless the subscriber has been stopped.
     */
    async #poll() {
        try {
            const blockNumber = await this.#provider.getBlockNumber();

            if (this.#blockNumber === -2) {
                // Bootstrap poll to setup our initial block number; this
                // falls through to the re-arm below so that polling keeps
                // going no matter which pass performed the bootstrap
                this.#blockNumber = blockNumber;

            } else if (blockNumber !== this.#blockNumber) {
                // @TODO: Put a cap on the maximum number of events per loop?
                for (let b = this.#blockNumber + 1; b <= blockNumber; b++) {
                    // We have been stopped
                    if (this.#poller == null) {
                        return;
                    }
                    await this.#provider.emit("block", b);
                }
                this.#blockNumber = blockNumber;
            }

        } catch (error) {
            // Best-effort: a failed pass is dropped and retried on the
            // next interval.
            // @TODO: Minor bump, add an "error" event to let subscribers
            //        know things went awry.
        }

        // We have been stopped
        if (this.#poller == null) {
            return;
        }

        this.#schedule();
    }

    /**
     *  Begin polling. A timer is armed (which also marks the subscriber
     *  as running) and a first pass is started immediately. Does nothing
     *  if already running.
     */
    start() {
        if (this.#poller) {
            return;
        }
        this.#schedule();
        this.#poll();
    }

    /**
     *  Stop polling and cancel the pending timer. A pass that is already
     *  in flight notices the cleared timer id and stops emitting. Does
     *  nothing if not running.
     */
    stop() {
        if (!this.#poller) {
            return;
        }
        this.#provider._clearTimeout(this.#poller);
        this.#poller = null;
    }

    /**
     *  Stop polling. When %%dropWhilePaused%% is truthy the last seen
     *  block is forgotten, so the first pass after [[resume]] only
     *  re-establishes the baseline and emits nothing.
     */
    pause(dropWhilePaused) {
        this.stop();
        if (dropWhilePaused) {
            this.#blockNumber = -2;
        }
    }

    /**
     *  Resume polling; equivalent to [[start]].
     */
    resume() {
        this.start();
    }
}
/**
 *  An **OnBlockSubscriber** can be sub-classed, with a [[_poll]]
 *  implementation which will be called on every new block.
 *
 *  While running, a listener is registered for the provider's
 *  ``"block"`` event; it forwards the block number and the provider to
 *  [[_poll]] without awaiting the returned promise.
 *
 *  @_docloc: api/providers/abstract-provider
 */
export class OnBlockSubscriber {
    #provider;

    // Whether the "block" listener is currently registered
    #active = false;

    // The listener handed to the provider; kept so the same function
    // reference can be passed to both on() and off()
    #onBlock = (blockNumber) => {
        this._poll(blockNumber, this.#provider);
    };

    /**
     *  Create a new **OnBlockSubscriber** attached to %%provider%%.
     */
    constructor(provider) {
        this.#provider = provider;
    }

    /**
     *  Called on every new block with the %%blockNumber%% and the
     *  %%provider%%. The base implementation always rejects; sub-classes
     *  must override it.
     */
    async _poll(blockNumber, provider) {
        throw new Error("sub-classes must override this");
    }

    /**
     *  Start listening for blocks. [[_poll]] is invoked once right away
     *  with the block number ``-2`` before the listener is registered.
     *  Does nothing if already running.
     */
    start() {
        if (!this.#active) {
            this.#active = true;
            this.#onBlock(-2);
            this.#provider.on("block", this.#onBlock);
        }
    }

    /**
     *  Stop listening for blocks. Does nothing if not running.
     */
    stop() {
        if (this.#active) {
            this.#active = false;
            this.#provider.off("block", this.#onBlock);
        }
    }

    /**
     *  Pause the subscriber; %%dropWhilePaused%% is not used here and
     *  this simply stops listening.
     */
    pause(dropWhilePaused) {
        this.stop();
    }

    /**
     *  Resume the subscriber by starting it again.
     */
    resume() {
        this.start();
    }
}
/**
 *  A **PollingBlockTagSubscriber** looks up the block for a given tag on
 *  every new block and emits the tag as an event, with the block number,
 *  whenever that number has increased since the previous lookup.
 */
export class PollingBlockTagSubscriber extends OnBlockSubscriber {
    #tag;

    // Number of the last block seen for the tag. The value -2 indicates
    // no baseline has been recorded yet
    #lastBlock = -2;

    /**
     *  Create a new **PollingBlockTagSubscriber** attached to
     *  %%provider%%, tracking the block %%tag%%.
     */
    constructor(provider, tag) {
        super(provider);
        this.#tag = tag;
    }

    /**
     *  Pause the subscriber. When %%dropWhilePaused%% is truthy the
     *  baseline is forgotten, so the first lookup after resuming only
     *  records a new baseline and emits nothing.
     */
    pause(dropWhilePaused) {
        if (dropWhilePaused) {
            this.#lastBlock = -2;
        }
        super.pause(dropWhilePaused);
    }

    /**
     *  Fetch the block for the tag from %%provider%% and emit when its
     *  number is greater than the recorded one. %%blockNumber%% is not
     *  used.
     */
    async _poll(blockNumber, provider) {
        const block = await provider.getBlock(this.#tag);
        if (block == null) {
            return;
        }

        const { number } = block;
        const previous = this.#lastBlock;
        const hasBaseline = (previous !== -2);

        // With a baseline in place, only a higher block number matters
        if (hasBaseline && !(number > previous)) {
            return;
        }

        // The very first lookup only records the baseline
        if (hasBaseline) {
            provider.emit(this.#tag, number);
        }

        this.#lastBlock = number;
    }
}
/**
 *  A placeholder subscriber for orphan events; polling is not yet
 *  implemented and every call to [[_poll]] rejects.
 *
 *  @_ignore:
 *
 *  @_docloc: api/providers/abstract-provider
 */
export class PollingOrphanSubscriber extends OnBlockSubscriber {
    // A private copy of the filter, kept for the pending implementation
    #filter;

    /**
     *  Create a new **PollingOrphanSubscriber** attached to
     *  %%provider%%, keeping a copy of %%filter%%.
     */
    constructor(provider, filter) {
        super(provider);
        this.#filter = copy(filter);
    }

    /**
     *  Not implemented; always rejects with an ``"@TODO"`` error.
     */
    async _poll(blockNumber, provider) {
        throw new Error("@TODO");
    }
}
/**
 *  A **PollingTransactionSubscriber** will poll for a given transaction
 *  hash for its receipt, emitting the hash as an event (with the receipt)
 *  each time a lookup returns one.
 *
 *  @_docloc: api/providers/abstract-provider
 */
export class PollingTransactionSubscriber extends OnBlockSubscriber {
    #hash;

    /**
     *  Create a new **PollingTransactionSubscriber** attached to
     *  %%provider%%, listening for %%hash%%.
     */
    constructor(provider, hash) {
        super(provider);
        this.#hash = hash;
    }

    /**
     *  Look up the receipt for the hash on %%provider%% and emit it if
     *  one was returned. %%blockNumber%% is not used.
     */
    async _poll(blockNumber, provider) {
        const receipt = await provider.getTransactionReceipt(this.#hash);

        // Nothing to report until the lookup yields a receipt
        if (!receipt) {
            return;
        }

        provider.emit(this.#hash, receipt);
    }
}
/**
 *  A **PollingEventSubscriber** will poll for a given filter for its logs.
 *
 *  On each ``"block"`` event of the provider it queries the logs matching
 *  the filter between the last scanned block and the new block, and emits
 *  every log found using the filter as the event.
 *
 *  @_docloc: api/providers/abstract-provider
 */
export class PollingEventSubscriber {
    #provider;
    #filter;

    // The bound "block" listener; kept so the same function reference is
    // passed to both on() and off()
    #poller;

    #running;

    // The most recent block we have scanned for events. The value -2
    // indicates we still need to fetch an initial block number
    #blockNumber;

    // Set when the initial block number lookup failed; the next "block"
    // event then supplies the baseline instead
    #bootstrapFailed;

    /**
     *  Create a new **PollingEventSubscriber** attached to
     *  %%provider%%, listening for %%filter%%.
     */
    constructor(provider, filter) {
        this.#provider = provider;
        this.#filter = copy(filter);
        this.#poller = this.#poll.bind(this);
        this.#running = false;
        this.#blockNumber = -2;
        this.#bootstrapFailed = false;
    }

    /**
     *  Scan for logs up to %%blockNumber%% and emit each one found.
     */
    async #poll(blockNumber) {
        // The initial block hasn't been determined yet
        if (this.#blockNumber === -2) {
            // The lookup made by start() failed, so adopt the block that
            // triggered this event as the baseline; scanning begins with
            // the next block event
            if (this.#bootstrapFailed) {
                this.#bootstrapFailed = false;
                this.#blockNumber = blockNumber;
            }
            return;
        }

        const filter = copy(this.#filter);
        filter.fromBlock = this.#blockNumber + 1;
        filter.toBlock = blockNumber;

        const logs = await this.#provider.getLogs(filter);

        // No logs could just mean the node has not indexed them yet,
        // so we keep a sliding window of 60 blocks to keep scanning
        if (logs.length === 0) {
            if (this.#blockNumber < blockNumber - 60) {
                this.#blockNumber = blockNumber - 60;
            }
            return;
        }

        for (const log of logs) {
            this.#provider.emit(this.#filter, log);

            // Only advance the block number when logs were found to
            // account for networks (like BNB and Polygon) which may
            // sacrifice event consistency for block event speed
            this.#blockNumber = log.blockNumber;
        }
    }

    /**
     *  Start listening for block events. If no baseline block is known,
     *  the current block number is requested from the provider; should
     *  that request fail, the first block event received provides the
     *  baseline instead. Does nothing if already running.
     */
    start() {
        if (this.#running) {
            return;
        }
        this.#running = true;

        if (this.#blockNumber === -2) {
            this.#bootstrapFailed = false;
            this.#provider.getBlockNumber().then((blockNumber) => {
                this.#blockNumber = blockNumber;
            }, () => {
                // Handle the rejection here so it cannot surface as an
                // unhandled rejection, and let #poll recover the baseline
                // (unless another lookup has supplied one meanwhile)
                if (this.#blockNumber === -2) {
                    this.#bootstrapFailed = true;
                }
            });
        }

        this.#provider.on("block", this.#poller);
    }

    /**
     *  Stop listening for block events. Does nothing if not running.
     */
    stop() {
        if (!this.#running) {
            return;
        }
        this.#running = false;
        this.#provider.off("block", this.#poller);
    }

    /**
     *  Stop listening. When %%dropWhilePaused%% is truthy the baseline is
     *  forgotten, so a new one is established after [[resume]] and logs
     *  from before it are not scanned.
     */
    pause(dropWhilePaused) {
        this.stop();
        if (dropWhilePaused) {
            this.#blockNumber = -2;
            this.#bootstrapFailed = false;
        }
    }

    /**
     *  Resume listening; equivalent to [[start]].
     */
    resume() {
        this.start();
    }
}
- //# sourceMappingURL=subscriber-polling.js.map
|