plugis-client/NatsClass.ts
2025-03-22 18:51:17 +01:00

183 lines
4.6 KiB
TypeScript

import {
  connect,
  wsconnect,
  deferred,
  type Msg,
  type NatsConnection,
  type PublishOptions,
  type RequestOptions,
  type Subscription,
  type SubscriptionOptions,
} from "@nats-io/transport-node";
import process from "node:process";
import { setTimeout } from "node:timers";
/**
 * Outcome of a single iterator step.
 *
 * Either the iterator is still producing (`done: false`, `value` always
 * present) or it has finished (`done: true`, `value` optional). Because it is
 * declared in this module, this alias is what the name `IteratorResult`
 * refers to here instead of the built-in type of the same name.
 */
export type IteratorResult<T> =
  | { done: false; value: T }
  | { done: true; value?: T };
/**
 * NatsClass provides utility functions to interact with NATS.
 *
 * It owns at most one connection (opened by `connect`, released by `close`)
 * and exposes publish / request / subscribe helpers, including JSON-encoding
 * and typed variants. Every helper that needs the connection fails with
 * `Error("Not connected")` while no connection is open.
 */
export default class NatsClass {
  /** Server URL, or list of URLs, passed to the transport when connecting. */
  server: string | string[];

  /** Current connection; `null` before `connect()` and after `close()`. */
  private nc: NatsConnection | null = null;

  /**
   * Resolved as soon as the process receives SIGINT or SIGTERM, so callers can
   * `await` it to start their own shutdown sequence.
   */
  public interrupted = deferred();

  /**
   * Stores the server address and installs SIGINT/SIGTERM handlers.
   *
   * Note that the handlers are registered on every construction; each instance
   * adds its own pair of listeners to `process`.
   *
   * @param server - A server URL or a list of server URLs.
   */
  constructor(server: string | string[]) {
    // Handle termination signals
    ["SIGINT", "SIGTERM"].forEach((signal) => {
      process.on(signal, () => {
        console.log("Received", signal);
        this.interrupted.resolve();
        // Safety net: force the process down if shutdown has not finished
        // after 10 seconds. The timer is unref'd so that it does not itself
        // keep an otherwise finished process alive (and then exit with 2).
        setTimeout(() => {
          console.log("forced process exit");
          process.exit(2);
        }, 10000).unref();
      });
    });
    this.server = server;
  }

  /**
   * Returns the open connection or throws when there is none.
   *
   * @throws Error with message "Not connected" if no connection is open.
   */
  private requireConnection(): NatsConnection {
    if (!this.nc) {
      throw new Error("Not connected");
    }
    return this.nc;
  }

  /**
   * Connects to the NATS server(s) given at construction time.
   *
   * The WebSocket transport is chosen when the server URL (or the first URL of
   * the list) starts with "ws"; otherwise the regular transport is used.
   */
  connect = async () => {
    const isWebsocket =
      typeof this.server === "string"
        ? this.server.startsWith("ws")
        : this.server[0]?.startsWith("ws") ?? false;
    this.nc = await (isWebsocket ? wsconnect : connect)({
      servers: this.server,
      debug: false,
      // -1 is presumably "retry forever" — see the NATS connection options.
      maxReconnectAttempts: -1,
      reconnectTimeWait: 1000,
      ignoreClusterUpdates: false,
      noRandomize: false,
    });
    console.log(`connected ${this.nc.getServer()}`);
  };

  /**
   * Closes the connection to the NATS server: flushes, then drains.
   *
   * Does nothing when no connection is open. The stored connection is
   * released even if flushing or draining fails, so the instance never keeps
   * reporting a dead connection as open; the failure is still propagated.
   */
  close = async () => {
    const nc = this.nc;
    if (!nc) {
      return;
    }
    try {
      await nc.flush();
      await nc.drain();
    } finally {
      // Only clear the field if it still refers to the connection we closed.
      if (this.nc === nc) {
        this.nc = null;
      }
    }
  };

  /**
   * Returns the server version reported in the connection info, or the string
   * "Not connected" when it is unavailable.
   */
  serverVersion = (): string => {
    return this.nc?.info?.version ?? "Not connected";
  };

