Compare commits

..

No commits in common. "cf7586a7dc36bcf3746cdcc4ccddaafbe31da6fd" and "848d7406a09929636314d380d9111f1c27ba04ea" have entirely different histories.

9 changed files with 53 additions and 173 deletions

View file

@ -1,13 +1,13 @@
# plugisservice # plugis3-service
plugisservice is a package to write plugis3 services in Go. plugis3-service is a package to write plugis3 services in Go.
A Plugis3 service is a NATS micro service with some specific properties that must implement the `PlugisServiceIntf` interface. A Plugis3 service is a NATS micro service with some specific properties that must implement the `PlugisServiceIntf` interface.
## NATS Micro PlugisServiceIntf Integration ## NATS Micro PlugisServiceIntf Integration
For NATS services, see: For NATS services, see:
- [Building a Service](https://docs.nats.io/using-nats/nex/getting-started/building-service) - [Building a PlugisServiceIntf](https://docs.nats.io/using-nats/nex/getting-started/building-service)
- [NATS micro on Github](https://github.com/nats-io/nats.go/tree/main/micro) - [NATS micro on Github](https://github.com/nats-io/nats.go/tree/main/micro)
# Files # Files
@ -18,12 +18,4 @@ The `plugis.go` file defines the `PlugisIntf` interface, which specifies the fun
The `example/echoService/echoService.go` file is a minimal service implementation sample. The `example/echoService/echoService.go` file is a minimal service implementation sample.
The `example/echoService/cmd/main.go` file is a sample of ServiceRunner usage. The `example/echoService/cmd/main.go` file is a sample of ServiceRunner usage.
# How to declare a service
```go
```

View file

@ -24,7 +24,7 @@ func main() {
} }
defer nc.Close() defer nc.Close()
runner := plugisservice.NewServiceRunner(nc, logger, "integrator.customer.instance") runner := plugisservice.NewServiceRunner(nc, logger)
runner.Start(ctx, echoservice.NewEchoService()) runner.Start(ctx, echoservice.NewEchoService())

View file

@ -2,7 +2,9 @@ package echoservice
import ( import (
"context" "context"
"github.com/nats-io/nats.go/micro"
"github.com/telemac/plugisservice" "github.com/telemac/plugisservice"
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
"time" "time"
) )
@ -19,7 +21,7 @@ func NewEchoService() *EchoService {
} }
// ExecuteCommand sends a command // ExecuteCommand sends a command
func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) ([]byte, error) { func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) error {
subject := "ism.homelab.service.plugis.command" subject := "ism.homelab.service.plugis.command"
svc.Logger().Info("sending command", svc.Logger().Info("sending command",
@ -33,7 +35,7 @@ func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) ([]b
"error", err, "error", err,
"command", command, "command", command,
) )
return nil, err return err
} else { } else {
svc.Logger().Info("command executed successfully", svc.Logger().Info("command executed successfully",
"command", command, "command", command,
@ -41,7 +43,7 @@ func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) ([]b
) )
} }
return msg.Data, nil return nil
} }
// Run is the main function of the service. // Run is the main function of the service.
@ -57,15 +59,36 @@ func (svc *EchoService) Run(ctx context.Context) error {
svc.ExecuteCommand(ctx, "sleep 3") svc.ExecuteCommand(ctx, "sleep 3")
service, err := svc.StartService(svc) service, err := nats_service.NewNatsService(svc.Nats(), svc.Prefix(), micro.Config{
Name: svc.Name(),
Endpoint: nil,
Version: svc.Version(),
Description: svc.Description(),
Metadata: svc.Metadata(),
})
if err != nil { if err != nil {
return err return err
} }
defer func() { defer func() {
service.Stop() service.Stop()
}() }()
pingEndpoint.UserData = svc
err = service.AddEndpoint(ctx, pingEndpoint) echoEndpoint := nats_service.EndpointConfig{
Name: "ping",
Handler: func(ctx context.Context, request micro.Request) (any, error) {
data := request.Data()
_ = data
return "ping: " + string(data), err
},
MaxConcurrency: 10,
RequestTimeout: 2 * time.Second,
Metadata: map[string]string{
"description": "ping",
"version": "0.0.1",
},
}
err = service.AddEndpoint(ctx, echoEndpoint)
if err != nil { if err != nil {
return err return err
} }
@ -94,9 +117,9 @@ func (svc *EchoService) Version() string {
// Metadata returns the metadata of the service. // Metadata returns the metadata of the service.
func (svc *EchoService) Metadata() plugisservice.Metadata { func (svc *EchoService) Metadata() plugisservice.Metadata {
serviceMetadata, err := plugisservice.NewServiceMetadata(svc.Prefix(), time.Now()) meta, err := plugisservice.NewServiceMetadata(svc.Prefix(), time.Now())
if err != nil { if err != nil {
svc.Logger().Error("NewServiceMetadata", "error", err) svc.Logger().Error("NewServiceMetadata", "error", err)
} }
return serviceMetadata.Meta() return meta
} }

View file

@ -1,31 +0,0 @@
package echoservice
import (
"context"
"github.com/nats-io/nats.go/micro"
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
"time"
)
var pingEndpoint = nats_service.EndpointConfig{
Name: "ping",
Handler: func(ctx context.Context, request micro.Request, ec nats_service.EndpointConfig) (any, error) {
data := request.Data()
_ = data
// get plugis from EndpointConfigt
echoService, ok := ec.UserData.(*EchoService)
if ok {
echoService.Logger().Info("plugis ping received")
res, err := echoService.ExecuteCommand(ctx, "hostnamectl")
return string(res), err
}
return ec, nil
return "ping: " + string(data), nil
},
MaxConcurrency: 10,
RequestTimeout: 2 * time.Second,
Metadata: map[string]string{
"description": "ping",
"version": "0.0.1",
},
}

View file

@ -1,11 +0,0 @@
package model
import "time"
type Variable struct {
Name string `json:"name"`
VarType string `json:"type,omitempty"`
Value any `json:"value,omitempty"`
Created *time.Time `json:"created,omitempty"`
Updated *time.Time `json:"updated,omitempty"`
}

View file

@ -18,7 +18,6 @@ type EndpointConfig struct {
Metadata map[string]string `json:"metadata,omitempty"` Metadata map[string]string `json:"metadata,omitempty"`
QueueGroup string `json:"queue_group,omitempty"` QueueGroup string `json:"queue_group,omitempty"`
Subject string `json:"subject,omitempty"` Subject string `json:"subject,omitempty"`
UserData any `json:"-"`
} }
// setDefaults applies default values to endpoint configuration // setDefaults applies default values to endpoint configuration
@ -91,7 +90,7 @@ type PlugisServiceReply struct {
} }
// PlugisServiceHandler is the function signature for endpoint handlers // PlugisServiceHandler is the function signature for endpoint handlers
type PlugisServiceHandler func(ctx context.Context, request micro.Request, endpoint EndpointConfig) (any, error) type PlugisServiceHandler func(ctx context.Context, request micro.Request) (any, error)
// PlugisHandler manages concurrency and timeouts for a single endpoint // PlugisHandler manages concurrency and timeouts for a single endpoint
type PlugisHandler struct { type PlugisHandler struct {
@ -155,7 +154,7 @@ func (ph PlugisHandler) handleWithTimeout(req micro.Request) {
} }
}() }()
data, err := ph.plugisServiceHandler(ctx, req, ph.config) data, err := ph.plugisServiceHandler(ctx, req)
resultChan <- result{data: data, err: err} resultChan <- result{data: data, err: err}
}() }()

