| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352 | 
							- /**
 
-  *  Generic long-lived socket provider.
 
-  *
 
-  *  Sub-classing notes
 
-  *  - a sub-class MUST call the `_start()` method once connected
 
-  *  - a sub-class MUST override the `_write(string)` method
 
-  *  - a sub-class MUST call `_processMessage(string)` for each message
 
-  *
 
-  *  @_subsection: api/providers/abstract-provider:Socket Providers  [about-socketProvider]
 
-  */
 
- import { UnmanagedSubscriber } from "./abstract-provider.js";
 
- import { assert, assertArgument, makeError } from "../utils/index.js";
 
- import { JsonRpcApiProvider } from "./provider-jsonrpc.js";
 
- import type { Subscriber, Subscription } from "./abstract-provider.js";
 
- import type { EventFilter } from "./provider.js";
 
- import type {
 
-     JsonRpcApiProviderOptions, JsonRpcError, JsonRpcPayload, JsonRpcResult
 
- } from "./provider-jsonrpc.js";
 
- import type { Networkish } from "./network.js";
 
- type JsonRpcSubscription = {
 
-     method: string,
 
-     params: {
 
-         result: any,
 
-         subscription: string
 
-     }
 
- };
 
- /**
 
-  *  A **SocketSubscriber** uses a socket transport to handle events and
 
-  *  should use [[_emit]] to manage the events.
 
-  */
 
