diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..7176f4b --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +**.DS_Store diff --git a/README.md b/README.md new file mode 100644 index 0000000..a44fb00 --- /dev/null +++ b/README.md @@ -0,0 +1,21 @@ +# plugis3-service + +plugis3-service is a package to write plugis3 services in Go. + +A Plugis3 service is a NATS micro service with some specific properties that must implement the `PlugisServiceIntf` interface. + +## NATS Micro PlugisServiceIntf Integration + +For NATS services, see: +- [Building a PlugisServiceIntf](https://docs.nats.io/using-nats/nex/getting-started/building-service) +- [NATS micro on Github](https://github.com/nats-io/nats.go/tree/main/micro) + +# Files + +The `plugisservice.go` file defines the `PlugisServiceIntf` interface, which specifies the contract that all plugis services must implement. + +The `plugis.go` file defines the `PlugisIntf` interface, which specifies the functions provided by plugis + +The `example/echoService/echoService.go` file is a minimal service implementation sample. + +The `example/echoService/cmd/main.go` file is a sample of ServiceRunner usage. \ No newline at end of file diff --git a/example/echoService/cmd/main.go b/example/echoService/cmd/main.go new file mode 100644 index 0000000..f39c885 --- /dev/null +++ b/example/echoService/cmd/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "log/slog" + "time" + + "github.com/nats-io/nats.go" + "github.com/telemac/plugisservice" + + echoservice "github.com/telemac/plugisservice/example/echoService" + + "github.com/telemac/goutils/task" +) + +func main() { + ctx, cancel := task.NewCancellableContext(time.Second * 10) + defer cancel() + logger := slog.Default().With("service", "echoService") + + nc, err := nats.Connect("wss://idronebox:admin@n1.idronebox.com") + if err != nil { + logger.Error("connect to nat", "err", err) + return + } + defer nc.Close() + + runner := plugisservice.NewServiceRunner(nc, logger) + + runner.Start(ctx, echoservice.NewEchoService()) + + runner.Wait() + +} diff --git a/example/echoService/echoService.go b/example/echoService/echoService.go new file mode 100644 index 0000000..2524b86 --- /dev/null +++ b/example/echoService/echoService.go @@ -0,0 +1,125 @@ +package echoservice + +import ( + "context" + "github.com/nats-io/nats.go/micro" + "github.com/telemac/plugisservice" + nats_service "github.com/telemac/plugisservice/pkg/nats-service" + "time" +) + +// Ensure EchoService implements plugisservice.PlugisServiceIntf interface +var _ plugisservice.PlugisServiceIntf = (*EchoService)(nil) + +// EchoService is a service that echoes a message. +type EchoService struct { + plugisservice.Plugis +} + +func NewEchoService() *EchoService { + return &EchoService{} +} + +// ExecuteCommand sends a command +func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) error { + subject := "ism.homelab.service.plugis.command" + + svc.Logger().Info("sending command", + "command", command, + "subject", subject, + ) + + msg, err := svc.Request(subject, []byte(command), time.Second*5) + if err != nil { + svc.Logger().Error("command execution failed", + "error", err, + "command", command, + ) + return err + } else { + svc.Logger().Info("command executed successfully", + "command", command, + "response", string(msg.Data), + ) + } + + return nil +} + +// Run is the main function of the service. +func (svc *EchoService) Run(ctx context.Context) error { + svc.Logger().Info("Run started") + defer svc.Logger().Info("Run finished\a") + + //services, err := svc.GetServices(ctx) + //svc.Logger().Info("services", + // "services", services, + // "error", err, + //) + + svc.ExecuteCommand(ctx, "sleep 3") + + service, err := nats_service.NewNatsService(svc.Nats(), svc.Prefix(), micro.Config{ + Name: svc.Name(), + Endpoint: nil, + Version: svc.Version(), + Description: svc.Description(), + Metadata: svc.Metadata(), + }) + if err != nil { + return err + } + defer func() { + service.Stop() + }() + + echoEndpoint := nats_service.EndpointConfig{ + Name: "ping", + Handler: func(ctx context.Context, request micro.Request) (any, error) { + data := request.Data() + _ = data + return "ping: " + string(data), err + }, + MaxConcurrency: 10, + RequestTimeout: 2 * time.Second, + Metadata: map[string]string{ + "description": "ping", + "version": "0.0.1", + }, + } + + err = service.AddEndpoint(ctx, echoEndpoint) + if err != nil { + return err + } + + <-ctx.Done() + + service.Stop() + + return nil +} + +// Name returns the name of the service. +func (svc *EchoService) Name() string { + return "echo" +} + +// Description returns the description of the service. +func (svc *EchoService) Description() string { + return "Echo service" +} + +// Version returns the version of the service. +func (svc *EchoService) Version() string { + return "1.0.0" +} + +// Metadata returns the metadata of the service. +func (svc *EchoService) Metadata() plugisservice.Metadata { + meta, err := plugisservice.NewServiceMetadata(svc.Prefix(), time.Now()) + if err != nil { + svc.Logger().Error("NewServiceMetadata", "error", err) + } + return meta +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7fedfd7 --- /dev/null +++ b/go.mod @@ -0,0 +1,19 @@ +module github.com/telemac/plugisservice + +go 1.24.0 + +require ( + github.com/go-viper/mapstructure/v2 v2.3.0 + github.com/nats-io/nats.go v1.43.0 + github.com/synadia-io/orbit.go/natsext v0.1.1 + github.com/telemac/goutils v1.1.49 +) + +require ( + github.com/klauspost/compress v1.18.0 // indirect + github.com/nats-io/nkeys v0.4.11 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect + golang.org/x/crypto v0.39.0 // indirect + golang.org/x/sys v0.33.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..9b8a826 --- /dev/null +++ b/go.sum @@ -0,0 +1,36 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/go-viper/mapstructure/v2 v2.3.0 h1:27XbWsHIqhbdR5TIC911OfYvgSaW93HM+dX7970Q7jk= +github.com/go-viper/mapstructure/v2 v2.3.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= +github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo= +github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ= +github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug= +github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g= +github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0= +github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= +github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/synadia-io/orbit.go/natsext v0.1.1 h1:ujoNs8vQtez640kkgXotSUZj2+GmsmqgjMKnJOLSE6s= +github.com/synadia-io/orbit.go/natsext v0.1.1/go.mod h1:TbK/ch5qjAMr9HuCF+Guyws92rpWfotFarOuVjfJIv8= +github.com/telemac/goutils v1.1.49 h1:/fqBpoBc6us4Eutn2BHzxu5mv7QEeF3+5DAnBPThWSk= +github.com/telemac/goutils v1.1.49/go.mod h1:KdJPFOHRQw6gMTnqz0LhhGIgfSnecZ/EjOBh4ups9HE= +golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM= +golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/pkg/nats-service/nats-service.go b/pkg/nats-service/nats-service.go new file mode 100644 index 0000000..87ecb65 --- /dev/null +++ b/pkg/nats-service/nats-service.go @@ -0,0 +1,243 @@ +package nats_service + +import ( + "context" + "fmt" + "time" + + "github.com/nats-io/nats.go" + "github.com/nats-io/nats.go/micro" +) + +// EndpointConfig defines configuration for a single NATS endpoint +type EndpointConfig struct { + Name string `json:"name"` + Handler PlugisServiceHandler `json:"-"` // Non-serializable + MaxConcurrency int `json:"max_concurrency"` + RequestTimeout time.Duration `json:"request_timeout"` + Metadata map[string]string `json:"metadata,omitempty"` + QueueGroup string `json:"queue_group,omitempty"` + Subject string `json:"subject,omitempty"` +} + +// setDefaults applies default values to endpoint configuration +func (config *EndpointConfig) setDefaults(serviceName, prefix string) { + if config.MaxConcurrency <= 0 { + config.MaxConcurrency = 10 + } + if config.RequestTimeout <= 0 { + config.RequestTimeout = 30 * time.Second + } + if config.Subject == "" { + if prefix == "" { + config.Subject = serviceName + "." + config.Name + } else { + config.Subject = prefix + "." + serviceName + "." + config.Name + } + } +} + +// NatsService wraps a NATS micro.Service with endpoint management +type NatsService struct { + nc *nats.Conn + prefix string + svc micro.Service +} + +// NewNatsService creates a new NATS service with the given prefix and configuration +func NewNatsService(conn *nats.Conn, prefix string, config micro.Config) (*NatsService, error) { + natsService := &NatsService{ + nc: conn, + prefix: prefix, + } + var err error + natsService.svc, err = micro.AddService(natsService.nc, config) + return natsService, err +} + +// Svc returns the underlying NATS micro.Service +func (ns *NatsService) Svc() micro.Service { + return ns.svc +} + +// Stop stops the NATS service +func (ns *NatsService) Stop() error { + return ns.svc.Stop() +} + +// Shutdown gracefully shuts down the service with context cancellation +func (ns *NatsService) Shutdown(ctx context.Context) error { + done := make(chan struct{}) + go func() { + defer close(done) + ns.svc.Stop() + }() + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// PlugisServiceReply defines the standard response format for all endpoints +type PlugisServiceReply struct { + Error string `json:"error,omitempty"` + Result any `json:"result,omitempty"` + Timestamp time.Time `json:"timestamp"` + Duration string `json:"duration,omitempty"` +} + +// PlugisServiceHandler is the function signature for endpoint handlers +type PlugisServiceHandler func(ctx context.Context, request micro.Request) (any, error) + +// PlugisHandler manages concurrency and timeouts for a single endpoint +type PlugisHandler struct { + plugisServiceHandler PlugisServiceHandler + ctx context.Context + semaphore chan struct{} + config EndpointConfig +} + +// NewPlugisHandler creates a new handler with concurrency control +func NewPlugisHandler(ctx context.Context, config EndpointConfig) *PlugisHandler { + return &PlugisHandler{ + ctx: ctx, + plugisServiceHandler: config.Handler, + semaphore: make(chan struct{}, config.MaxConcurrency), + config: config, + } +} + +// Handle processes incoming requests with concurrency limiting +func (ph PlugisHandler) Handle(req micro.Request) { + // Try to acquire semaphore with non-blocking select + select { + case ph.semaphore <- struct{}{}: + // Successfully acquired semaphore, process request + go ph.handleWithTimeout(req) + default: + // Endpoint overloaded, reject request immediately + ph.sendErrorResponse(req, "503", "Endpoint overloaded - too many concurrent requests", time.Now()) + } +} + +// handleWithTimeout executes the handler with timeout and panic recovery +func (ph PlugisHandler) handleWithTimeout(req micro.Request) { + start := time.Now() + + // Always release semaphore when done + defer func() { + <-ph.semaphore + }() + + // Create timeout context + ctx, cancel := context.WithTimeout(ph.ctx, ph.config.RequestTimeout) + defer cancel() + + // Channel to receive result + type result struct { + data any + err error + } + resultChan := make(chan result, 1) + + // Execute handler in separate goroutine + go func() { + defer func() { + if r := recover(); r != nil { + resultChan <- result{ + data: nil, + err: fmt.Errorf("handler panicked: %v", r), + } + } + }() + + data, err := ph.plugisServiceHandler(ctx, req) + resultChan <- result{data: data, err: err} + }() + + // Wait for either completion or timeout + select { + case res := <-resultChan: + ph.sendResponse(req, res.data, res.err, start) + case <-ctx.Done(): + if ctx.Err() == context.DeadlineExceeded { + ph.sendErrorResponse(req, "408", "Request timeout", start) + } else { + ph.sendErrorResponse(req, "499", "Request cancelled", start) + } + } +} + +// sendResponse sends a successful response with timing information +func (ph PlugisHandler) sendResponse(req micro.Request, result any, err error, start time.Time) { + duration := time.Since(start) + + reply := PlugisServiceReply{ + Result: result, + Timestamp: time.Now(), + Duration: duration.String(), + } + + if err != nil { + reply.Error = err.Error() + } + + if err := req.RespondJSON(reply); err != nil { + // Failed to send response, but we can't do much about it + // In a production system, this would be logged + } +} + +// sendErrorResponse sends an error response with status code and timing +func (ph PlugisHandler) sendErrorResponse(req micro.Request, code, message string, start time.Time) { + duration := time.Since(start) + + reply := PlugisServiceReply{ + Error: fmt.Sprintf("%s: %s", code, message), + Timestamp: time.Now(), + Duration: duration.String(), + } + + if err := req.RespondJSON(reply); err != nil { + // Failed to send error response + } +} + +// AddEndpoint adds a new endpoint to the service with the given configuration +func (ns *NatsService) AddEndpoint(ctx context.Context, config EndpointConfig) error { + if config.Name == "" { + return fmt.Errorf("endpoint name cannot be empty") + } + if config.Handler == nil { + return fmt.Errorf("handler cannot be nil") + } + + // Set defaults + config.setDefaults(ns.svc.Info().Name, ns.prefix) + + // Build micro options + options := []micro.EndpointOpt{ + micro.WithEndpointSubject(config.Subject), + } + + if config.QueueGroup != "" { + options = append(options, micro.WithEndpointQueueGroup(config.QueueGroup)) + } else { + options = append(options, micro.WithEndpointQueueGroupDisabled()) + } + + if len(config.Metadata) > 0 { + options = append(options, micro.WithEndpointMetadata(config.Metadata)) + } + + handler := NewPlugisHandler(ctx, config) + + return ns.svc.AddEndpoint(config.Name, handler, options...) +} + +func (ns *NatsService) Prefix() string { + return ns.prefix +} diff --git a/plugis.go b/plugis.go new file mode 100644 index 0000000..b8248d7 --- /dev/null +++ b/plugis.go @@ -0,0 +1,208 @@ +package plugisservice + +import ( + "context" + "encoding/json" + "errors" + "iter" + "log/slog" + "os" + "runtime" + "time" + + "github.com/telemac/goutils/net" + "github.com/telemac/goutils/task" + + "github.com/go-viper/mapstructure/v2" + "github.com/nats-io/nats.go" + "github.com/synadia-io/orbit.go/natsext" +) + +// Ensure Plugis implements PlugisIntf +var _ PlugisIntf = (*Plugis)(nil) + +// Plugis is the default implementation of the PlugisIntf that provides +// the functionality to plugis services. +type Plugis struct { + logger *slog.Logger + nc *nats.Conn +} + +var ( + // ErrNatsNotConnected = errors.New("nats not connected") + ErrNatsConnectionNil = errors.New("nats connection is nil") +) + +// SetLogger sets the logger for the Plugis instance. +func (plugis *Plugis) SetLogger(log *slog.Logger) { + plugis.logger = log +} + +// Logger returns the logger for the Plugis instance. +func (plugis *Plugis) Logger() *slog.Logger { + if plugis.logger == nil { + plugis.logger = slog.Default() + } + return plugis.logger +} + +// SetNats sets the nats connection for the Plugis instance. +func (plugis *Plugis) SetNats(nc *nats.Conn) { + plugis.nc = nc +} + +// Nats returns the nats connection for the Plugis instance. +func (plugis *Plugis) Nats() *nats.Conn { + return plugis.nc +} + +// Publish publishes a message to the nats connection. +func (plugis *Plugis) Publish(topic string, payload []byte) error { + attrs := []slog.Attr{ + slog.String("fn", "Plugis.Publish"), + slog.String("topic", topic), + slog.String("payload", string(payload)), + } + if plugis.nc == nil { + return ErrNatsConnectionNil + } + err := plugis.nc.Publish(topic, payload) + if err != nil { + attrs = append(attrs, slog.String("err", err.Error())) + plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "Publish payload to topic", attrs...) + } else { + plugis.Logger().LogAttrs(context.TODO(), slog.LevelDebug, "Published payload to topic", attrs...) + } + return err +} + +// Prefix returns the prefix for the Plugis instance. +func (plugis *Plugis) Prefix() string { + return "integrator.customer.location" +} + +// Request sends a request to the nats connection. +func (plugis *Plugis) Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error) { + attrs := []slog.Attr{ + slog.String("fn", "Plugis.Request"), + slog.String("subject", subj), + slog.String("data", string(data)), + slog.Duration("timeout", timeout), + } + if plugis.nc == nil { + attrs = append(attrs, slog.String("err", ErrNatsConnectionNil.Error())) + plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "Request failed - nats connection is nil", attrs...) + return nil, ErrNatsConnectionNil + } + msg, err := plugis.nc.Request(subj, data, timeout) + if err != nil { + attrs = append(attrs, slog.String("err", err.Error())) + plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "Request failed", attrs...) + } else { + plugis.Logger().LogAttrs(context.TODO(), slog.LevelDebug, "Request successful", attrs...) + } + return msg, err +} + +// RequestCtx sends a request to the nats connection and returns a single message. +// context is used for timeout and cancellation. +func (plugis *Plugis) RequestCtx(ctx context.Context, subj string, data []byte) (*nats.Msg, error) { + return nil, errors.New("not implemented") + deadline, ok := ctx.Deadline() + if !ok { + //ctx, _ = context.WithTimeout(ctx, time.Hour*24) + } else { + _ = deadline + } + iter, err := plugis.RequestMany(ctx, subj, data, natsext.RequestManyMaxMessages(1)) + plugis.Logger().Warn("RequestMany", + slog.String("subject", subj), + "err", err, + "ctxErr", ctx.Err(), + "cancealed", task.IsCancelled(ctx), + ) + if err != nil { + return nil, err + } + if ctx.Err() != nil { + return nil, ctx.Err() + } + for msg := range iter { + return msg, nil + } + return &nats.Msg{}, ctx.Err() +} + +// RequestMany sends a request to the nats connection and returns a sequence of messages. +func (plugis *Plugis) RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error) { + attrs := []slog.Attr{ + slog.String("fn", "Plugis.RequestMany"), + slog.String("subject", subject), + slog.String("data", string(data)), + } + if plugis.nc == nil { + attrs = append(attrs, slog.String("err", ErrNatsConnectionNil.Error())) + plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "RequestMany failed - nats connection is nil", attrs...) + return nil, ErrNatsConnectionNil + } + result, err := natsext.RequestMany(ctx, plugis.nc, subject, data, opts...) + if err != nil { + attrs = append(attrs, slog.String("err", err.Error())) + plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "RequestMany failed", attrs...) + } else { + plugis.Logger().LogAttrs(context.TODO(), slog.LevelDebug, "RequestMany successful", attrs...) + } + return result, err +} + +// GetServices sends a request to the $SRV.INFO subject and returns a list of services. +func (plugis *Plugis) GetServices(ctx context.Context) ([]ServiceInfo, error) { + iter, err := plugis.RequestMany(ctx, "$SRV.INFO", []byte("")) + if err != nil { + return nil, err + } + var services []ServiceInfo + for msg := range iter { + var serviceInfo ServiceInfo + err := json.Unmarshal(msg.Data, &serviceInfo) + if err != nil { + return nil, err + } + services = append(services, serviceInfo) + } + return services, nil +} + +// isRunningInDockerContainer +func isRunningInDockerContainer() bool { + if _, err := os.Stat("/.dockerenv"); err == nil { + return true + } + return false +} + +// NewServiceMetadata creates and fills a ServiceMetadata structure +func NewServiceMetadata(prefix string, startedAt time.Time) (Metadata, error) { + var err error + var meta ServiceMetadata + meta.Prefix = prefix + meta.Platform = runtime.GOOS + "/" + runtime.GOARCH + if isRunningInDockerContainer() { + meta.Platform += " (docker)" + } + + meta.StartedAt = startedAt.String() + meta.Hostname, err = os.Hostname() + if err != nil { + return nil, err + } + meta.MAC, err = net.GetMACAddress() + if err != nil { + return nil, err + } + + // Convert struct to map using mapstructure + var result Metadata + err = mapstructure.Decode(meta, &result) + return result, err +} diff --git a/plugisservice.go b/plugisservice.go new file mode 100644 index 0000000..88d82d1 --- /dev/null +++ b/plugisservice.go @@ -0,0 +1,65 @@ +package plugisservice + +import ( + "context" + "iter" + "log/slog" + "time" + + "github.com/nats-io/nats.go" + "github.com/synadia-io/orbit.go/natsext" +) + +// Metadata is a map of key-value pairs that can be used to store additional information about a service/endpoint +type Metadata map[string]string + +// PlugisServiceIntf is the interface that must be implemented by all plugis services. +type PlugisServiceIntf interface { + Run(ctx context.Context) error + Name() string + Description() string + Version() string + Metadata() Metadata + PlugisIntf +} + +// PlugisIntf holds the methods that can be used by a plugis service. +type PlugisIntf interface { + SetLogger(*slog.Logger) + Logger() *slog.Logger + SetNats(*nats.Conn) + Nats() *nats.Conn + Prefix() string + Publish(topic string, payload []byte) error + Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error) + RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error) + GetServices(ctx context.Context) ([]ServiceInfo, error) +} + +// ServiceInfo is the information about a service. +type ServiceInfo struct { + Name string `json:"name"` + Id string `json:"id"` + Description string `json:"description"` + Version string `json:"version"` + Type string `json:"type"` + Metadata Metadata `json:"metadata"` + Endpoints []Endpoint `json:"endpoints"` +} + +// ServiceMetadata is the metadata about a service. +type ServiceMetadata struct { + Hostname string `json:"hostname"` + MAC string `json:"mac"` + Platform string `json:"platform"` + Prefix string `json:"prefix"` + StartedAt string `json:"started_at"` +} + +// Endpoint is the information about an endpoint. +type Endpoint struct { + Name string `json:"name"` + Subject string `json:"subject"` + QueueGroup string `json:"queue_group"` + Metadata Metadata `json:"metadata,omitempty"` +} diff --git a/servicerunner.go b/servicerunner.go new file mode 100644 index 0000000..bd040bc --- /dev/null +++ b/servicerunner.go @@ -0,0 +1,62 @@ +package plugisservice + +import ( + "context" + "fmt" + "log/slog" + "sync" + + "github.com/nats-io/nats.go" +) + +// ServiceRunner is a struct that runs one or more services. +type ServiceRunner struct { + wg sync.WaitGroup + log *slog.Logger + nc *nats.Conn +} + +func NewServiceRunner(nc *nats.Conn, log *slog.Logger) *ServiceRunner { + if log == nil { + log = slog.Default() + } + if nc == nil { + log.Error("failed to connect to nats") + return nil + } + return &ServiceRunner{ + nc: nc, + log: log, + } +} + +// Start starts a service. +func (sr *ServiceRunner) Start(ctx context.Context, svc PlugisServiceIntf) { + sr.wg.Add(1) + go func() { + defer sr.wg.Done() + svc.SetLogger(sr.log) + svc.SetNats(sr.nc) + serviceType := fmt.Sprintf("%T", svc) + err := svc.Run(ctx) + if err != nil { + sr.log.Error("service execution", "error", err, "service", serviceType) + } + err = svc.Nats().Flush() + if err != nil { + sr.log.Error("service flush", "error", err, "service", fmt.Sprintf("%T", svc)) + } + }() +} + +// Wait waits for all services to finish. +func (sr *ServiceRunner) Wait() { + sr.wg.Wait() + if sr.nc != nil { + err := sr.nc.Drain() + if err != nil { + sr.log.Error("service drain", "error", err) + } + sr.nc.Close() + } +}