subscriber-connection.ts 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. import { getNumber } from "../utils/index.js";
  2. import type { Subscriber } from "./abstract-provider.js";
  3. //#TODO: Temp
  4. import type { Provider } from "./provider.js";
  /**
   *  A [[Provider]] that additionally exposes a low-level
   *  subscribe / unsubscribe pair.
   *
   *  NOTE(review): the behaviour of implementers is not visible from this
   *  file; the member notes below describe only the declared signatures
   *  and how they are used by subscribers in this module.
   *
   *  @_docloc: api/providers/abstract-provider
   */
  export interface ConnectionRpcProvider extends Provider {
    //send(method: string, params: Array<any>): Promise<any>;

    /**
     *  Registers a subscription described by %%param%% and returns a
     *  numeric id for it. Each result is handed to %%processFunc%%
     *  (presumably once per notification -- confirm against the
     *  implementing provider).
     */
    _subscribe(param: any[], processFunc: (result: any) => void): number;

    /**
     *  Cancels the subscription identified by %%filterId%%, the id
     *  previously returned from [[_subscribe]].
     */
    _unsubscribe(filterId: number): void;
  }
  /**
   *  A [[Subscriber]] that listens to the ``"newHeads"`` subscription of
   *  a [[ConnectionRpcProvider]] and re-emits it as ``"block"`` events
   *  on that provider.
   *
   *  The most recently reported block number is remembered, so when a
   *  head arrives that is more than one block ahead, a ``"block"`` event
   *  is emitted for every number in between as well.
   *
   *  @_docloc: api/providers/abstract-provider
   */
  export class BlockConnectionSubscriber implements Subscriber {
    #provider: ConnectionRpcProvider;

    // Last block number reported; -2 is the sentinel for "no block
    // has been seen yet (or history was dropped by pause)".
    #blockNumber: number = -2;

    #running: boolean = false;

    // Id returned by _subscribe while subscribed, otherwise null.
    #filterId: null | number = null;

    /**
     *  Creates a subscriber bound to %%provider%%. Nothing is
     *  subscribed until [[start]] is called.
     */
    constructor(provider: ConnectionRpcProvider) {
      this.#provider = provider;
    }

    /**
     *  Begins listening for new heads. Does nothing if already running.
     */
    start(): void {
      if (this.#running) { return; }
      this.#running = true;

      const onNewHead = (result: any): void => {
        const latest = getNumber(result.number);

        // First head after a fresh start: emit only that block.
        // Otherwise continue from the block after the last one seen.
        let next = this.#blockNumber + 1;
        if (this.#blockNumber === -2) { next = latest; }

        while (next <= latest) {
          this.#provider.emit("block", next);
          next++;
        }

        this.#blockNumber = latest;
      };

      this.#filterId = this.#provider._subscribe([ "newHeads" ], onNewHead);
    }

    /**
     *  Stops listening and releases the underlying subscription, if
     *  any. Does nothing if not running.
     */
    stop(): void {
      if (!this.#running) { return; }
      this.#running = false;

      const activeId = this.#filterId;
      if (activeId == null) { return; }

      this.#provider._unsubscribe(activeId);
      this.#filterId = null;
    }

    /**
     *  Stops listening. When %%dropWhilePaused%% is truthy the
     *  remembered block number is forgotten, so the first head after
     *  [[resume]] emits only that block; otherwise the blocks missed
     *  while paused are emitted once the next head arrives.
     */
    pause(dropWhilePaused?: boolean): void {
      if (dropWhilePaused) {
        this.#blockNumber = -2;
      }
      this.stop();
    }

    /**
     *  Resumes listening; equivalent to calling [[start]].
     */
    resume(): void {
      this.start();
    }
  }