provider-socket.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309
  1. "use strict";
  2. /**
  3. * Generic long-lived socket provider.
  4. *
  5. * Sub-classing notes
  6. * - a sub-class MUST call the `_start()` method once connected
  7. * - a sub-class MUST override the `_write(string)` method
  8. * - a sub-class MUST call `_processMessage(string)` for each message
  9. *
  10. * @_subsection: api/providers/abstract-provider:Socket Providers [about-socketProvider]
  11. */
  12. Object.defineProperty(exports, "__esModule", { value: true });
  13. exports.SocketProvider = exports.SocketEventSubscriber = exports.SocketPendingSubscriber = exports.SocketBlockSubscriber = exports.SocketSubscriber = void 0;
  14. const abstract_provider_js_1 = require("./abstract-provider.js");
  15. const index_js_1 = require("../utils/index.js");
  16. const provider_jsonrpc_js_1 = require("./provider-jsonrpc.js");
  17. /**
  18. * A **SocketSubscriber** uses a socket transport to handle events and
  19. * should use [[_emit]] to manage the events.
  20. */
  class SocketSubscriber {
      // Provider whose socket transport carries this subscription.
      #provider;
      // JSON-serialized subscription parameters; stored as a string so that
      // each read of `filter` yields an independent copy.
      #filter;
      // null until start() is called (and again after stop()); otherwise a
      // Promise for the filter ID returned by "eth_subscribe".
      #filterId;
      // null while running; set by pause() while paused.
      #paused;
      // Tail of the chain of in-flight _emit calls, or null when idle.
      #emitPromise;
      /**
       * The filter (a fresh copy, parsed from the serialized form).
       */
      get filter() { return JSON.parse(this.#filter); }
      /**
       * Creates a new **SocketSubscriber** attached to %%provider%% listening
       * to %%filter%%.
       */
      constructor(provider, filter) {
          this.#provider = provider;
          this.#filter = JSON.stringify(filter);
          this.#filterId = null;
          this.#paused = null;
          this.#emitPromise = null;
      }
      /**
       * Sends "eth_subscribe" with the filter and, once the filter ID is
       * known, registers this subscriber with the provider under that ID.
       */
      start() {
          this.#filterId = this.#provider.send("eth_subscribe", this.filter).then((filterId) => {
              this.#provider._register(filterId, this);
              return filterId;
          });
      }
      /**
       * Sends "eth_unsubscribe" for the filter ID (unless the provider has
       * been destroyed) and marks this subscriber as stopped.
       *
       * Calling this before start(), or more than once, is a no-op.
       */
      stop() {
          const pendingFilterId = this.#filterId;
          if (pendingFilterId == null) {
              // Never started, or already stopped; there is nothing to undo
              return;
          }
          this.#filterId = null;
          pendingFilterId.then((filterId) => {
              if (this.#provider.destroyed) {
                  return;
              }
              this.#provider.send("eth_unsubscribe", [filterId]);
          });
      }
      // @TODO: pause should trap the current blockNumber, unsub, and on resume use getLogs
      //        and resume
      /**
       * Pauses event delivery; messages received while paused are dropped.
       *
       * Only `dropWhilePaused = true` is supported; any falsy value fails
       * the assertion with code UNSUPPORTED_OPERATION.
       */
      pause(dropWhilePaused) {
          (0, index_js_1.assert)(dropWhilePaused, "preserve logs while paused not supported by SocketSubscriber yet", "UNSUPPORTED_OPERATION", { operation: "pause(false)" });
          this.#paused = !!dropWhilePaused;
      }
      /**
       * Resumes event delivery after a pause().
       */
      resume() {
          this.#paused = null;
      }
      /**
       * Queues %%message%% for emission. Messages are emitted one at a
       * time, in arrival order; messages arriving while stopped or paused
       * are dropped.
       *
       * @_ignore:
       */
      _handleMessage(message) {
          if (this.#filterId == null) {
              return;
          }
          if (this.#paused !== null) {
              return;
          }
          const previous = this.#emitPromise;
          // When idle, emit right away; otherwise wait for the earlier emits
          const emitted = (previous == null)
              ? this._emit(this.#provider, message)
              : previous.then(() => this._emit(this.#provider, message));
          // A failed emit must not leave the queue rejected, otherwise every
          // later message would chain onto that rejection and be lost.
          const tail = emitted
              .catch((error) => this.#reportEmitError(error))
              .then(() => {
                  // Only the newest tail may mark the queue as idle
                  if (this.#emitPromise === tail) {
                      this.#emitPromise = null;
                  }
              });
          this.#emitPromise = tail;
      }
      /**
       * Reports a failed _emit through the provider's "error" event.
       */
      async #reportEmitError(error) {
          try {
              await this.#provider.emit("error", error);
          }
          catch {
              // Reporting itself failed; nothing further can be done here and
              // the emit queue has to stay usable for later messages.
          }
      }
      /**
       * Sub-classes **must** override this to emit the events on the
       * provider.
       */
      async _emit(provider, message) {
          throw new Error("sub-classes must implemente this; _emit");
      }
  }
  99. exports.SocketSubscriber = SocketSubscriber;
  100. /**
  101. * A **SocketBlockSubscriber** listens for ``newHeads`` events and emits
  102. * ``"block"`` events.
  103. */
  class SocketBlockSubscriber extends SocketSubscriber {
      /**
       * Subscribes %%provider%% to the ``newHeads`` feed.
       *
       * @_ignore:
       */
      constructor(provider) {
          const params = ["newHeads"];
          super(provider, params);
      }
      /**
       * Emits a ``"block"`` event on %%provider%% carrying the integer
       * parsed from ``message.number``.
       */
      async _emit(provider, message) {
          const { number } = message;
          // No radix on purpose: a "0x"-prefixed string is then read as hex
          const blockNumber = Number.parseInt(number);
          provider.emit("block", blockNumber);
      }
  }
  115. exports.SocketBlockSubscriber = SocketBlockSubscriber;
  116. /**
  117. * A **SocketPendingSubscriber** listens for pending transactions and emits
  118. * ``"pending"`` events.
  119. */
  class SocketPendingSubscriber extends SocketSubscriber {
      /**
       * Subscribes %%provider%% to the ``newPendingTransactions`` feed.
       *
       * @_ignore:
       */
      constructor(provider) {
          const params = ["newPendingTransactions"];
          super(provider, params);
      }
      /**
       * Forwards %%message%% unchanged as the payload of a ``"pending"``
       * event on %%provider%%.
       */
      async _emit(provider, message) {
          const eventName = "pending";
          provider.emit(eventName, message);
      }
  }
  131. exports.SocketPendingSubscriber = SocketPendingSubscriber;
  132. /**
  133. * A **SocketEventSubscriber** listens for event logs.
  134. */
  class SocketEventSubscriber extends SocketSubscriber {
      // JSON-serialized log filter; parsed afresh on every read of logFilter
      #logFilter;
      /**
       * The log filter (a fresh copy on each access).
       */
      get logFilter() {
          const serialized = this.#logFilter;
          return JSON.parse(serialized);
      }
      /**
       * Subscribes %%provider%% to the ``logs`` feed restricted to
       * %%filter%%.
       *
       * @_ignore:
       */
      constructor(provider, filter) {
          const params = ["logs", filter];
          super(provider, params);
          this.#logFilter = JSON.stringify(filter);
      }
      /**
       * Wraps %%message%% with the provider's ``_wrapLog`` and emits it on
       * %%provider%% using the log filter as the event.
       */
      async _emit(provider, message) {
          const event = this.logFilter;
          const log = provider._wrapLog(message, provider._network);
          provider.emit(event, log);
      }
  }
  152. exports.SocketEventSubscriber = SocketEventSubscriber;
  153. /**
  154. * A **SocketProvider** is backed by a long-lived connection over a
  155. * socket, which can subscribe and receive real-time messages over
  156. * its communication channel.
  157. */
  class SocketProvider extends provider_jsonrpc_js_1.JsonRpcApiProvider {
      // Requests awaiting a response, keyed by payload id; each entry
      // is { payload, resolve, reject }
      #callbacks;
      // Maps each filterId to its subscriber
      #subs;
      // If any events come in before a subscriber has finished
      // registering, queue them (keyed by filterId)
      #pending;
      /**
       * Creates a new **SocketProvider** connected to %%network%%.
       *
       * If unspecified, the network will be discovered.
       */
      constructor(network, _options) {
          // Copy the options
          const options = Object.assign({}, (_options != null) ? _options : {});
          // Support for batches is generally not supported for
          // connection-base providers; if this changes in the future
          // the _send should be updated to reflect this
          (0, index_js_1.assertArgument)(options.batchMaxCount == null || options.batchMaxCount === 1, "sockets-based providers do not support batches", "options.batchMaxCount", _options);
          options.batchMaxCount = 1;
          // Socket-based Providers (generally) cannot change their network,
          // since they have a long-lived connection; but let people override
          // this if they have just cause.
          if (options.staticNetwork == null) {
              options.staticNetwork = true;
          }
          super(network, options);
          this.#callbacks = new Map();
          this.#subs = new Map();
          this.#pending = new Map();
      }
      // This value is only valid after _start has been called
      /*
      get _network(): Network {
          if (this.#network == null) {
              throw new Error("this shouldn't happen");
          }

          return this.#network.clone();
      }
      */
      /**
       * Returns the subscriber for %%sub%%, using the socket-backed
       * subscribers for "block", "pending" and "event" subscriptions and
       * deferring to the super-class for anything not handled here.
       */
      _getSubscriber(sub) {
          switch (sub.type) {
              case "close":
                  return new abstract_provider_js_1.UnmanagedSubscriber("close");
              case "block":
                  return new SocketBlockSubscriber(this);
              case "pending":
                  return new SocketPendingSubscriber(this);
              case "event":
                  return new SocketEventSubscriber(this, sub.filter);
              case "orphan":
                  // Handled automatically within AbstractProvider
                  // when the log.removed = true
                  if (sub.filter.orphan === "drop-log") {
                      return new abstract_provider_js_1.UnmanagedSubscriber("drop-log");
                  }
              // Any other orphan filter falls out of the switch to the
              // super-class below
          }
          return super._getSubscriber(sub);
      }
      /**
       * Register a new subscriber. This is used internally by Subscribers
       * and generally is unnecessary unless extending capabilities.
       *
       * Any messages that arrived for %%filterId%% before registration are
       * delivered to %%subscriber%% in arrival order.
       */
      _register(filterId, subscriber) {
          this.#subs.set(filterId, subscriber);
          const pending = this.#pending.get(filterId);
          if (pending) {
              for (const message of pending) {
                  subscriber._handleMessage(message);
              }
              this.#pending.delete(filterId);
          }
      }
      /**
       * Writes %%payload%% to the socket and resolves to a single-element
       * array holding the response that _processMessage matched to its id.
       *
       * Rejects if %%payload%% is an array (batches are not supported) or
       * if waiting for the connection or writing to it fails.
       */
      async _send(payload) {
          // WebSocket provider doesn't accept batches
          (0, index_js_1.assertArgument)(!Array.isArray(payload), "WebSocket does not support batch send", "payload", payload);
          // @TODO: stringify payloads here and store to prevent mutations
          // Prepare a promise to respond to
          let entry = null;
          const promise = new Promise((resolve, reject) => {
              entry = { payload, resolve, reject };
              this.#callbacks.set(payload.id, entry);
          });
          try {
              // Wait until the socket is connected before writing to it
              await this._waitUntilReady();
              // Write the request to the socket
              await this._write(JSON.stringify(payload));
          }
          catch (error) {
              // The caller is about to see this failure and nothing else
              // holds the promise, so drop the entry instead of leaking it
              if (this.#callbacks.get(payload.id) === entry) {
                  this.#callbacks.delete(payload.id);
              }
              throw error;
          }
          return [await promise];
      }
      // Sub-classes must call this once they are connected
      /*
      async _start(): Promise<void> {
          if (this.#ready) { return; }

          for (const { payload } of this.#callbacks.values()) {
              await this._write(JSON.stringify(payload));
          }

          this.#ready = (async function() {
              await super._start();
          })();
      }
      */
      /**
       * Sub-classes **must** call this with messages received over their
       * transport to be processed and dispatched.
       *
       * A message with an ``id`` resolves the matching pending request; an
       * ``eth_subscription`` message is handed to its subscriber, or queued
       * until that subscriber registers. Anything else, or an unknown id,
       * is reported through an "error" event.
       */
      async _processMessage(message) {
          const result = (JSON.parse(message));
          if (result && typeof (result) === "object" && "id" in result) {
              const callback = this.#callbacks.get(result.id);
              if (callback == null) {
                  this.emit("error", (0, index_js_1.makeError)("received result for unknown id", "UNKNOWN_ERROR", {
                      reasonCode: "UNKNOWN_ID",
                      result
                  }));
                  return;
              }
              this.#callbacks.delete(result.id);
              callback.resolve(result);
          }
          else if (result && result.method === "eth_subscription") {
              const filterId = result.params.subscription;
              const subscriber = this.#subs.get(filterId);
              if (subscriber) {
                  subscriber._handleMessage(result.params.result);
              }
              else {
                  // No subscriber has registered this filterId yet; hold the
                  // message until _register is called for it
                  let pending = this.#pending.get(filterId);
                  if (pending == null) {
                      pending = [];
                      this.#pending.set(filterId, pending);
                  }
                  pending.push(result.params.result);
              }
          }
          else {
              this.emit("error", (0, index_js_1.makeError)("received unexpected message", "UNKNOWN_ERROR", {
                  reasonCode: "UNEXPECTED_MESSAGE",
                  result
              }));
          }
      }
      /**
       * Sub-classes **must** override this to send %%message%% over their
       * transport.
       */
      async _write(message) {
          throw new Error("sub-classes must override this");
      }
  }
  306. exports.SocketProvider = SocketProvider;
  307. //# sourceMappingURL=provider-socket.js.map