  /**
   * Publishes a message to the NATS server.
   *
   * @param subject - Subject to publish on.
   * @param data - Payload, sent as given.
   * @param options - Publish options forwarded to the connection.
   * @throws Error with message "Not connected" if no connection is open.
   */
  publish = (
    subject: string,
    data: string | Uint8Array,
    options?: PublishOptions
  ) => {
    return this.requireConnection().publish(subject, data, options);
  };

  /**
   * Publishes data as JSON. A string is sent unchanged; any other value is
   * serialized with `JSON.stringify` first.
   *
   * @throws Error with message "Not connected" if no connection is open.
   */
  publishJson = (subject: string, data: unknown, options?: PublishOptions) => {
    const payload = typeof data === "string" ? data : JSON.stringify(data);
    return this.publish(subject, payload, options);
  };

  /**
   * Typed variant of `publishJson`: the payload type is checked at compile
   * time and failures are reported through the returned promise.
   */
  publishWithType = async <T>(
    subject: string,
    data?: T,
    options?: PublishOptions
  ): Promise<void> => {
    this.publishJson(subject, data, options);
  };

  /**
   * Sends a request and returns the response message.
   *
   * All failures, including "Not connected", are delivered as a rejection of
   * the returned promise rather than as a synchronous throw.
   */
  request = async (
    subject: string,
    data?: string | Uint8Array,
    options?: RequestOptions
  ): Promise<Msg> => {
    return this.requireConnection().request(subject, data, options);
  };

  /**
   * Sends a request whose payload is JSON. A string is sent unchanged; any
   * other value is serialized with `JSON.stringify` first.
   *
   * Serialization and connection failures reject the returned promise.
   */
  requestJson = async (
    subject: string,
    data?: unknown,
    options?: RequestOptions
  ): Promise<Msg> => {
    const payload = typeof data === "string" ? data : JSON.stringify(data);
    return this.request(subject, payload, options);
  };

  /**
   * Request with request and response type: sends `data` as JSON and decodes
   * the reply with the message's `json()` method.
   */
  requestWithType = async <TRequest, TResponse>(
    subject: string,
    data?: TRequest,
    options?: RequestOptions
  ): Promise<TResponse> => {
    const msg = await this.requestJson(subject, data, options);
    return msg.json<TResponse>();
  };

  /**
   * Flushes the connection to the NATS server.
   *
   * Rejects with "Not connected" if no connection is open.
   */
  flush = async () => {
    await this.requireConnection().flush();
  };

  /**
   * Subscribes to a subject by calling the connection's `subscribe`.
   *
   * @throws Error with message "Not connected" if no connection is open.
   */
  subscribe = (subject: string, opts?: SubscriptionOptions): Subscription => {
    return this.requireConnection().subscribe(subject, opts);
  };

  /**
   * Subscribes to a subject and delivers messages to `callback`.
   *
   * `callback` replaces any `callback` already present in `opts`.
   *
   * @throws Error with message "Not connected" if no connection is open.
   */
  subscribeCb = (
    subject: string,
    callback: (err: Error | null, msg: Msg) => void,
    opts?: SubscriptionOptions
  ): Subscription => {
    const subOpts = {
      ...opts,
      callback,
    };
    return this.requireConnection().subscribe(subject, subOpts);
  };

  /**
   * Subscribes to a subject and decodes each message to the specified type
   * using the message's `json()` method.
   *
   * Leaving a `for await` loop early (`break`, `return`, or a throw in the loop
   * body) unsubscribes the underlying subscription. An error thrown while
   * decoding a message propagates from `next()`.
   *
   * @throws Error with message "Not connected" if no connection is open.
   */
  subscribeToType = <T>(
    subject: string,
    opts?: SubscriptionOptions
  ): AsyncIterableIterator<T> => {
    const sub = this.subscribe(subject, opts);
    const iterator = sub[Symbol.asyncIterator]();
    return {
      async next(): Promise<IteratorResult<T>> {
        const result = await iterator.next();
        if (result.done) {
          return result;
        }
        return {
          done: false,
          value: result.value.json<T>(),
        };
      },
      // Invoked by `for await` on early exit. Unsubscribe first so a pending
      // read on the underlying iterator can settle, then let it finish.
      async return(): Promise<IteratorResult<T>> {
        sub.unsubscribe();
        await iterator.return?.();
        return { done: true, value: undefined };
      },
      [Symbol.asyncIterator]() {
        return this;
      },
    };
  };
}