- export class SocketSubscriber implements Subscriber {
 
-     #provider: SocketProvider;
 
-     #filter: string;
 
-     /**
 
-      *  The filter.
 
-      */
 
-     get filter(): Array<any> { return JSON.parse(this.#filter); }
 
-     #filterId: null | Promise<string |number>;
 
-     #paused: null | boolean;
 
-     #emitPromise: null | Promise<void>;
 
-     /**
 
-      *  Creates a new **SocketSubscriber** attached to %%provider%% listening
 
-      *  to %%filter%%.
 
-      */
 
-     constructor(provider: SocketProvider, filter: Array<any>) {
 
-         this.#provider = provider;
 
-         this.#filter = JSON.stringify(filter);
 
-         this.#filterId = null;
 
-         this.#paused = null;
 
-         this.#emitPromise = null;
 
-     }
 
-     start(): void {
 
-         this.#filterId = this.#provider.send("eth_subscribe", this.filter).then((filterId) => {;
 
-             this.#provider._register(filterId, this);
 
-             return filterId;
 
-         });
 
-     }
 
-     stop(): void {
 
-         (<Promise<number>>(this.#filterId)).then((filterId) => {
 
-             if (this.#provider.destroyed) { return; }
 
-             this.#provider.send("eth_unsubscribe", [ filterId ]);
 
-         });
 
-         this.#filterId = null;
 
-     }
 
-     // @TODO: pause should trap the current blockNumber, unsub, and on resume use getLogs
 
-     //        and resume
 
-     pause(dropWhilePaused?: boolean): void {
 
-         assert(dropWhilePaused, "preserve logs while paused not supported by SocketSubscriber yet",
 
-             "UNSUPPORTED_OPERATION", { operation: "pause(false)" });
 
-         this.#paused = !!dropWhilePaused;
 
-     }
 
-     resume(): void {
 
-         this.#paused = null;
 
-     }
 
-     /**
 
-      *  @_ignore:
 
-      */
 
-     _handleMessage(message: any): void {
 
-         if (this.#filterId == null) { return; }
 
-         if (this.#paused === null) {
 
-             let emitPromise: null | Promise<void> = this.#emitPromise;
 
-             if (emitPromise == null) {
 
-                 emitPromise = this._emit(this.#provider, message);
 
-             } else {
 
-                 emitPromise = emitPromise.then(async () => {
 
-                     await this._emit(this.#provider, message);
 
-                 });
 
-             }
 
-             this.#emitPromise = emitPromise.then(() => {
 
-                 if (this.#emitPromise === emitPromise) {
 
-                     this.#emitPromise = null;
 
-                 }
 
-             });
 
-         }
 
-     }
 
-     /**
 
-      *  Sub-classes **must** override this to emit the events on the
 
-      *  provider.
 
-      */
 
-     async _emit(provider: SocketProvider, message: any): Promise<void> {
 
-         throw new Error("sub-classes must implemente this; _emit");
 
-     }
 
- }
 
- /**
 
-  *  A **SocketBlockSubscriber** listens for ``newHeads`` events and emits
 
-  *  ``"block"`` events.
 
-  */
 
- export class SocketBlockSubscriber extends SocketSubscriber {
 
-     /**
 
-      *  @_ignore:
 
-      */
 
-     constructor(provider: SocketProvider) {
 
-         super(provider, [ "newHeads" ]);
 
-     }
 
-     async _emit(provider: SocketProvider, message: any): Promise<void> {
 
-         provider.emit("block", parseInt(message.number));
 
-     }
 
- }
 
- /**
 
-  *  A **SocketPendingSubscriber** listens for pending transacitons and emits
 
-  *  ``"pending"`` events.
 
-  */
 
- export class SocketPendingSubscriber extends SocketSubscriber {
 
-     /**
 
-      *  @_ignore:
 
-      */
 
-     constructor(provider: SocketProvider) {
 
-         super(provider, [ "newPendingTransactions" ]);
 
-     }
 
-     async _emit(provider: SocketProvider, message: any): Promise<void> {
 
-         provider.emit("pending", message);
 
-     }
 
- }
 
- /**
 
-  *  A **SocketEventSubscriber** listens for event logs.
 
-  */
 
- export class SocketEventSubscriber extends SocketSubscriber {
 
-     #logFilter: string;
 
-     /**
 
-      *  The filter.
 
-      */
 
-     get logFilter(): EventFilter { return JSON.parse(this.#logFilter); }
 
-     /**
 
-      *  @_ignore:
 
-      */
 
-     constructor(provider: SocketProvider, filter: EventFilter) {
 
-         super(provider, [ "logs", filter ]);
 
-         this.#logFilter = JSON.stringify(filter);
 
-     }
 
-     async _emit(provider: SocketProvider, message: any): Promise<void> {
 
-         provider.emit(this.logFilter, provider._wrapLog(message, provider._network));
 
-     }
 
- }
 
- /**
 
-  *  A **SocketProvider** is backed by a long-lived connection over a
 
-  *  socket, which can subscribe and receive real-time messages over
 
-  *  its communication channel.
 
-  */
 
- export class SocketProvider extends JsonRpcApiProvider {
 
-     #callbacks: Map<number, { payload: JsonRpcPayload, resolve: (r: any) => void, reject: (e: Error) => void }>;
 
-     // Maps each filterId to its subscriber
 
-     #subs: Map<number | string, SocketSubscriber>;
 
-     // If any events come in before a subscriber has finished
 
-     // registering, queue them
 
-     #pending: Map<number | string, Array<any>>;
 
-     /**
 
-      *  Creates a new **SocketProvider** connected to %%network%%.
 
-      *
 
-      *  If unspecified, the network will be discovered.
 
-      */
 
-     constructor(network?: Networkish, _options?: JsonRpcApiProviderOptions) {
 
-         // Copy the options
 
-         const options = Object.assign({ }, (_options != null) ? _options: { });
 
-         // Support for batches is generally not supported for
 
-         // connection-base providers; if this changes in the future
 
-         // the _send should be updated to reflect this
 
-         assertArgument(options.batchMaxCount == null || options.batchMaxCount === 1,
 
-             "sockets-based providers do not support batches", "options.batchMaxCount", _options);
 
-         options.batchMaxCount = 1;
 
-         // Socket-based Providers (generally) cannot change their network,
 
-         // since they have a long-lived connection; but let people override
 
-         // this if they have just cause.
 
-         if (options.staticNetwork == null) { options.staticNetwork = true; }
 
-         super(network, options);
 
-         this.#callbacks = new Map();
 
-         this.#subs = new Map();
 
-         this.#pending = new Map();
 
-     }
 
-     // This value is only valid after _start has been called
 
-     /*
 
-     get _network(): Network {
 
-         if (this.#network == null) {
 
-             throw new Error("this shouldn't happen");
 
-         }
 
-         return this.#network.clone();
 
-     }
 
-     */
 
-     _getSubscriber(sub: Subscription): Subscriber {
 
-         switch (sub.type) {
 
-             case "close":
 
-                 return new UnmanagedSubscriber("close");
 
-             case "block":
 
-                 return new SocketBlockSubscriber(this);
 
-             case "pending":
 
-                 return new SocketPendingSubscriber(this);
 
-             case "event":
 
-                 return new SocketEventSubscriber(this, sub.filter);
 
-             case "orphan":
 
-                 // Handled auto-matically within AbstractProvider
 
-                 // when the log.removed = true
 
-                 if (sub.filter.orphan === "drop-log") {
 
-                     return new UnmanagedSubscriber("drop-log");
 
-                 }
 
-         }
 
-         return super._getSubscriber(sub);
 
-     }
 
-     /**
 
-      *  Register a new subscriber. This is used internalled by Subscribers
 
-      *  and generally is unecessary unless extending capabilities.
 
-      */
 
-     _register(filterId: number | string, subscriber: SocketSubscriber): void {
 
-         this.#subs.set(filterId, subscriber);
 
-         const pending = this.#pending.get(filterId);
 
-         if (pending) {
 
-             for (const message of pending) {
 
-                 subscriber._handleMessage(message);
 
-             }
 
-             this.#pending.delete(filterId);
 
-         }
 
-     }
 
-     async _send(payload: JsonRpcPayload | Array<JsonRpcPayload>): Promise<Array<JsonRpcResult | JsonRpcError>> {
 
-         // WebSocket provider doesn't accept batches
 
-         assertArgument(!Array.isArray(payload), "WebSocket does not support batch send", "payload", payload);
 
-         // @TODO: stringify payloads here and store to prevent mutations
 
-         // Prepare a promise to respond to
 
-         const promise = new Promise((resolve, reject) => {
 
-             this.#callbacks.set(payload.id, { payload, resolve, reject });
 
-         });
 
-         // Wait until the socket is connected before writing to it
 
-         await this._waitUntilReady();
 
-         // Write the request to the socket
 
-         await this._write(JSON.stringify(payload));
 
-         return <Array<JsonRpcResult | JsonRpcError>>[ await promise ];
 
-     }
 
-     // Sub-classes must call this once they are connected
 
-     /*
 
-     async _start(): Promise<void> {
 
-         if (this.#ready) { return; }
 
-         for (const { payload } of this.#callbacks.values()) {
 
-             await this._write(JSON.stringify(payload));
 
-         }
 
-         this.#ready = (async function() {
 
-             await super._start();
 
-         })();
 
-     }
 
-     */
 
-     /**
 
-      *  Sub-classes **must** call this with messages received over their
 
-      *  transport to be processed and dispatched.
 
-      */
 
-     async _processMessage(message: string): Promise<void> {
 
-         const result = <JsonRpcResult | JsonRpcError | JsonRpcSubscription>(JSON.parse(message));
 
-         if (result && typeof(result) === "object" && "id" in result) {
 
-             const callback = this.#callbacks.get(result.id);
 
-             if (callback == null) {
 
-                 this.emit("error", makeError("received result for unknown id", "UNKNOWN_ERROR", {
 
-                     reasonCode: "UNKNOWN_ID",
 
-                     result
 
-                 }));
 
-                 return;
 
-             }
 
-             this.#callbacks.delete(result.id);
 
-             callback.resolve(result);
 
-         } else if (result && result.method === "eth_subscription") {
 
-             const filterId = result.params.subscription;
 
-             const subscriber = this.#subs.get(filterId);
 
-             if (subscriber) {
 
-                 subscriber._handleMessage(result.params.result);
 
-             } else {
 
-                 let pending = this.#pending.get(filterId);
 
-                 if (pending == null) {
 
-                     pending = [ ];
 
-                     this.#pending.set(filterId, pending);
 
-                 }
 
-                 pending.push(result.params.result);
 
-             }
 
-         } else {
 
-             this.emit("error", makeError("received unexpected message", "UNKNOWN_ERROR", {
 
-                 reasonCode: "UNEXPECTED_MESSAGE",
 
-                 result
 
-             }));
 
-             return;
 
-         }
 
-     }
 
-     /**
 
-      *  Sub-classes **must** override this to send %%message%% over their
 
-      *  transport.
 
-      */
 
-     async _write(message: string): Promise<void> {
 
-         throw new Error("sub-classes must override this");
 
-     }
 
- }
 
 
  |