First commit
This commit is contained in:
parent
36e5787c0d
commit
848d7406a0
10 changed files with 813 additions and 0 deletions
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
|
|
@ -0,0 +1 @@
|
|||
**.DS_Store
|
||||
21
README.md
Normal file
21
README.md
Normal file
|
|
@ -0,0 +1,21 @@
|
|||
# plugis3-service
|
||||
|
||||
plugis3-service is a package to write plugis3 services in Go.
|
||||
|
||||
A Plugis3 service is a NATS micro service with some specific properties that must implement the `PlugisServiceIntf` interface.
|
||||
|
||||
## NATS Micro PlugisServiceIntf Integration
|
||||
|
||||
For NATS services, see:
|
||||
- [Building a PlugisServiceIntf](https://docs.nats.io/using-nats/nex/getting-started/building-service)
|
||||
- [NATS micro on Github](https://github.com/nats-io/nats.go/tree/main/micro)
|
||||
|
||||
# Files
|
||||
|
||||
The `plugisservice.go` file defines the `PlugisServiceIntf` interface, which specifies the contract that all plugis services must implement.
|
||||
|
||||
The `plugis.go` file defines the `PlugisIntf` interface, which specifies the functions provided by plugis
|
||||
|
||||
The `example/echoService/echoService.go` file is a minimal service implementation sample.
|
||||
|
||||
The `example/echoService/cmd/main.go` file is a sample of ServiceRunner usage.
|
||||
33
example/echoService/cmd/main.go
Normal file
33
example/echoService/cmd/main.go
Normal file
|
|
@ -0,0 +1,33 @@
|
|||
package main
|
||||
|
||||
import (
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/telemac/plugisservice"
|
||||
|
||||
echoservice "github.com/telemac/plugisservice/example/echoService"
|
||||
|
||||
"github.com/telemac/goutils/task"
|
||||
)
|
||||
|
||||
func main() {
|
||||
ctx, cancel := task.NewCancellableContext(time.Second * 10)
|
||||
defer cancel()
|
||||
logger := slog.Default().With("service", "echoService")
|
||||
|
||||
nc, err := nats.Connect("wss://idronebox:admin@n1.idronebox.com")
|
||||
if err != nil {
|
||||
logger.Error("connect to nat", "err", err)
|
||||
return
|
||||
}
|
||||
defer nc.Close()
|
||||
|
||||
runner := plugisservice.NewServiceRunner(nc, logger)
|
||||
|
||||
runner.Start(ctx, echoservice.NewEchoService())
|
||||
|
||||
runner.Wait()
|
||||
|
||||
}
|
||||
125
example/echoService/echoService.go
Normal file
125
example/echoService/echoService.go
Normal file
|
|
@ -0,0 +1,125 @@
|
|||
package echoservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/nats-io/nats.go/micro"
|
||||
"github.com/telemac/plugisservice"
|
||||
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Ensure EchoService implements plugisservice.PlugisServiceIntf interface
|
||||
var _ plugisservice.PlugisServiceIntf = (*EchoService)(nil)
|
||||
|
||||
// EchoService is a service that echoes a message.
|
||||
type EchoService struct {
|
||||
plugisservice.Plugis
|
||||
}
|
||||
|
||||
func NewEchoService() *EchoService {
|
||||
return &EchoService{}
|
||||
}
|
||||
|
||||
// ExecuteCommand sends a command
|
||||
func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) error {
|
||||
subject := "ism.homelab.service.plugis.command"
|
||||
|
||||
svc.Logger().Info("sending command",
|
||||
"command", command,
|
||||
"subject", subject,
|
||||
)
|
||||
|
||||
msg, err := svc.Request(subject, []byte(command), time.Second*5)
|
||||
if err != nil {
|
||||
svc.Logger().Error("command execution failed",
|
||||
"error", err,
|
||||
"command", command,
|
||||
)
|
||||
return err
|
||||
} else {
|
||||
svc.Logger().Info("command executed successfully",
|
||||
"command", command,
|
||||
"response", string(msg.Data),
|
||||
)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run is the main function of the service.
|
||||
func (svc *EchoService) Run(ctx context.Context) error {
|
||||
svc.Logger().Info("Run started")
|
||||
defer svc.Logger().Info("Run finished\a")
|
||||
|
||||
//services, err := svc.GetServices(ctx)
|
||||
//svc.Logger().Info("services",
|
||||
// "services", services,
|
||||
// "error", err,
|
||||
//)
|
||||
|
||||
svc.ExecuteCommand(ctx, "sleep 3")
|
||||
|
||||
service, err := nats_service.NewNatsService(svc.Nats(), svc.Prefix(), micro.Config{
|
||||
Name: svc.Name(),
|
||||
Endpoint: nil,
|
||||
Version: svc.Version(),
|
||||
Description: svc.Description(),
|
||||
Metadata: svc.Metadata(),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
service.Stop()
|
||||
}()
|
||||
|
||||
echoEndpoint := nats_service.EndpointConfig{
|
||||
Name: "ping",
|
||||
Handler: func(ctx context.Context, request micro.Request) (any, error) {
|
||||
data := request.Data()
|
||||
_ = data
|
||||
return "ping: " + string(data), err
|
||||
},
|
||||
MaxConcurrency: 10,
|
||||
RequestTimeout: 2 * time.Second,
|
||||
Metadata: map[string]string{
|
||||
"description": "ping",
|
||||
"version": "0.0.1",
|
||||
},
|
||||
}
|
||||
|
||||
err = service.AddEndpoint(ctx, echoEndpoint)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
<-ctx.Done()
|
||||
|
||||
service.Stop()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Name returns the name of the service.
|
||||
func (svc *EchoService) Name() string {
|
||||
return "echo"
|
||||
}
|
||||
|
||||
// Description returns the description of the service.
|
||||
func (svc *EchoService) Description() string {
|
||||
return "Echo service"
|
||||
}
|
||||
|
||||
// Version returns the version of the service.
|
||||
func (svc *EchoService) Version() string {
|
||||
return "1.0.0"
|
||||
}
|
||||
|
||||
// Metadata returns the metadata of the service.
|
||||
func (svc *EchoService) Metadata() plugisservice.Metadata {
|
||||
meta, err := plugisservice.NewServiceMetadata(svc.Prefix(), time.Now())
|
||||
if err != nil {
|
||||
svc.Logger().Error("NewServiceMetadata", "error", err)
|
||||
}
|
||||
return meta
|
||||
}
|
||||
19
go.mod
Normal file
19
go.mod
Normal file
|
|
@ -0,0 +1,19 @@
|
|||
module github.com/telemac/plugisservice
|
||||
|
||||
go 1.24.0
|
||||
|
||||
require (
|
||||
github.com/go-viper/mapstructure/v2 v2.3.0
|
||||
github.com/nats-io/nats.go v1.43.0
|
||||
github.com/synadia-io/orbit.go/natsext v0.1.1
|
||||
github.com/telemac/goutils v1.1.49
|
||||
)
|
||||
|
||||
require (
|
||||
github.com/klauspost/compress v1.18.0 // indirect
|
||||
github.com/nats-io/nkeys v0.4.11 // indirect
|
||||
github.com/nats-io/nuid v1.0.1 // indirect
|
||||
github.com/sirupsen/logrus v1.9.3 // indirect
|
||||
golang.org/x/crypto v0.39.0 // indirect
|
||||
golang.org/x/sys v0.33.0 // indirect
|
||||
)
|
||||
36
go.sum
Normal file
36
go.sum
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
|
||||
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
|
||||
github.com/go-viper/mapstructure/v2 v2.3.0 h1:27XbWsHIqhbdR5TIC911OfYvgSaW93HM+dX7970Q7jk=
|
||||
github.com/go-viper/mapstructure/v2 v2.3.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
|
||||
github.com/klauspost/compress v1.18.0 h1:c/Cqfb0r+Yi+JtIEq73FWXVkRonBlf0CRNYc8Zttxdo=
|
||||
github.com/klauspost/compress v1.18.0/go.mod h1:2Pp+KzxcywXVXMr50+X0Q/Lsb43OQHYWRCY2AiWywWQ=
|
||||
github.com/nats-io/nats.go v1.43.0 h1:uRFZ2FEoRvP64+UUhaTokyS18XBCR/xM2vQZKO4i8ug=
|
||||
github.com/nats-io/nats.go v1.43.0/go.mod h1:iRWIPokVIFbVijxuMQq4y9ttaBTMe0SFdlZfMDd+33g=
|
||||
github.com/nats-io/nkeys v0.4.11 h1:q44qGV008kYd9W1b1nEBkNzvnWxtRSQ7A8BoqRrcfa0=
|
||||
github.com/nats-io/nkeys v0.4.11/go.mod h1:szDimtgmfOi9n25JpfIdGw12tZFYXqhGxjhVxsatHVE=
|
||||
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
|
||||
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
|
||||
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
|
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
|
||||
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
|
||||
github.com/sirupsen/logrus v1.9.3/go.mod h1:naHLuLoDiP4jHNo9R0sCBMtWGeIprob74mVsIT4qYEQ=
|
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
|
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
|
||||
github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=
|
||||
github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
|
||||
github.com/synadia-io/orbit.go/natsext v0.1.1 h1:ujoNs8vQtez640kkgXotSUZj2+GmsmqgjMKnJOLSE6s=
|
||||
github.com/synadia-io/orbit.go/natsext v0.1.1/go.mod h1:TbK/ch5qjAMr9HuCF+Guyws92rpWfotFarOuVjfJIv8=
|
||||
github.com/telemac/goutils v1.1.49 h1:/fqBpoBc6us4Eutn2BHzxu5mv7QEeF3+5DAnBPThWSk=
|
||||
github.com/telemac/goutils v1.1.49/go.mod h1:KdJPFOHRQw6gMTnqz0LhhGIgfSnecZ/EjOBh4ups9HE=
|
||||
golang.org/x/crypto v0.39.0 h1:SHs+kF4LP+f+p14esP5jAoDpHU8Gu/v9lFRK6IT5imM=
|
||||
golang.org/x/crypto v0.39.0/go.mod h1:L+Xg3Wf6HoL4Bn4238Z6ft6KfEpN0tJGo53AAPC632U=
|
||||
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
|
||||
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
|
||||
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
|
||||
golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw=
|
||||
golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
|
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
|
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
|
||||
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
|
||||
243
pkg/nats-service/nats-service.go
Normal file
243
pkg/nats-service/nats-service.go
Normal file
|
|
@ -0,0 +1,243 @@
|
|||
package nats_service
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/nats-io/nats.go/micro"
|
||||
)
|
||||
|
||||
// EndpointConfig defines configuration for a single NATS endpoint
|
||||
type EndpointConfig struct {
|
||||
Name string `json:"name"`
|
||||
Handler PlugisServiceHandler `json:"-"` // Non-serializable
|
||||
MaxConcurrency int `json:"max_concurrency"`
|
||||
RequestTimeout time.Duration `json:"request_timeout"`
|
||||
Metadata map[string]string `json:"metadata,omitempty"`
|
||||
QueueGroup string `json:"queue_group,omitempty"`
|
||||
Subject string `json:"subject,omitempty"`
|
||||
}
|
||||
|
||||
// setDefaults applies default values to endpoint configuration
|
||||
func (config *EndpointConfig) setDefaults(serviceName, prefix string) {
|
||||
if config.MaxConcurrency <= 0 {
|
||||
config.MaxConcurrency = 10
|
||||
}
|
||||
if config.RequestTimeout <= 0 {
|
||||
config.RequestTimeout = 30 * time.Second
|
||||
}
|
||||
if config.Subject == "" {
|
||||
if prefix == "" {
|
||||
config.Subject = serviceName + "." + config.Name
|
||||
} else {
|
||||
config.Subject = prefix + "." + serviceName + "." + config.Name
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// NatsService wraps a NATS micro.Service with endpoint management
|
||||
type NatsService struct {
|
||||
nc *nats.Conn
|
||||
prefix string
|
||||
svc micro.Service
|
||||
}
|
||||
|
||||
// NewNatsService creates a new NATS service with the given prefix and configuration
|
||||
func NewNatsService(conn *nats.Conn, prefix string, config micro.Config) (*NatsService, error) {
|
||||
natsService := &NatsService{
|
||||
nc: conn,
|
||||
prefix: prefix,
|
||||
}
|
||||
var err error
|
||||
natsService.svc, err = micro.AddService(natsService.nc, config)
|
||||
return natsService, err
|
||||
}
|
||||
|
||||
// Svc returns the underlying NATS micro.Service
|
||||
func (ns *NatsService) Svc() micro.Service {
|
||||
return ns.svc
|
||||
}
|
||||
|
||||
// Stop stops the NATS service
|
||||
func (ns *NatsService) Stop() error {
|
||||
return ns.svc.Stop()
|
||||
}
|
||||
|
||||
// Shutdown gracefully shuts down the service with context cancellation
|
||||
func (ns *NatsService) Shutdown(ctx context.Context) error {
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
ns.svc.Stop()
|
||||
}()
|
||||
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
// PlugisServiceReply defines the standard response format for all endpoints
|
||||
type PlugisServiceReply struct {
|
||||
Error string `json:"error,omitempty"`
|
||||
Result any `json:"result,omitempty"`
|
||||
Timestamp time.Time `json:"timestamp"`
|
||||
Duration string `json:"duration,omitempty"`
|
||||
}
|
||||
|
||||
// PlugisServiceHandler is the function signature for endpoint handlers
|
||||
type PlugisServiceHandler func(ctx context.Context, request micro.Request) (any, error)
|
||||
|
||||
// PlugisHandler manages concurrency and timeouts for a single endpoint
|
||||
type PlugisHandler struct {
|
||||
plugisServiceHandler PlugisServiceHandler
|
||||
ctx context.Context
|
||||
semaphore chan struct{}
|
||||
config EndpointConfig
|
||||
}
|
||||
|
||||
// NewPlugisHandler creates a new handler with concurrency control
|
||||
func NewPlugisHandler(ctx context.Context, config EndpointConfig) *PlugisHandler {
|
||||
return &PlugisHandler{
|
||||
ctx: ctx,
|
||||
plugisServiceHandler: config.Handler,
|
||||
semaphore: make(chan struct{}, config.MaxConcurrency),
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
|
||||
// Handle processes incoming requests with concurrency limiting
|
||||
func (ph PlugisHandler) Handle(req micro.Request) {
|
||||
// Try to acquire semaphore with non-blocking select
|
||||
select {
|
||||
case ph.semaphore <- struct{}{}:
|
||||
// Successfully acquired semaphore, process request
|
||||
go ph.handleWithTimeout(req)
|
||||
default:
|
||||
// Endpoint overloaded, reject request immediately
|
||||
ph.sendErrorResponse(req, "503", "Endpoint overloaded - too many concurrent requests", time.Now())
|
||||
}
|
||||
}
|
||||
|
||||
// handleWithTimeout executes the handler with timeout and panic recovery
|
||||
func (ph PlugisHandler) handleWithTimeout(req micro.Request) {
|
||||
start := time.Now()
|
||||
|
||||
// Always release semaphore when done
|
||||
defer func() {
|
||||
<-ph.semaphore
|
||||
}()
|
||||
|
||||
// Create timeout context
|
||||
ctx, cancel := context.WithTimeout(ph.ctx, ph.config.RequestTimeout)
|
||||
defer cancel()
|
||||
|
||||
// Channel to receive result
|
||||
type result struct {
|
||||
data any
|
||||
err error
|
||||
}
|
||||
resultChan := make(chan result, 1)
|
||||
|
||||
// Execute handler in separate goroutine
|
||||
go func() {
|
||||
defer func() {
|
||||
if r := recover(); r != nil {
|
||||
resultChan <- result{
|
||||
data: nil,
|
||||
err: fmt.Errorf("handler panicked: %v", r),
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
data, err := ph.plugisServiceHandler(ctx, req)
|
||||
resultChan <- result{data: data, err: err}
|
||||
}()
|
||||
|
||||
// Wait for either completion or timeout
|
||||
select {
|
||||
case res := <-resultChan:
|
||||
ph.sendResponse(req, res.data, res.err, start)
|
||||
case <-ctx.Done():
|
||||
if ctx.Err() == context.DeadlineExceeded {
|
||||
ph.sendErrorResponse(req, "408", "Request timeout", start)
|
||||
} else {
|
||||
ph.sendErrorResponse(req, "499", "Request cancelled", start)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// sendResponse sends a successful response with timing information
|
||||
func (ph PlugisHandler) sendResponse(req micro.Request, result any, err error, start time.Time) {
|
||||
duration := time.Since(start)
|
||||
|
||||
reply := PlugisServiceReply{
|
||||
Result: result,
|
||||
Timestamp: time.Now(),
|
||||
Duration: duration.String(),
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
reply.Error = err.Error()
|
||||
}
|
||||
|
||||
if err := req.RespondJSON(reply); err != nil {
|
||||
// Failed to send response, but we can't do much about it
|
||||
// In a production system, this would be logged
|
||||
}
|
||||
}
|
||||
|
||||
// sendErrorResponse sends an error response with status code and timing
|
||||
func (ph PlugisHandler) sendErrorResponse(req micro.Request, code, message string, start time.Time) {
|
||||
duration := time.Since(start)
|
||||
|
||||
reply := PlugisServiceReply{
|
||||
Error: fmt.Sprintf("%s: %s", code, message),
|
||||
Timestamp: time.Now(),
|
||||
Duration: duration.String(),
|
||||
}
|
||||
|
||||
if err := req.RespondJSON(reply); err != nil {
|
||||
// Failed to send error response
|
||||
}
|
||||
}
|
||||
|
||||
// AddEndpoint adds a new endpoint to the service with the given configuration
|
||||
func (ns *NatsService) AddEndpoint(ctx context.Context, config EndpointConfig) error {
|
||||
if config.Name == "" {
|
||||
return fmt.Errorf("endpoint name cannot be empty")
|
||||
}
|
||||
if config.Handler == nil {
|
||||
return fmt.Errorf("handler cannot be nil")
|
||||
}
|
||||
|
||||
// Set defaults
|
||||
config.setDefaults(ns.svc.Info().Name, ns.prefix)
|
||||
|
||||
// Build micro options
|
||||
options := []micro.EndpointOpt{
|
||||
micro.WithEndpointSubject(config.Subject),
|
||||
}
|
||||
|
||||
if config.QueueGroup != "" {
|
||||
options = append(options, micro.WithEndpointQueueGroup(config.QueueGroup))
|
||||
} else {
|
||||
options = append(options, micro.WithEndpointQueueGroupDisabled())
|
||||
}
|
||||
|
||||
if len(config.Metadata) > 0 {
|
||||
options = append(options, micro.WithEndpointMetadata(config.Metadata))
|
||||
}
|
||||
|
||||
handler := NewPlugisHandler(ctx, config)
|
||||
|
||||
return ns.svc.AddEndpoint(config.Name, handler, options...)
|
||||
}
|
||||
|
||||
func (ns *NatsService) Prefix() string {
|
||||
return ns.prefix
|
||||
}
|
||||
208
plugis.go
Normal file
208
plugis.go
Normal file
|
|
@ -0,0 +1,208 @@
|
|||
package plugisservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"iter"
|
||||
"log/slog"
|
||||
"os"
|
||||
"runtime"
|
||||
"time"
|
||||
|
||||
"github.com/telemac/goutils/net"
|
||||
"github.com/telemac/goutils/task"
|
||||
|
||||
"github.com/go-viper/mapstructure/v2"
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/synadia-io/orbit.go/natsext"
|
||||
)
|
||||
|
||||
// Ensure Plugis implements PlugisIntf
|
||||
var _ PlugisIntf = (*Plugis)(nil)
|
||||
|
||||
// Plugis is the default implementation of the PlugisIntf that provides
|
||||
// the functionality to plugis services.
|
||||
type Plugis struct {
|
||||
logger *slog.Logger
|
||||
nc *nats.Conn
|
||||
}
|
||||
|
||||
var (
|
||||
// ErrNatsNotConnected = errors.New("nats not connected")
|
||||
ErrNatsConnectionNil = errors.New("nats connection is nil")
|
||||
)
|
||||
|
||||
// SetLogger sets the logger for the Plugis instance.
|
||||
func (plugis *Plugis) SetLogger(log *slog.Logger) {
|
||||
plugis.logger = log
|
||||
}
|
||||
|
||||
// Logger returns the logger for the Plugis instance.
|
||||
func (plugis *Plugis) Logger() *slog.Logger {
|
||||
if plugis.logger == nil {
|
||||
plugis.logger = slog.Default()
|
||||
}
|
||||
return plugis.logger
|
||||
}
|
||||
|
||||
// SetNats sets the nats connection for the Plugis instance.
|
||||
func (plugis *Plugis) SetNats(nc *nats.Conn) {
|
||||
plugis.nc = nc
|
||||
}
|
||||
|
||||
// Nats returns the nats connection for the Plugis instance.
|
||||
func (plugis *Plugis) Nats() *nats.Conn {
|
||||
return plugis.nc
|
||||
}
|
||||
|
||||
// Publish publishes a message to the nats connection.
|
||||
func (plugis *Plugis) Publish(topic string, payload []byte) error {
|
||||
attrs := []slog.Attr{
|
||||
slog.String("fn", "Plugis.Publish"),
|
||||
slog.String("topic", topic),
|
||||
slog.String("payload", string(payload)),
|
||||
}
|
||||
if plugis.nc == nil {
|
||||
return ErrNatsConnectionNil
|
||||
}
|
||||
err := plugis.nc.Publish(topic, payload)
|
||||
if err != nil {
|
||||
attrs = append(attrs, slog.String("err", err.Error()))
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "Publish payload to topic", attrs...)
|
||||
} else {
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelDebug, "Published payload to topic", attrs...)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// Prefix returns the prefix for the Plugis instance.
|
||||
func (plugis *Plugis) Prefix() string {
|
||||
return "integrator.customer.location"
|
||||
}
|
||||
|
||||
// Request sends a request to the nats connection.
|
||||
func (plugis *Plugis) Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error) {
|
||||
attrs := []slog.Attr{
|
||||
slog.String("fn", "Plugis.Request"),
|
||||
slog.String("subject", subj),
|
||||
slog.String("data", string(data)),
|
||||
slog.Duration("timeout", timeout),
|
||||
}
|
||||
if plugis.nc == nil {
|
||||
attrs = append(attrs, slog.String("err", ErrNatsConnectionNil.Error()))
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "Request failed - nats connection is nil", attrs...)
|
||||
return nil, ErrNatsConnectionNil
|
||||
}
|
||||
msg, err := plugis.nc.Request(subj, data, timeout)
|
||||
if err != nil {
|
||||
attrs = append(attrs, slog.String("err", err.Error()))
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "Request failed", attrs...)
|
||||
} else {
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelDebug, "Request successful", attrs...)
|
||||
}
|
||||
return msg, err
|
||||
}
|
||||
|
||||
// RequestCtx sends a request to the nats connection and returns a single message.
|
||||
// context is used for timeout and cancellation.
|
||||
func (plugis *Plugis) RequestCtx(ctx context.Context, subj string, data []byte) (*nats.Msg, error) {
|
||||
return nil, errors.New("not implemented")
|
||||
deadline, ok := ctx.Deadline()
|
||||
if !ok {
|
||||
//ctx, _ = context.WithTimeout(ctx, time.Hour*24)
|
||||
} else {
|
||||
_ = deadline
|
||||
}
|
||||
iter, err := plugis.RequestMany(ctx, subj, data, natsext.RequestManyMaxMessages(1))
|
||||
plugis.Logger().Warn("RequestMany",
|
||||
slog.String("subject", subj),
|
||||
"err", err,
|
||||
"ctxErr", ctx.Err(),
|
||||
"cancealed", task.IsCancelled(ctx),
|
||||
)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
for msg := range iter {
|
||||
return msg, nil
|
||||
}
|
||||
return &nats.Msg{}, ctx.Err()
|
||||
}
|
||||
|
||||
// RequestMany sends a request to the nats connection and returns a sequence of messages.
|
||||
func (plugis *Plugis) RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error) {
|
||||
attrs := []slog.Attr{
|
||||
slog.String("fn", "Plugis.RequestMany"),
|
||||
slog.String("subject", subject),
|
||||
slog.String("data", string(data)),
|
||||
}
|
||||
if plugis.nc == nil {
|
||||
attrs = append(attrs, slog.String("err", ErrNatsConnectionNil.Error()))
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "RequestMany failed - nats connection is nil", attrs...)
|
||||
return nil, ErrNatsConnectionNil
|
||||
}
|
||||
result, err := natsext.RequestMany(ctx, plugis.nc, subject, data, opts...)
|
||||
if err != nil {
|
||||
attrs = append(attrs, slog.String("err", err.Error()))
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelError, "RequestMany failed", attrs...)
|
||||
} else {
|
||||
plugis.Logger().LogAttrs(context.TODO(), slog.LevelDebug, "RequestMany successful", attrs...)
|
||||
}
|
||||
return result, err
|
||||
}
|
||||
|
||||
// GetServices sends a request to the $SRV.INFO subject and returns a list of services.
|
||||
func (plugis *Plugis) GetServices(ctx context.Context) ([]ServiceInfo, error) {
|
||||
iter, err := plugis.RequestMany(ctx, "$SRV.INFO", []byte(""))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var services []ServiceInfo
|
||||
for msg := range iter {
|
||||
var serviceInfo ServiceInfo
|
||||
err := json.Unmarshal(msg.Data, &serviceInfo)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
services = append(services, serviceInfo)
|
||||
}
|
||||
return services, nil
|
||||
}
|
||||
|
||||
// isRunningInDockerContainer
|
||||
func isRunningInDockerContainer() bool {
|
||||
if _, err := os.Stat("/.dockerenv"); err == nil {
|
||||
return true
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
// NewServiceMetadata creates and fills a ServiceMetadata structure
|
||||
func NewServiceMetadata(prefix string, startedAt time.Time) (Metadata, error) {
|
||||
var err error
|
||||
var meta ServiceMetadata
|
||||
meta.Prefix = prefix
|
||||
meta.Platform = runtime.GOOS + "/" + runtime.GOARCH
|
||||
if isRunningInDockerContainer() {
|
||||
meta.Platform += " (docker)"
|
||||
}
|
||||
|
||||
meta.StartedAt = startedAt.String()
|
||||
meta.Hostname, err = os.Hostname()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
meta.MAC, err = net.GetMACAddress()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Convert struct to map using mapstructure
|
||||
var result Metadata
|
||||
err = mapstructure.Decode(meta, &result)
|
||||
return result, err
|
||||
}
|
||||
65
plugisservice.go
Normal file
65
plugisservice.go
Normal file
|
|
@ -0,0 +1,65 @@
|
|||
package plugisservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"iter"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
"github.com/synadia-io/orbit.go/natsext"
|
||||
)
|
||||
|
||||
// Metadata is a map of key-value pairs that can be used to store additional information about a service/endpoint
|
||||
type Metadata map[string]string
|
||||
|
||||
// PlugisServiceIntf is the interface that must be implemented by all plugis services.
|
||||
type PlugisServiceIntf interface {
|
||||
Run(ctx context.Context) error
|
||||
Name() string
|
||||
Description() string
|
||||
Version() string
|
||||
Metadata() Metadata
|
||||
PlugisIntf
|
||||
}
|
||||
|
||||
// PlugisIntf holds the methods that can be used by a plugis service.
|
||||
type PlugisIntf interface {
|
||||
SetLogger(*slog.Logger)
|
||||
Logger() *slog.Logger
|
||||
SetNats(*nats.Conn)
|
||||
Nats() *nats.Conn
|
||||
Prefix() string
|
||||
Publish(topic string, payload []byte) error
|
||||
Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error)
|
||||
RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error)
|
||||
GetServices(ctx context.Context) ([]ServiceInfo, error)
|
||||
}
|
||||
|
||||
// ServiceInfo is the information about a service.
|
||||
type ServiceInfo struct {
|
||||
Name string `json:"name"`
|
||||
Id string `json:"id"`
|
||||
Description string `json:"description"`
|
||||
Version string `json:"version"`
|
||||
Type string `json:"type"`
|
||||
Metadata Metadata `json:"metadata"`
|
||||
Endpoints []Endpoint `json:"endpoints"`
|
||||
}
|
||||
|
||||
// ServiceMetadata is the metadata about a service.
|
||||
type ServiceMetadata struct {
|
||||
Hostname string `json:"hostname"`
|
||||
MAC string `json:"mac"`
|
||||
Platform string `json:"platform"`
|
||||
Prefix string `json:"prefix"`
|
||||
StartedAt string `json:"started_at"`
|
||||
}
|
||||
|
||||
// Endpoint is the information about an endpoint.
|
||||
type Endpoint struct {
|
||||
Name string `json:"name"`
|
||||
Subject string `json:"subject"`
|
||||
QueueGroup string `json:"queue_group"`
|
||||
Metadata Metadata `json:"metadata,omitempty"`
|
||||
}
|
||||
62
servicerunner.go
Normal file
62
servicerunner.go
Normal file
|
|
@ -0,0 +1,62 @@
|
|||
package plugisservice
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"sync"
|
||||
|
||||
"github.com/nats-io/nats.go"
|
||||
)
|
||||
|
||||
// ServiceRunner is a struct that runs one or more services.
|
||||
type ServiceRunner struct {
|
||||
wg sync.WaitGroup
|
||||
log *slog.Logger
|
||||
nc *nats.Conn
|
||||
}
|
||||
|
||||
func NewServiceRunner(nc *nats.Conn, log *slog.Logger) *ServiceRunner {
|
||||
if log == nil {
|
||||
log = slog.Default()
|
||||
}
|
||||
if nc == nil {
|
||||
log.Error("failed to connect to nats")
|
||||
return nil
|
||||
}
|
||||
return &ServiceRunner{
|
||||
nc: nc,
|
||||
log: log,
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts a service.
|
||||
func (sr *ServiceRunner) Start(ctx context.Context, svc PlugisServiceIntf) {
|
||||
sr.wg.Add(1)
|
||||
go func() {
|
||||
defer sr.wg.Done()
|
||||
svc.SetLogger(sr.log)
|
||||
svc.SetNats(sr.nc)
|
||||
serviceType := fmt.Sprintf("%T", svc)
|
||||
err := svc.Run(ctx)
|
||||
if err != nil {
|
||||
sr.log.Error("service execution", "error", err, "service", serviceType)
|
||||
}
|
||||
err = svc.Nats().Flush()
|
||||
if err != nil {
|
||||
sr.log.Error("service flush", "error", err, "service", fmt.Sprintf("%T", svc))
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait waits for all services to finish.
|
||||
func (sr *ServiceRunner) Wait() {
|
||||
sr.wg.Wait()
|
||||
if sr.nc != nil {
|
||||
err := sr.nc.Drain()
|
||||
if err != nil {
|
||||
sr.log.Error("service drain", "error", err)
|
||||
}
|
||||
sr.nc.Close()
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Reference in a new issue