subscriber-polling.js 9.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.PollingEventSubscriber = exports.PollingTransactionSubscriber = exports.PollingOrphanSubscriber = exports.PollingBlockTagSubscriber = exports.OnBlockSubscriber = exports.PollingBlockSubscriber = exports.getPollingSubscriber = void 0;
  4. const index_js_1 = require("../utils/index.js");
  /**
   * Return a deep copy of %%obj%%, produced by serializing it to JSON
   * and parsing the resulting text back into a fresh value.
   */
  function copy(obj) {
      const serialized = JSON.stringify(obj);
      return JSON.parse(serialized);
  }
  /**
   * Return the polling subscriber for common events.
   *
   * The event ``"block"`` is served by a [[PollingBlockSubscriber]]; an
   * %%event%% accepted by ``isHexString(event, 32)`` (presumably a
   * transaction hash) is served by a [[PollingTransactionSubscriber]].
   * Anything else fails the assertion with UNSUPPORTED_OPERATION.
   *
   * @_docloc: api/providers/abstract-provider
   */
  function getPollingSubscriber(provider, event) {
      const wantsBlocks = (event === "block");

      // Reject everything that is neither the block event nor a hash
      if (!wantsBlocks && !(0, index_js_1.isHexString)(event, 32)) {
          (0, index_js_1.assert)(false, "unsupported polling event", "UNSUPPORTED_OPERATION", {
              operation: "getPollingSubscriber", info: { event }
          });
          return undefined;
      }

      return wantsBlocks
          ? new PollingBlockSubscriber(provider)
          : new PollingTransactionSubscriber(provider, event);
  }
  24. exports.getPollingSubscriber = getPollingSubscriber;
  25. // @TODO: refactor this
  /**
   * A **PollingBlockSubscriber** polls at a regular interval for a change
   * in the block number and emits a ``"block"`` event on the provider
   * for every block number it has not reported yet.
   *
   * At most one timer is pending at any time, so repeatedly stopping
   * and starting (or pausing and resuming) never multiplies the polling.
   *
   * @_docloc: api/providers/abstract-provider
   */
  class PollingBlockSubscriber {
      #provider;
      // Handle of the pending timer (as returned by provider._setTimeout),
      // or null while the subscriber is stopped
      #poller;
      // Delay between polls, passed as-is to provider._setTimeout
      #interval;
      // The most recent block we have scanned for events. The value -2
      // indicates we still need to fetch an initial block number
      #blockNumber;

      /**
       * Create a new **PollingBlockSubscriber** attached to %%provider%%.
       */
      constructor(provider) {
          this.#provider = provider;
          this.#poller = null;
          this.#interval = 4000;
          this.#blockNumber = -2;
      }

      /**
       * The polling interval.
       */
      get pollingInterval() { return this.#interval; }
      set pollingInterval(value) { this.#interval = value; }

      // (Re-)arm the single poll timer. Any timer still held is cancelled
      // first; otherwise the replaced timer would keep firing and spawn a
      // second, parallel polling chain that stop() could never cancel.
      #schedule() {
          if (this.#poller != null) {
              this.#provider._clearTimeout(this.#poller);
          }
          this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
      }

      // Run one poll: fetch the block number, emit any new blocks and
      // schedule the next poll (unless we were stopped in the meantime).
      async #poll() {
          try {
              const blockNumber = await this.#provider.getBlockNumber();

              if (this.#blockNumber === -2) {
                  // Bootstrap poll: only record the starting point, then
                  // fall through so the next poll still gets scheduled
                  this.#blockNumber = blockNumber;

              } else if (blockNumber !== this.#blockNumber) {
                  // @TODO: Put a cap on the maximum number of events per loop?
                  for (let b = this.#blockNumber + 1; b <= blockNumber; b++) {
                      // We have been stopped
                      if (this.#poller == null) {
                          return;
                      }
                      await this.#provider.emit("block", b);
                  }
                  this.#blockNumber = blockNumber;
              }
          }
          catch (error) {
              // Deliberately best-effort: a failed poll is retried on the
              // next tick.
              // @TODO: Minor bump, add an "error" event to let subscribers
              //        know things went awry.
          }

          // We have been stopped
          if (this.#poller == null) {
              return;
          }

          this.#schedule();
      }

      /**
       * Begin polling. Does nothing if already started.
       */
      start() {
          if (this.#poller) {
              return;
          }
          // Arm the timer first so #poller is non-null (i.e. "running")
          // while the immediate poll below is in flight
          this.#schedule();
          this.#poll();
      }

      /**
       * Stop polling and cancel the pending timer. Does nothing if
       * already stopped.
       */
      stop() {
          if (!this.#poller) {
              return;
          }
          this.#provider._clearTimeout(this.#poller);
          this.#poller = null;
      }

      /**
       * Stop polling. If %%dropWhilePaused%% is truthy the last seen block
       * number is forgotten, so the next poll after [[resume]] only
       * records a new starting block instead of emitting the blocks
       * that arrived in between.
       */
      pause(dropWhilePaused) {
          this.stop();
          if (dropWhilePaused) {
              this.#blockNumber = -2;
          }
      }

      /**
       * Resume polling after a [[pause]].
       */
      resume() {
          this.start();
      }
  }
  108. exports.PollingBlockSubscriber = PollingBlockSubscriber;
  /**
   * An **OnBlockSubscriber** can be sub-classed, with a [[_poll]]
   * implementation which will be called on every new block.
   *
   * Polling is best-effort: a rejection from [[_poll]] is discarded, so
   * a failed poll is simply retried when the next block arrives.
   *
   * @_docloc: api/providers/abstract-provider
   */
  class OnBlockSubscriber {
      #provider;
      // The "block" listener; kept so the same function can be passed to
      // both provider.on and provider.off
      #poll;
      #running;

      /**
       * Create a new **OnBlockSubscriber** attached to %%provider%%.
       */
      constructor(provider) {
          this.#provider = provider;
          this.#running = false;
          this.#poll = (blockNumber) => {
              // _poll is async and nothing awaits the listener, so a
              // rejection must be handled here or it would surface as
              // an unhandled promise rejection
              Promise.resolve(this._poll(blockNumber, this.#provider)).catch(() => { });
          };
      }

      /**
       * Called on every new block, and once with a %%blockNumber%% of -2
       * when the subscriber is started.
       */
      async _poll(blockNumber, provider) {
          throw new Error("sub-classes must override this");
      }

      /**
       * Run an initial poll and begin listening for ``"block"`` events
       * on the provider. Does nothing if already running.
       */
      start() {
          if (this.#running) {
              return;
          }
          this.#running = true;
          this.#poll(-2);
          this.#provider.on("block", this.#poll);
      }

      /**
       * Stop listening for ``"block"`` events. Does nothing if not running.
       */
      stop() {
          if (!this.#running) {
              return;
          }
          this.#running = false;
          this.#provider.off("block", this.#poll);
      }

      /**
       * Stop polling; %%dropWhilePaused%% is not used by this base class.
       */
      pause(dropWhilePaused) { this.stop(); }

      /**
       * Resume polling after a [[pause]].
       */
      resume() { this.start(); }
  }
  153. exports.OnBlockSubscriber = OnBlockSubscriber;
  /**
   * A **PollingBlockTagSubscriber** fetches the block for a block tag
   * on every poll and emits the tag as an event, with the block number,
   * whenever that number is greater than the last one it saw.
   */
  class PollingBlockTagSubscriber extends OnBlockSubscriber {
      #tag;
      // The block number last seen for the tag. The value -2 indicates
      // no baseline has been recorded yet
      #lastBlock;

      /**
       * Create a new **PollingBlockTagSubscriber** attached to
       * %%provider%%, listening for changes of %%tag%%.
       */
      constructor(provider, tag) {
          super(provider);
          this.#lastBlock = -2;
          this.#tag = tag;
      }

      /**
       * Stop polling; if %%dropWhilePaused%% is truthy the baseline is
       * forgotten, so the first poll after resuming only records one.
       */
      pause(dropWhilePaused) {
          if (dropWhilePaused) {
              this.#lastBlock = -2;
          }
          super.pause(dropWhilePaused);
      }

      async _poll(blockNumber, provider) {
          const tagged = await provider.getBlock(this.#tag);
          if (tagged == null) {
              return;
          }

          const hasBaseline = (this.#lastBlock !== -2);
          if (hasBaseline) {
              // Nothing to report unless the tag moved forward
              if (!(tagged.number > this.#lastBlock)) {
                  return;
              }
              provider.emit(this.#tag, tagged.number);
          }

          this.#lastBlock = tagged.number;
      }
  }
  182. exports.PollingBlockTagSubscriber = PollingBlockTagSubscriber;
  /**
   * Placeholder subscriber for orphan events; polling is not
   * implemented yet, so every [[_poll]] rejects with "@TODO".
   *
   * @_ignore:
   *
   * @_docloc: api/providers/abstract-provider
   */
  class PollingOrphanSubscriber extends OnBlockSubscriber {
      // Private copy of the filter given to the constructor; it is not
      // consulted yet because polling is unimplemented
      #filter;

      /**
       * Create a new **PollingOrphanSubscriber** attached to
       * %%provider%%, keeping a copy of %%filter%%.
       */
      constructor(provider, filter) {
          super(provider);
          this.#filter = copy(filter);
      }

      async _poll(blockNumber, provider) {
          throw new Error("@TODO");
      }
  }
  199. exports.PollingOrphanSubscriber = PollingOrphanSubscriber;
  /**
   * A **PollingTransactionSubscriber** will poll for a given transaction
   * hash for its receipt, emitting the hash as an event (with the
   * receipt) on every poll that finds one.
   *
   * @_docloc: api/providers/abstract-provider
   */
  class PollingTransactionSubscriber extends OnBlockSubscriber {
      #hash;

      /**
       * Create a new **PollingTransactionSubscriber** attached to
       * %%provider%%, listening for %%hash%%.
       */
      constructor(provider, hash) {
          super(provider);
          this.#hash = hash;
      }

      async _poll(blockNumber, provider) {
          const receipt = await provider.getTransactionReceipt(this.#hash);

          // No receipt yet; try again on the next poll
          if (!receipt) {
              return;
          }

          provider.emit(this.#hash, receipt);
      }
  }
  223. exports.PollingTransactionSubscriber = PollingTransactionSubscriber;
  /**
   * A **PollingEventSubscriber** will poll for a given filter for its logs.
   *
   * Polling is best-effort: a failed request (for the initial block
   * number or for the logs) is discarded and retried when the next
   * block arrives.
   *
   * @_docloc: api/providers/abstract-provider
   */
  class PollingEventSubscriber {
      #provider;
      #filter;
      // The "block" listener; kept so the same function can be passed to
      // both provider.on and provider.off
      #poller;
      #running;
      // The most recent block we have scanned for events. The value -2
      // indicates we still need to fetch an initial block number
      #blockNumber;
      // True while a request for the initial block number is in flight
      #bootstrapping;

      /**
       * Create a new **PollingEventSubscriber** attached to
       * %%provider%%, listening for %%filter%%.
       */
      constructor(provider, filter) {
          this.#provider = provider;
          this.#filter = copy(filter);
          // #poll is async; contain its failures here so a failed getLogs
          // cannot surface as an unhandled promise rejection. The block
          // number is not advanced on failure, so the same range is
          // scanned again on the next block.
          this.#poller = (blockNumber) => this.#poll(blockNumber).catch(() => { });
          this.#running = false;
          this.#blockNumber = -2;
          this.#bootstrapping = false;
      }

      // Fetch the initial block number, unless a fetch is already in
      // flight. Never rejects; on failure #blockNumber stays at -2 so the
      // fetch is attempted again from #poll.
      async #bootstrap() {
          if (this.#bootstrapping) {
              return;
          }
          this.#bootstrapping = true;
          try {
              this.#blockNumber = await this.#provider.getBlockNumber();
          }
          catch (error) {
              // Best-effort; retried when the next block arrives
          }
          finally {
              this.#bootstrapping = false;
          }
      }

      // Scan the blocks after the last scanned one up to %%blockNumber%%
      // and emit every log found for the filter.
      async #poll(blockNumber) {
          // The initial block hasn't been determined yet; make sure a
          // fetch for it is under way (it may have failed earlier)
          if (this.#blockNumber === -2) {
              this.#bootstrap();
              return;
          }

          const filter = copy(this.#filter);
          filter.fromBlock = this.#blockNumber + 1;
          filter.toBlock = blockNumber;

          const logs = await this.#provider.getLogs(filter);

          // No logs could just mean the node has not indexed them yet,
          // so we keep a sliding window of 60 blocks to keep scanning
          if (logs.length === 0) {
              if (this.#blockNumber < blockNumber - 60) {
                  this.#blockNumber = blockNumber - 60;
              }
              return;
          }

          for (const log of logs) {
              this.#provider.emit(this.#filter, log);
              // Only advance the block number when logs were found to
              // account for networks (like BNB and Polygon) which may
              // sacrifice event consistency for block event speed
              this.#blockNumber = log.blockNumber;
          }
      }

      /**
       * Begin listening for ``"block"`` events on the provider, fetching
       * the initial block number first if it is not known. Does nothing
       * if already running.
       */
      start() {
          if (this.#running) {
              return;
          }
          this.#running = true;

          if (this.#blockNumber === -2) {
              this.#bootstrap();
          }

          this.#provider.on("block", this.#poller);
      }

      /**
       * Stop listening for ``"block"`` events. Does nothing if not running.
       */
      stop() {
          if (!this.#running) {
              return;
          }
          this.#running = false;
          this.#provider.off("block", this.#poller);
      }

      /**
       * Stop polling. If %%dropWhilePaused%% is truthy the last scanned
       * block is forgotten, so scanning restarts from the block number
       * fetched on [[resume]].
       */
      pause(dropWhilePaused) {
          this.stop();
          if (dropWhilePaused) {
              this.#blockNumber = -2;
          }
      }

      /**
       * Resume polling after a [[pause]].
       */
      resume() {
          this.start();
      }
  }
  302. exports.PollingEventSubscriber = PollingEventSubscriber;
  303. //# sourceMappingURL=subscriber-polling.js.map