diff --git a/NatsClass.ts b/NatsClass.ts new file mode 100644 index 0000000..9557723 --- /dev/null +++ b/NatsClass.ts @@ -0,0 +1,183 @@ +import { + connect, + wsconnect, + deferred, + type Msg, + type NatsConnection, + type PublishOptions, + type RequestOptions, + type Subscription, + type SubscriptionOptions, +} from "@nats-io/transport-node"; +import process from "node:process"; + +export type IteratorResult = { done: true; value?: T } | { done: false; value: T }; + +// NatsClass provides utility functions to interact with NATS +export default class NatsClass { + server: string | string[]; + private nc: NatsConnection | null = null; + + public interrupted = deferred(); + + constructor(server: string | string[]) { + // Handle termination signals + ["SIGINT", "SIGTERM"].forEach((signal) => { + process.on(signal, () => { + console.log("Received", signal); + this.interrupted.resolve(); + setTimeout(() => { + console.log("forced process exit"); + process.exit(2); + }, 10000); + }); + }); + + this.server = server; + } + + // connect to the nats server + connect = async () => { + const isWebsocket = + typeof this.server === "string" + ? this.server.startsWith("ws") + : this.server[0]?.startsWith("ws") ?? false; + this.nc = await (isWebsocket ? wsconnect : connect)({ + servers: this.server, + debug: false, + maxReconnectAttempts: -1, + reconnectTimeWait: 1000, + ignoreClusterUpdates: false, + noRandomize: false, + }); + console.log(`connected ${this.nc.getServer()}`); + }; + + // close the connection to the nats server + close = async () => { + if (this.nc) { + await this.nc.flush(); + await this.nc.drain(); + this.nc = null; + } + }; + + // get the server version + serverVersion = (): string => { + return this.nc?.info?.version ?? "Not connected"; + }; + + // publish a message to the nats server + publish = ( + subject: string, + data: string | Uint8Array, + options?: PublishOptions + ) => { + if (!this.nc) { + throw new Error("Not connected"); + } + return this.nc.publish(subject, data, options); + }; + + // publish data as json, only if data is not a string + publishJson = (subject: string, data: any, options?: PublishOptions) => { + const payload = typeof data === "string" ? data : JSON.stringify(data); + return this.publish(subject, payload, options); + }; + + // publish data as json, only if data is not a string + publishWithType = async ( + subject: string, + data?: T, + options?: PublishOptions + ): Promise => { + await this.publishJson(subject, data, options); + }; + + // sends a request and returns the response message + request = ( + subject: string, + data?: string | Uint8Array, + options?: RequestOptions + ): Promise => { + if (!this.nc) { + throw new Error("Not connected"); + } + return this.nc.request(subject, data, options); + }; + + requestJson = ( + subject: string, + data?: any, + options?: RequestOptions + ): Promise => { + const payload = typeof data === "string" ? data : JSON.stringify(data); + return this.request(subject, payload, options); + }; + + // request with request and response type + requestWithType = async ( + subject: string, + data?: TRequest, + options?: RequestOptions + ): Promise => { + const msg = await this.requestJson(subject, data, options); + return msg.json(); + }; + + // flush the connection to the nats server + flush = async () => { + if (!this.nc) { + throw new Error("Not connected"); + } + await this.nc.flush(); + }; + + // subscribe to a subject, calls nats subscribe function + subscribe = (subject: string, opts?: SubscriptionOptions): Subscription => { + if (!this.nc) { + throw new Error("Not connected"); + } + return this.nc.subscribe(subject, opts); + }; + + subscribeCb = ( + subject: string, + callback: (err: Error | null, msg: Msg) => void, + opts?: SubscriptionOptions + ): Subscription => { + if (!this.nc) { + throw new Error("Not connected"); + } + const subOpts = { + ...opts, + callback, + }; + return this.nc.subscribe(subject, subOpts); + }; + + // subscribe to a subject and decode messages to specified type + subscribeToType = ( + subject: string, + opts?: SubscriptionOptions + ): AsyncIterableIterator => { + const sub = this.subscribe(subject, opts); + const iterator = sub[Symbol.asyncIterator](); + return { + async next(): Promise> { + const result = await iterator.next(); + if (result.done) { + return result; + } + return { + done: false, + value: result.value.json(), + }; + }, + [Symbol.asyncIterator]() { + return this; + }, + }; + }; +} + diff --git a/PlugisClass.ts b/PlugisClass.ts new file mode 100644 index 0000000..b56f909 --- /dev/null +++ b/PlugisClass.ts @@ -0,0 +1,17 @@ +import { CloudEvent } from "cloudevents"; +import NatsClass from "./NatsClass"; + +export class PlugisClass extends NatsClass { + constructor(server: string | string[]) { + super(server); + } + + async variableSet(variable: string, value: string) { + const ce = new CloudEvent({ + type: "com.plugis.variable.set", + source: "/plugis/variable", + data: { variable, value }, + }); + this.publishJson("variable.set.>", ce); + } +} \ No newline at end of file diff --git a/README.md b/README.md index 7d916be..4468b1e 100644 --- a/README.md +++ b/README.md @@ -1,3 +1,6 @@ # plugis-client -plugis client for typescript / bun \ No newline at end of file +plugis client for typescript / bun + +npm config set @plugis:registry https://git.fibre.plugis.com/api/packages/telemac/npm/ +npm config set -- '//git.fibre.plugis.com/api/packages/telemac/npm/:_authToken' "{token}" diff --git a/bunfig.toml b/bunfig.toml new file mode 100644 index 0000000..68ca1cc --- /dev/null +++ b/bunfig.toml @@ -0,0 +1,7 @@ +[install.scopes] +# as an object with username/password +# you can reference environment variables +"@plugis" = { + username = "telemac", + url = "https://git.fibre.plugis.com/" +} diff --git a/heartbeat-subscribe.ts b/heartbeat-subscribe.ts new file mode 100644 index 0000000..56f4a28 --- /dev/null +++ b/heartbeat-subscribe.ts @@ -0,0 +1,61 @@ +import { type HeartbeatSent } from "../../../cloudevents/com.plugis/heartbeat/Sent/types/ts/heartbeatSent.d"; +import { CloudEvent } from "cloudevents"; +import NatsClass from "./NatsClass"; +import process from "node:process"; +import { PlugisClass } from "./PlugisClass"; + +const nats = new PlugisClass("ws://nats.plugis.cloud:8222"); +await nats.connect(); +console.log("server version", nats.serverVersion()); + +// publish a heartbeat cloud event every 30 seconds +setInterval(() => { + const heartbeatSent: HeartbeatSent = { + hostname: process.env.HOSTNAME || require('os').hostname(), + timestamp: new Date().toISOString(), + mac: "00:11:22:33:44:55", + ip: "192.168.1.100", + started: new Date().toISOString(), + uptime: 1000, + version: "1.0.0", + "nats-service": "test-service", + os: process.platform, + arch: process.arch + }; + const ce = new CloudEvent({ + type: "com.plugis.heartbeat.Sent", + source: `${import.meta.url}#${new Error().stack?.split('\n')[1]?.split(':')[1] || '0'}`, + data: heartbeatSent, + }); + nats.publishJson("com.plugis.heartbeat.Sent.TEST", ce); +}, 1000*60); + +// subscribe using a callback +const _sub = nats.subscribeCb("remote.*.event", (err, msg) => { + if (err) { + console.error("error", err); + return; + } + const ce = msg.json>(); + const waypoint = ce.data as Waypoint; + + console.log("msg", msg.subject, "data", msg.string()); +}); + +// subscribe using an iterator +const cloudEventsIterator = nats.subscribeToType>("com.plugis.heartbeat.Sent.>"); +(async () => { + for await (const ce of cloudEventsIterator) { + const heartbeatSent = ce.data as HeartbeatSent; + console.log("received cloud event", ce); + } +})(); + +// wait until the process is interrupted +await nats.interrupted; +console.log("interrupted"); + +// close nats connection +await nats.close(); +console.log("nats.close done"); +process.exit(0); diff --git a/package.json b/package.json new file mode 100644 index 0000000..7514bdd --- /dev/null +++ b/package.json @@ -0,0 +1,9 @@ +{ + "name": "plugis-client", + "version": "0.1.0", + "main": "index.js", + "type": "module", + "publishConfig": { + "registry": "https://git.fibre.plugis.com" + } +} \ No newline at end of file