View file

@ -10,13 +10,10 @@ import (
"runtime" "runtime"
"time" "time"
"github.com/nats-io/nats.go/micro"
"github.com/telemac/plugisservice/model"
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
"github.com/telemac/goutils/net" "github.com/telemac/goutils/net"
"github.com/telemac/goutils/task" "github.com/telemac/goutils/task"
"github.com/go-viper/mapstructure/v2"
"github.com/nats-io/nats.go" "github.com/nats-io/nats.go"
"github.com/synadia-io/orbit.go/natsext" "github.com/synadia-io/orbit.go/natsext"
) )
@ -29,7 +26,6 @@ var _ PlugisIntf = (*Plugis)(nil)
type Plugis struct { type Plugis struct {
logger *slog.Logger logger *slog.Logger
nc *nats.Conn nc *nats.Conn
prefix string
} }
var ( var (
@ -37,11 +33,6 @@ var (
ErrNatsConnectionNil = errors.New("nats connection is nil") ErrNatsConnectionNil = errors.New("nats connection is nil")
) )
type Event[T any] struct {
Type string `json:"type,omitempty"`
Data T `json:"data,omitempty"`
}
// SetLogger sets the logger for the Plugis instance. // SetLogger sets the logger for the Plugis instance.
func (plugis *Plugis) SetLogger(log *slog.Logger) { func (plugis *Plugis) SetLogger(log *slog.Logger) {
plugis.logger = log plugis.logger = log
@ -87,12 +78,7 @@ func (plugis *Plugis) Publish(topic string, payload []byte) error {
// Prefix returns the prefix for the Plugis instance. // Prefix returns the prefix for the Plugis instance.
func (plugis *Plugis) Prefix() string { func (plugis *Plugis) Prefix() string {
return plugis.prefix return "integrator.customer.location"
}
// SetPrefix sets the prefix for the Plugis instance.
func (plugis *Plugis) SetPrefix(prefix string) {
plugis.prefix = prefix
} }
// Request sends a request to the nats connection. // Request sends a request to the nats connection.
@ -196,7 +182,7 @@ func isRunningInDockerContainer() bool {
} }
// NewServiceMetadata creates and fills a ServiceMetadata structure // NewServiceMetadata creates and fills a ServiceMetadata structure
func NewServiceMetadata(prefix string, startedAt time.Time) (*ServiceMetadata, error) { func NewServiceMetadata(prefix string, startedAt time.Time) (Metadata, error) {
var err error var err error
var meta ServiceMetadata var meta ServiceMetadata
meta.Prefix = prefix meta.Prefix = prefix
@ -205,7 +191,7 @@ func NewServiceMetadata(prefix string, startedAt time.Time) (*ServiceMetadata, e
meta.Platform += " (docker)" meta.Platform += " (docker)"
} }
meta.StartedAt = startedAt.Format(time.RFC3339) meta.StartedAt = startedAt.String()
meta.Hostname, err = os.Hostname() meta.Hostname, err = os.Hostname()
if err != nil { if err != nil {
return nil, err return nil, err
@ -215,76 +201,8 @@ func NewServiceMetadata(prefix string, startedAt time.Time) (*ServiceMetadata, e
return nil, err return nil, err
} }
return &meta, nil // Convert struct to map using mapstructure
} var result Metadata
err = mapstructure.Decode(meta, &result)
//func (smd *ServiceMetadata) Meta() Metadata { return result, err
// var meta Metadata
// err := mapstructure.Decode(smd, &meta)
// if err != nil {
// return Metadata{}
// }
// return meta
//}
// Meta returns ServiceMetaData as map[string]string
func (smd *ServiceMetadata) Meta() Metadata {
data, err := json.Marshal(smd)
if err != nil {
return Metadata{}
}
var meta Metadata
if err := json.Unmarshal(data, &meta); err != nil {
return Metadata{}
}
return meta
}
// StartService initializes and starts a NATS service for the given PlugisServiceIntf implementation.
// It returns the created NatsService and any error encountered during the creation process.
func (plugis *Plugis) StartService(svc PlugisServiceIntf) (*nats_service.NatsService, error) {
service, err := nats_service.NewNatsService(plugis.Nats(), plugis.Prefix(), micro.Config{
Name: svc.Name(),
Endpoint: nil,
Version: svc.Version(),
Description: svc.Description(),
Metadata: svc.Metadata(),
})
return service, err
}
// VariableSet sets a variable with the given name, value, and type, then publishes it to a corresponding topic.
func (plugis *Plugis) VariableSet(name string, value any, varType string) error {
variable := model.Variable{
Name: name,
Value: value,
VarType: varType,
}
event := Event[model.Variable]{
Type: "variable.set",
Data: variable,
}
topic := "variable.set." + name
payload, err := json.Marshal(event)
if err != nil {
return err
}
return plugis.Publish(topic, payload)
}
// VariableUnset unsets a variable with the given name, then publishes it to a corresponding topic.
func (plugis *Plugis) VariableUnset(name string) error {
variable := model.Variable{
Name: name,
}
event := Event[model.Variable]{
Type: "variable.unset",
Data: variable,
}
topic := "variable.unset." + name
payload, err := json.Marshal(event)
if err != nil {
return err
}
return plugis.Publish(topic, payload)
} }

View file

@ -2,7 +2,6 @@ package plugisservice
import ( import (
"context" "context"
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
"iter" "iter"
"log/slog" "log/slog"
"time" "time"
@ -30,15 +29,11 @@ type PlugisIntf interface {
Logger() *slog.Logger Logger() *slog.Logger
SetNats(*nats.Conn) SetNats(*nats.Conn)
Nats() *nats.Conn Nats() *nats.Conn
SetPrefix(prefix string)
Prefix() string Prefix() string
Publish(topic string, payload []byte) error Publish(topic string, payload []byte) error
Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error) Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error)
RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error) RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error)
GetServices(ctx context.Context) ([]ServiceInfo, error) GetServices(ctx context.Context) ([]ServiceInfo, error)
StartService(svc PlugisServiceIntf) (*nats_service.NatsService, error)
VariableSet(name string, value any, varType string) error
VariableUnset(name string) error
} }
// ServiceInfo is the information about a service. // ServiceInfo is the information about a service.
@ -48,7 +43,7 @@ type ServiceInfo struct {
Description string `json:"description"` Description string `json:"description"`
Version string `json:"version"` Version string `json:"version"`
Type string `json:"type"` Type string `json:"type"`
Metadata Metadata `json:"metadata"` // contains at least ServiceMetadata fields Metadata Metadata `json:"metadata"`
Endpoints []Endpoint `json:"endpoints"` Endpoints []Endpoint `json:"endpoints"`
} }

