provider-socket.js 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. /**
  2. * Generic long-lived socket provider.
  3. *
  4. * Sub-classing notes
  5. * - a sub-class MUST call the `_start()` method once connected
  6. * - a sub-class MUST override the `_write(string)` method
  7. * - a sub-class MUST call `_processMessage(string)` for each message
  8. *
  9. * @_subsection: api/providers/abstract-provider:Socket Providers [about-socketProvider]
  10. */
  11. import { UnmanagedSubscriber } from "./abstract-provider.js";
  12. import { assert, assertArgument, makeError } from "../utils/index.js";
  13. import { JsonRpcApiProvider } from "./provider-jsonrpc.js";
  14. /**
  15. * A **SocketSubscriber** uses a socket transport to handle events and
  16. * should use [[_emit]] to manage the events.
  17. */
  18. export class SocketSubscriber {
  19. #provider;
  20. #filter;
  21. /**
  22. * The filter.
  23. */
  24. get filter() { return JSON.parse(this.#filter); }
  25. #filterId;
  26. #paused;
  27. #emitPromise;
  28. /**
  29. * Creates a new **SocketSubscriber** attached to %%provider%% listening
  30. * to %%filter%%.
  31. */
  32. constructor(provider, filter) {
  33. this.#provider = provider;
  34. this.#filter = JSON.stringify(filter);
  35. this.#filterId = null;
  36. this.#paused = null;
  37. this.#emitPromise = null;
  38. }
  39. start() {
  40. this.#filterId = this.#provider.send("eth_subscribe", this.filter).then((filterId) => {
  41. ;
  42. this.#provider._register(filterId, this);
  43. return filterId;
  44. });
  45. }
  46. stop() {
  47. (this.#filterId).then((filterId) => {
  48. if (this.#provider.destroyed) {
  49. return;
  50. }
  51. this.#provider.send("eth_unsubscribe", [filterId]);
  52. });
  53. this.#filterId = null;
  54. }
  55. // @TODO: pause should trap the current blockNumber, unsub, and on resume use getLogs
  56. // and resume
  57. pause(dropWhilePaused) {
  58. assert(dropWhilePaused, "preserve logs while paused not supported by SocketSubscriber yet", "UNSUPPORTED_OPERATION", { operation: "pause(false)" });
  59. this.#paused = !!dropWhilePaused;
  60. }
  61. resume() {
  62. this.#paused = null;
  63. }
  64. /**
  65. * @_ignore:
  66. */
  67. _handleMessage(message) {
  68. if (this.#filterId == null) {
  69. return;
  70. }
  71. if (this.#paused === null) {
  72. let emitPromise = this.#emitPromise;
  73. if (emitPromise == null) {
  74. emitPromise = this._emit(this.#provider, message);
  75. }
  76. else {
  77. emitPromise = emitPromise.then(async () => {
  78. await this._emit(this.#provider, message);
  79. });
  80. }
  81. this.#emitPromise = emitPromise.then(() => {
  82. if (this.#emitPromise === emitPromise) {
  83. this.#emitPromise = null;
  84. }
  85. });
  86. }
  87. }
  88. /**
  89. * Sub-classes **must** override this to emit the events on the
  90. * provider.
  91. */
  92. async _emit(provider, message) {
  93. throw new Error("sub-classes must implemente this; _emit");
  94. }
  95. }
  96. /**
  97. * A **SocketBlockSubscriber** listens for ``newHeads`` events and emits
  98. * ``"block"`` events.
  99. */
  100. export class SocketBlockSubscriber extends SocketSubscriber {
  101. /**
  102. * @_ignore:
  103. */
  104. constructor(provider) {
  105. super(provider, ["newHeads"]);
  106. }
  107. async _emit(provider, message) {
  108. provider.emit("block", parseInt(message.number));
  109. }
  110. }
  111. /**
  112. * A **SocketPendingSubscriber** listens for pending transacitons and emits
  113. * ``"pending"`` events.
  114. */
  115. export class SocketPendingSubscriber extends SocketSubscriber {
  116. /**
  117. * @_ignore:
  118. */
  119. constructor(provider) {
  120. super(provider, ["newPendingTransactions"]);
  121. }
  122. async _emit(provider, message) {
  123. provider.emit("pending", message);
  124. }
  125. }
  126. /**
  127. * A **SocketEventSubscriber** listens for event logs.
  128. */
  129. export class SocketEventSubscriber extends SocketSubscriber {
  130. #logFilter;
  131. /**
  132. * The filter.
  133. */
  134. get logFilter() { return JSON.parse(this.#logFilter); }
  135. /**
  136. * @_ignore:
  137. */
  138. constructor(provider, filter) {
  139. super(provider, ["logs", filter]);
  140. this.#logFilter = JSON.stringify(filter);
  141. }
  142. async _emit(provider, message) {
  143. provider.emit(this.logFilter, provider._wrapLog(message, provider._network));
  144. }
  145. }
  146. /**
  147. * A **SocketProvider** is backed by a long-lived connection over a
  148. * socket, which can subscribe and receive real-time messages over
  149. * its communication channel.
  150. */
  151. export class SocketProvider extends JsonRpcApiProvider {
  152. #callbacks;
  153. // Maps each filterId to its subscriber
  154. #subs;
  155. // If any events come in before a subscriber has finished
  156. // registering, queue them
  157. #pending;
  158. /**
  159. * Creates a new **SocketProvider** connected to %%network%%.
  160. *
  161. * If unspecified, the network will be discovered.
  162. */
  163. constructor(network, _options) {
  164. // Copy the options
  165. const options = Object.assign({}, (_options != null) ? _options : {});
  166. // Support for batches is generally not supported for
  167. // connection-base providers; if this changes in the future
  168. // the _send should be updated to reflect this
  169. assertArgument(options.batchMaxCount == null || options.batchMaxCount === 1, "sockets-based providers do not support batches", "options.batchMaxCount", _options);
  170. options.batchMaxCount = 1;
  171. // Socket-based Providers (generally) cannot change their network,
  172. // since they have a long-lived connection; but let people override
  173. // this if they have just cause.
  174. if (options.staticNetwork == null) {
  175. options.staticNetwork = true;
  176. }
  177. super(network, options);
  178. this.#callbacks = new Map();
  179. this.#subs = new Map();
  180. this.#pending = new Map();
  181. }
  182. // This value is only valid after _start has been called
  183. /*
  184. get _network(): Network {
  185. if (this.#network == null) {
  186. throw new Error("this shouldn't happen");
  187. }
  188. return this.#network.clone();
  189. }
  190. */
  191. _getSubscriber(sub) {
  192. switch (sub.type) {
  193. case "close":
  194. return new UnmanagedSubscriber("close");
  195. case "block":
  196. return new SocketBlockSubscriber(this);
  197. case "pending":
  198. return new SocketPendingSubscriber(this);
  199. case "event":
  200. return new SocketEventSubscriber(this, sub.filter);
  201. case "orphan":
  202. // Handled auto-matically within AbstractProvider
  203. // when the log.removed = true
  204. if (sub.filter.orphan === "drop-log") {
  205. return new UnmanagedSubscriber("drop-log");
  206. }
  207. }
  208. return super._getSubscriber(sub);
  209. }
  210. /**
  211. * Register a new subscriber. This is used internalled by Subscribers
  212. * and generally is unecessary unless extending capabilities.
  213. */
  214. _register(filterId, subscriber) {
  215. this.#subs.set(filterId, subscriber);
  216. const pending = this.#pending.get(filterId);
  217. if (pending) {
  218. for (const message of pending) {
  219. subscriber._handleMessage(message);
  220. }
  221. this.#pending.delete(filterId);
  222. }
  223. }
  224. async _send(payload) {
  225. // WebSocket provider doesn't accept batches
  226. assertArgument(!Array.isArray(payload), "WebSocket does not support batch send", "payload", payload);
  227. // @TODO: stringify payloads here and store to prevent mutations
  228. // Prepare a promise to respond to
  229. const promise = new Promise((resolve, reject) => {
  230. this.#callbacks.set(payload.id, { payload, resolve, reject });
  231. });
  232. // Wait until the socket is connected before writing to it
  233. await this._waitUntilReady();
  234. // Write the request to the socket
  235. await this._write(JSON.stringify(payload));
  236. return [await promise];
  237. }
  238. // Sub-classes must call this once they are connected
  239. /*
  240. async _start(): Promise<void> {
  241. if (this.#ready) { return; }
  242. for (const { payload } of this.#callbacks.values()) {
  243. await this._write(JSON.stringify(payload));
  244. }
  245. this.#ready = (async function() {
  246. await super._start();
  247. })();
  248. }
  249. */
  250. /**
  251. * Sub-classes **must** call this with messages received over their
  252. * transport to be processed and dispatched.
  253. */
  254. async _processMessage(message) {
  255. const result = (JSON.parse(message));
  256. if (result && typeof (result) === "object" && "id" in result) {
  257. const callback = this.#callbacks.get(result.id);
  258. if (callback == null) {
  259. this.emit("error", makeError("received result for unknown id", "UNKNOWN_ERROR", {
  260. reasonCode: "UNKNOWN_ID",
  261. result
  262. }));
  263. return;
  264. }
  265. this.#callbacks.delete(result.id);
  266. callback.resolve(result);
  267. }
  268. else if (result && result.method === "eth_subscription") {
  269. const filterId = result.params.subscription;
  270. const subscriber = this.#subs.get(filterId);
  271. if (subscriber) {
  272. subscriber._handleMessage(result.params.result);
  273. }
  274. else {
  275. let pending = this.#pending.get(filterId);
  276. if (pending == null) {
  277. pending = [];
  278. this.#pending.set(filterId, pending);
  279. }
  280. pending.push(result.params.result);
  281. }
  282. }
  283. else {
  284. this.emit("error", makeError("received unexpected message", "UNKNOWN_ERROR", {
  285. reasonCode: "UNEXPECTED_MESSAGE",
  286. result
  287. }));
  288. return;
  289. }
  290. }
  291. /**
  292. * Sub-classes **must** override this to send %%message%% over their
  293. * transport.
  294. */
  295. async _write(message) {
  296. throw new Error("sub-classes must override this");
  297. }
  298. }
  299. //# sourceMappingURL=provider-socket.js.map