subscriber-polling.js 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. import { assert, isHexString } from "../utils/index.js";
  2. function copy(obj) {
  3. return JSON.parse(JSON.stringify(obj));
  4. }
  5. /**
  6. * Return the polling subscriber for common events.
  7. *
  8. * @_docloc: api/providers/abstract-provider
  9. */
  10. export function getPollingSubscriber(provider, event) {
  11. if (event === "block") {
  12. return new PollingBlockSubscriber(provider);
  13. }
  14. if (isHexString(event, 32)) {
  15. return new PollingTransactionSubscriber(provider, event);
  16. }
  17. assert(false, "unsupported polling event", "UNSUPPORTED_OPERATION", {
  18. operation: "getPollingSubscriber", info: { event }
  19. });
  20. }
  21. // @TODO: refactor this
  22. /**
  23. * A **PollingBlockSubscriber** polls at a regular interval for a change
  24. * in the block number.
  25. *
  26. * @_docloc: api/providers/abstract-provider
  27. */
  28. export class PollingBlockSubscriber {
  29. #provider;
  30. #poller;
  31. #interval;
  32. // The most recent block we have scanned for events. The value -2
  33. // indicates we still need to fetch an initial block number
  34. #blockNumber;
  35. /**
  36. * Create a new **PollingBlockSubscriber** attached to %%provider%%.
  37. */
  38. constructor(provider) {
  39. this.#provider = provider;
  40. this.#poller = null;
  41. this.#interval = 4000;
  42. this.#blockNumber = -2;
  43. }
  44. /**
  45. * The polling interval.
  46. */
  47. get pollingInterval() { return this.#interval; }
  48. set pollingInterval(value) { this.#interval = value; }
  49. async #poll() {
  50. try {
  51. const blockNumber = await this.#provider.getBlockNumber();
  52. // Bootstrap poll to setup our initial block number
  53. if (this.#blockNumber === -2) {
  54. this.#blockNumber = blockNumber;
  55. return;
  56. }
  57. // @TODO: Put a cap on the maximum number of events per loop?
  58. if (blockNumber !== this.#blockNumber) {
  59. for (let b = this.#blockNumber + 1; b <= blockNumber; b++) {
  60. // We have been stopped
  61. if (this.#poller == null) {
  62. return;
  63. }
  64. await this.#provider.emit("block", b);
  65. }
  66. this.#blockNumber = blockNumber;
  67. }
  68. }
  69. catch (error) {
  70. // @TODO: Minor bump, add an "error" event to let subscribers
  71. // know things went awry.
  72. //console.log(error);
  73. }
  74. // We have been stopped
  75. if (this.#poller == null) {
  76. return;
  77. }
  78. this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
  79. }
  80. start() {
  81. if (this.#poller) {
  82. return;
  83. }
  84. this.#poller = this.#provider._setTimeout(this.#poll.bind(this), this.#interval);
  85. this.#poll();
  86. }
  87. stop() {
  88. if (!this.#poller) {
  89. return;
  90. }
  91. this.#provider._clearTimeout(this.#poller);
  92. this.#poller = null;
  93. }
  94. pause(dropWhilePaused) {
  95. this.stop();
  96. if (dropWhilePaused) {
  97. this.#blockNumber = -2;
  98. }
  99. }
  100. resume() {
  101. this.start();
  102. }
  103. }
  104. /**
  105. * An **OnBlockSubscriber** can be sub-classed, with a [[_poll]]
  106. * implmentation which will be called on every new block.
  107. *
  108. * @_docloc: api/providers/abstract-provider
  109. */
  110. export class OnBlockSubscriber {
  111. #provider;
  112. #poll;
  113. #running;
  114. /**
  115. * Create a new **OnBlockSubscriber** attached to %%provider%%.
  116. */
  117. constructor(provider) {
  118. this.#provider = provider;
  119. this.#running = false;
  120. this.#poll = (blockNumber) => {
  121. this._poll(blockNumber, this.#provider);
  122. };
  123. }
  124. /**
  125. * Called on every new block.
  126. */
  127. async _poll(blockNumber, provider) {
  128. throw new Error("sub-classes must override this");
  129. }
  130. start() {
  131. if (this.#running) {
  132. return;
  133. }
  134. this.#running = true;
  135. this.#poll(-2);
  136. this.#provider.on("block", this.#poll);
  137. }
  138. stop() {
  139. if (!this.#running) {
  140. return;
  141. }
  142. this.#running = false;
  143. this.#provider.off("block", this.#poll);
  144. }
  145. pause(dropWhilePaused) { this.stop(); }
  146. resume() { this.start(); }
  147. }
  148. export class PollingBlockTagSubscriber extends OnBlockSubscriber {
  149. #tag;
  150. #lastBlock;
  151. constructor(provider, tag) {
  152. super(provider);
  153. this.#tag = tag;
  154. this.#lastBlock = -2;
  155. }
  156. pause(dropWhilePaused) {
  157. if (dropWhilePaused) {
  158. this.#lastBlock = -2;
  159. }
  160. super.pause(dropWhilePaused);
  161. }
  162. async _poll(blockNumber, provider) {
  163. const block = await provider.getBlock(this.#tag);
  164. if (block == null) {
  165. return;
  166. }
  167. if (this.#lastBlock === -2) {
  168. this.#lastBlock = block.number;
  169. }
  170. else if (block.number > this.#lastBlock) {
  171. provider.emit(this.#tag, block.number);
  172. this.#lastBlock = block.number;
  173. }
  174. }
  175. }
  176. /**
  177. * @_ignore:
  178. *
  179. * @_docloc: api/providers/abstract-provider
  180. */
  181. export class PollingOrphanSubscriber extends OnBlockSubscriber {
  182. #filter;
  183. constructor(provider, filter) {
  184. super(provider);
  185. this.#filter = copy(filter);
  186. }
  187. async _poll(blockNumber, provider) {
  188. throw new Error("@TODO");
  189. console.log(this.#filter);
  190. }
  191. }
  192. /**
  193. * A **PollingTransactionSubscriber** will poll for a given transaction
  194. * hash for its receipt.
  195. *
  196. * @_docloc: api/providers/abstract-provider
  197. */
  198. export class PollingTransactionSubscriber extends OnBlockSubscriber {
  199. #hash;
  200. /**
  201. * Create a new **PollingTransactionSubscriber** attached to
  202. * %%provider%%, listening for %%hash%%.
  203. */
  204. constructor(provider, hash) {
  205. super(provider);
  206. this.#hash = hash;
  207. }
  208. async _poll(blockNumber, provider) {
  209. const tx = await provider.getTransactionReceipt(this.#hash);
  210. if (tx) {
  211. provider.emit(this.#hash, tx);
  212. }
  213. }
  214. }
  215. /**
  216. * A **PollingEventSubscriber** will poll for a given filter for its logs.
  217. *
  218. * @_docloc: api/providers/abstract-provider
  219. */
  220. export class PollingEventSubscriber {
  221. #provider;
  222. #filter;
  223. #poller;
  224. #running;
  225. // The most recent block we have scanned for events. The value -2
  226. // indicates we still need to fetch an initial block number
  227. #blockNumber;
  228. /**
  229. * Create a new **PollingTransactionSubscriber** attached to
  230. * %%provider%%, listening for %%filter%%.
  231. */
  232. constructor(provider, filter) {
  233. this.#provider = provider;
  234. this.#filter = copy(filter);
  235. this.#poller = this.#poll.bind(this);
  236. this.#running = false;
  237. this.#blockNumber = -2;
  238. }
  239. async #poll(blockNumber) {
  240. // The initial block hasn't been determined yet
  241. if (this.#blockNumber === -2) {
  242. return;
  243. }
  244. const filter = copy(this.#filter);
  245. filter.fromBlock = this.#blockNumber + 1;
  246. filter.toBlock = blockNumber;
  247. const logs = await this.#provider.getLogs(filter);
  248. // No logs could just mean the node has not indexed them yet,
  249. // so we keep a sliding window of 60 blocks to keep scanning
  250. if (logs.length === 0) {
  251. if (this.#blockNumber < blockNumber - 60) {
  252. this.#blockNumber = blockNumber - 60;
  253. }
  254. return;
  255. }
  256. for (const log of logs) {
  257. this.#provider.emit(this.#filter, log);
  258. // Only advance the block number when logs were found to
  259. // account for networks (like BNB and Polygon) which may
  260. // sacrifice event consistency for block event speed
  261. this.#blockNumber = log.blockNumber;
  262. }
  263. }
  264. start() {
  265. if (this.#running) {
  266. return;
  267. }
  268. this.#running = true;
  269. if (this.#blockNumber === -2) {
  270. this.#provider.getBlockNumber().then((blockNumber) => {
  271. this.#blockNumber = blockNumber;
  272. });
  273. }
  274. this.#provider.on("block", this.#poller);
  275. }
  276. stop() {
  277. if (!this.#running) {
  278. return;
  279. }
  280. this.#running = false;
  281. this.#provider.off("block", this.#poller);
  282. }
  283. pause(dropWhilePaused) {
  284. this.stop();
  285. if (dropWhilePaused) {
  286. this.#blockNumber = -2;
  287. }
  288. }
  289. resume() {
  290. this.start();
  291. }
  292. }
  293. //# sourceMappingURL=subscriber-polling.js.map