provider-socket.ts 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. /**
  2. * Generic long-lived socket provider.
  3. *
  4. * Sub-classing notes
  5. * - a sub-class MUST call the `_start()` method once connected
  6. * - a sub-class MUST override the `_write(string)` method
  7. * - a sub-class MUST call `_processMessage(string)` for each message
  8. *
  9. * @_subsection: api/providers/abstract-provider:Socket Providers [about-socketProvider]
  10. */
  11. import { UnmanagedSubscriber } from "./abstract-provider.js";
  12. import { assert, assertArgument, makeError } from "../utils/index.js";
  13. import { JsonRpcApiProvider } from "./provider-jsonrpc.js";
  14. import type { Subscriber, Subscription } from "./abstract-provider.js";
  15. import type { EventFilter } from "./provider.js";
  16. import type {
  17. JsonRpcApiProviderOptions, JsonRpcError, JsonRpcPayload, JsonRpcResult
  18. } from "./provider-jsonrpc.js";
  19. import type { Networkish } from "./network.js";
  20. type JsonRpcSubscription = {
  21. method: string,
  22. params: {
  23. result: any,
  24. subscription: string
  25. }
  26. };
  27. /**
  28. * A **SocketSubscriber** uses a socket transport to handle events and
  29. * should use [[_emit]] to manage the events.
  30. */
  31. export class SocketSubscriber implements Subscriber {
  32. #provider: SocketProvider;
  33. #filter: string;
  34. /**
  35. * The filter.
  36. */
  37. get filter(): Array<any> { return JSON.parse(this.#filter); }
  38. #filterId: null | Promise<string |number>;
  39. #paused: null | boolean;
  40. #emitPromise: null | Promise<void>;
  41. /**
  42. * Creates a new **SocketSubscriber** attached to %%provider%% listening
  43. * to %%filter%%.
  44. */
  45. constructor(provider: SocketProvider, filter: Array<any>) {
  46. this.#provider = provider;
  47. this.#filter = JSON.stringify(filter);
  48. this.#filterId = null;
  49. this.#paused = null;
  50. this.#emitPromise = null;
  51. }
  52. start(): void {
  53. this.#filterId = this.#provider.send("eth_subscribe", this.filter).then((filterId) => {;
  54. this.#provider._register(filterId, this);
  55. return filterId;
  56. });
  57. }
  58. stop(): void {
  59. (<Promise<number>>(this.#filterId)).then((filterId) => {
  60. if (this.#provider.destroyed) { return; }
  61. this.#provider.send("eth_unsubscribe", [ filterId ]);
  62. });
  63. this.#filterId = null;
  64. }
  65. // @TODO: pause should trap the current blockNumber, unsub, and on resume use getLogs
  66. // and resume
  67. pause(dropWhilePaused?: boolean): void {
  68. assert(dropWhilePaused, "preserve logs while paused not supported by SocketSubscriber yet",
  69. "UNSUPPORTED_OPERATION", { operation: "pause(false)" });
  70. this.#paused = !!dropWhilePaused;
  71. }
  72. resume(): void {
  73. this.#paused = null;
  74. }
  75. /**
  76. * @_ignore:
  77. */
  78. _handleMessage(message: any): void {
  79. if (this.#filterId == null) { return; }
  80. if (this.#paused === null) {
  81. let emitPromise: null | Promise<void> = this.#emitPromise;
  82. if (emitPromise == null) {
  83. emitPromise = this._emit(this.#provider, message);
  84. } else {
  85. emitPromise = emitPromise.then(async () => {
  86. await this._emit(this.#provider, message);
  87. });
  88. }
  89. this.#emitPromise = emitPromise.then(() => {
  90. if (this.#emitPromise === emitPromise) {
  91. this.#emitPromise = null;
  92. }
  93. });
  94. }
  95. }
  96. /**
  97. * Sub-classes **must** override this to emit the events on the
  98. * provider.
  99. */
  100. async _emit(provider: SocketProvider, message: any): Promise<void> {
  101. throw new Error("sub-classes must implemente this; _emit");
  102. }
  103. }
  104. /**
  105. * A **SocketBlockSubscriber** listens for ``newHeads`` events and emits
  106. * ``"block"`` events.
  107. */
  108. export class SocketBlockSubscriber extends SocketSubscriber {
  109. /**
  110. * @_ignore:
  111. */
  112. constructor(provider: SocketProvider) {
  113. super(provider, [ "newHeads" ]);
  114. }
  115. async _emit(provider: SocketProvider, message: any): Promise<void> {
  116. provider.emit("block", parseInt(message.number));
  117. }
  118. }
  119. /**
  120. * A **SocketPendingSubscriber** listens for pending transacitons and emits
  121. * ``"pending"`` events.
  122. */
  123. export class SocketPendingSubscriber extends SocketSubscriber {
  124. /**
  125. * @_ignore:
  126. */
  127. constructor(provider: SocketProvider) {
  128. super(provider, [ "newPendingTransactions" ]);
  129. }
  130. async _emit(provider: SocketProvider, message: any): Promise<void> {
  131. provider.emit("pending", message);
  132. }
  133. }
  134. /**
  135. * A **SocketEventSubscriber** listens for event logs.
  136. */
  137. export class SocketEventSubscriber extends SocketSubscriber {
  138. #logFilter: string;
  139. /**
  140. * The filter.
  141. */
  142. get logFilter(): EventFilter { return JSON.parse(this.#logFilter); }
  143. /**
  144. * @_ignore:
  145. */
  146. constructor(provider: SocketProvider, filter: EventFilter) {
  147. super(provider, [ "logs", filter ]);
  148. this.#logFilter = JSON.stringify(filter);
  149. }
  150. async _emit(provider: SocketProvider, message: any): Promise<void> {
  151. provider.emit(this.logFilter, provider._wrapLog(message, provider._network));
  152. }
  153. }
  154. /**
  155. * A **SocketProvider** is backed by a long-lived connection over a
  156. * socket, which can subscribe and receive real-time messages over
  157. * its communication channel.
  158. */
  159. export class SocketProvider extends JsonRpcApiProvider {
  160. #callbacks: Map<number, { payload: JsonRpcPayload, resolve: (r: any) => void, reject: (e: Error) => void }>;
  161. // Maps each filterId to its subscriber
  162. #subs: Map<number | string, SocketSubscriber>;
  163. // If any events come in before a subscriber has finished
  164. // registering, queue them
  165. #pending: Map<number | string, Array<any>>;
  166. /**
  167. * Creates a new **SocketProvider** connected to %%network%%.
  168. *
  169. * If unspecified, the network will be discovered.
  170. */
  171. constructor(network?: Networkish, _options?: JsonRpcApiProviderOptions) {
  172. // Copy the options
  173. const options = Object.assign({ }, (_options != null) ? _options: { });
  174. // Support for batches is generally not supported for
  175. // connection-base providers; if this changes in the future
  176. // the _send should be updated to reflect this
  177. assertArgument(options.batchMaxCount == null || options.batchMaxCount === 1,
  178. "sockets-based providers do not support batches", "options.batchMaxCount", _options);
  179. options.batchMaxCount = 1;
  180. // Socket-based Providers (generally) cannot change their network,
  181. // since they have a long-lived connection; but let people override
  182. // this if they have just cause.
  183. if (options.staticNetwork == null) { options.staticNetwork = true; }
  184. super(network, options);
  185. this.#callbacks = new Map();
  186. this.#subs = new Map();
  187. this.#pending = new Map();
  188. }
  189. // This value is only valid after _start has been called
  190. /*
  191. get _network(): Network {
  192. if (this.#network == null) {
  193. throw new Error("this shouldn't happen");
  194. }
  195. return this.#network.clone();
  196. }
  197. */
  198. _getSubscriber(sub: Subscription): Subscriber {
  199. switch (sub.type) {
  200. case "close":
  201. return new UnmanagedSubscriber("close");
  202. case "block":
  203. return new SocketBlockSubscriber(this);
  204. case "pending":
  205. return new SocketPendingSubscriber(this);
  206. case "event":
  207. return new SocketEventSubscriber(this, sub.filter);
  208. case "orphan":
  209. // Handled auto-matically within AbstractProvider
  210. // when the log.removed = true
  211. if (sub.filter.orphan === "drop-log") {
  212. return new UnmanagedSubscriber("drop-log");
  213. }
  214. }
  215. return super._getSubscriber(sub);
  216. }
  217. /**
  218. * Register a new subscriber. This is used internalled by Subscribers
  219. * and generally is unecessary unless extending capabilities.
  220. */
  221. _register(filterId: number | string, subscriber: SocketSubscriber): void {
  222. this.#subs.set(filterId, subscriber);
  223. const pending = this.#pending.get(filterId);
  224. if (pending) {
  225. for (const message of pending) {
  226. subscriber._handleMessage(message);
  227. }
  228. this.#pending.delete(filterId);
  229. }
  230. }
  231. async _send(payload: JsonRpcPayload | Array<JsonRpcPayload>): Promise<Array<JsonRpcResult | JsonRpcError>> {
  232. // WebSocket provider doesn't accept batches
  233. assertArgument(!Array.isArray(payload), "WebSocket does not support batch send", "payload", payload);
  234. // @TODO: stringify payloads here and store to prevent mutations
  235. // Prepare a promise to respond to
  236. const promise = new Promise((resolve, reject) => {
  237. this.#callbacks.set(payload.id, { payload, resolve, reject });
  238. });
  239. // Wait until the socket is connected before writing to it
  240. await this._waitUntilReady();
  241. // Write the request to the socket
  242. await this._write(JSON.stringify(payload));
  243. return <Array<JsonRpcResult | JsonRpcError>>[ await promise ];
  244. }
  245. // Sub-classes must call this once they are connected
  246. /*
  247. async _start(): Promise<void> {
  248. if (this.#ready) { return; }
  249. for (const { payload } of this.#callbacks.values()) {
  250. await this._write(JSON.stringify(payload));
  251. }
  252. this.#ready = (async function() {
  253. await super._start();
  254. })();
  255. }
  256. */
  257. /**
  258. * Sub-classes **must** call this with messages received over their
  259. * transport to be processed and dispatched.
  260. */
  261. async _processMessage(message: string): Promise<void> {
  262. const result = <JsonRpcResult | JsonRpcError | JsonRpcSubscription>(JSON.parse(message));
  263. if (result && typeof(result) === "object" && "id" in result) {
  264. const callback = this.#callbacks.get(result.id);
  265. if (callback == null) {
  266. this.emit("error", makeError("received result for unknown id", "UNKNOWN_ERROR", {
  267. reasonCode: "UNKNOWN_ID",
  268. result
  269. }));
  270. return;
  271. }
  272. this.#callbacks.delete(result.id);
  273. callback.resolve(result);
  274. } else if (result && result.method === "eth_subscription") {
  275. const filterId = result.params.subscription;
  276. const subscriber = this.#subs.get(filterId);
  277. if (subscriber) {
  278. subscriber._handleMessage(result.params.result);
  279. } else {
  280. let pending = this.#pending.get(filterId);
  281. if (pending == null) {
  282. pending = [ ];
  283. this.#pending.set(filterId, pending);
  284. }
  285. pending.push(result.params.result);
  286. }
  287. } else {
  288. this.emit("error", makeError("received unexpected message", "UNKNOWN_ERROR", {
  289. reasonCode: "UNEXPECTED_MESSAGE",
  290. result
  291. }));
  292. return;
  293. }
  294. }
  295. /**
  296. * Sub-classes **must** override this to send %%message%% over their
  297. * transport.
  298. */
  299. async _write(message: string): Promise<void> {
  300. throw new Error("sub-classes must override this");
  301. }
  302. }