import { connect, wsconnect, deferred, type Msg, type NatsConnection, type PublishOptions, type RequestOptions, type Subscription, type SubscriptionOptions, } from "@nats-io/transport-node"; import process from "node:process"; export type IteratorResult = { done: true; value?: T } | { done: false; value: T }; // NatsClass provides utility functions to interact with NATS export default class NatsClass { server: string | string[]; private nc: NatsConnection | null = null; public interrupted = deferred(); constructor(server: string | string[]) { // Handle termination signals ["SIGINT", "SIGTERM"].forEach((signal) => { process.on(signal, () => { console.log("Received", signal); this.interrupted.resolve(); setTimeout(() => { console.log("forced process exit"); process.exit(2); }, 10000); }); }); this.server = server; } // connect to the nats server connect = async () => { const isWebsocket = typeof this.server === "string" ? this.server.startsWith("ws") : this.server[0]?.startsWith("ws") ?? false; this.nc = await (isWebsocket ? wsconnect : connect)({ servers: this.server, debug: false, maxReconnectAttempts: -1, reconnectTimeWait: 1000, ignoreClusterUpdates: false, noRandomize: false, }); console.log(`connected ${this.nc.getServer()}`); }; // close the connection to the nats server close = async () => { if (this.nc) { await this.nc.flush(); await this.nc.drain(); this.nc = null; } }; // get the server version serverVersion = (): string => { return this.nc?.info?.version ?? "Not connected"; }; // publish a message to the nats server publish = ( subject: string, data: string | Uint8Array, options?: PublishOptions ) => { if (!this.nc) { throw new Error("Not connected"); } return this.nc.publish(subject, data, options); }; // publish data as json, only if data is not a string publishJson = (subject: string, data: any, options?: PublishOptions) => { const payload = typeof data === "string" ? data : JSON.stringify(data); return this.publish(subject, payload, options); }; // publish data as json, only if data is not a string publishWithType = async ( subject: string, data?: T, options?: PublishOptions ): Promise => { await this.publishJson(subject, data, options); }; // sends a request and returns the response message request = ( subject: string, data?: string | Uint8Array, options?: RequestOptions ): Promise => { if (!this.nc) { throw new Error("Not connected"); } return this.nc.request(subject, data, options); }; requestJson = ( subject: string, data?: any, options?: RequestOptions ): Promise => { const payload = typeof data === "string" ? data : JSON.stringify(data); return this.request(subject, payload, options); }; // request with request and response type requestWithType = async ( subject: string, data?: TRequest, options?: RequestOptions ): Promise => { const msg = await this.requestJson(subject, data, options); return msg.json(); }; // flush the connection to the nats server flush = async () => { if (!this.nc) { throw new Error("Not connected"); } await this.nc.flush(); }; // subscribe to a subject, calls nats subscribe function subscribe = (subject: string, opts?: SubscriptionOptions): Subscription => { if (!this.nc) { throw new Error("Not connected"); } return this.nc.subscribe(subject, opts); }; subscribeCb = ( subject: string, callback: (err: Error | null, msg: Msg) => void, opts?: SubscriptionOptions ): Subscription => { if (!this.nc) { throw new Error("Not connected"); } const subOpts = { ...opts, callback, }; return this.nc.subscribe(subject, subOpts); }; // subscribe to a subject and decode messages to specified type subscribeToType = ( subject: string, opts?: SubscriptionOptions ): AsyncIterableIterator => { const sub = this.subscribe(subject, opts); const iterator = sub[Symbol.asyncIterator](); return { async next(): Promise> { const result = await iterator.next(); if (result.done) { return result; } return { done: false, value: result.value.json(), }; }, [Symbol.asyncIterator]() { return this; }, }; }; }