View file

@ -11,13 +11,12 @@ import (
// ServiceRunner is a struct that runs one or more services. // ServiceRunner is a struct that runs one or more services.
type ServiceRunner struct { type ServiceRunner struct {
wg sync.WaitGroup wg sync.WaitGroup
log *slog.Logger log *slog.Logger
nc *nats.Conn nc *nats.Conn
prefix string
} }
func NewServiceRunner(nc *nats.Conn, log *slog.Logger, prefix string) *ServiceRunner { func NewServiceRunner(nc *nats.Conn, log *slog.Logger) *ServiceRunner {
if log == nil { if log == nil {
log = slog.Default() log = slog.Default()
} }
@ -25,11 +24,9 @@ func NewServiceRunner(nc *nats.Conn, log *slog.Logger, prefix string) *ServiceRu
log.Error("failed to connect to nats") log.Error("failed to connect to nats")
return nil return nil
} }
// TODO : handle empty prefix
return &ServiceRunner{ return &ServiceRunner{
nc: nc, nc: nc,
log: log, log: log,
prefix: prefix,
} }
} }
@ -40,8 +37,6 @@ func (sr *ServiceRunner) Start(ctx context.Context, svc PlugisServiceIntf) {
defer sr.wg.Done() defer sr.wg.Done()
svc.SetLogger(sr.log) svc.SetLogger(sr.log)
svc.SetNats(sr.nc) svc.SetNats(sr.nc)
// TODO : handle empty prefix
svc.SetPrefix(sr.prefix)
serviceType := fmt.Sprintf("%T", svc) serviceType := fmt.Sprintf("%T", svc)
err := svc.Run(ctx) err := svc.Run(ctx)
if err != nil { if err != nil {