Compare commits
No commits in common. "cf7586a7dc36bcf3746cdcc4ccddaafbe31da6fd" and "848d7406a09929636314d380d9111f1c27ba04ea" have entirely different histories.
cf7586a7dc
...
848d7406a0
9 changed files with 53 additions and 173 deletions
16
README.md
16
README.md
|
|
@ -1,13 +1,13 @@
|
||||||
# plugisservice
|
# plugis3-service
|
||||||
|
|
||||||
plugisservice is a package to write plugis3 services in Go.
|
plugis3-service is a package to write plugis3 services in Go.
|
||||||
|
|
||||||
A Plugis3 service is a NATS micro service with some specific properties that must implement the `PlugisServiceIntf` interface.
|
A Plugis3 service is a NATS micro service with some specific properties that must implement the `PlugisServiceIntf` interface.
|
||||||
|
|
||||||
## NATS Micro PlugisServiceIntf Integration
|
## NATS Micro PlugisServiceIntf Integration
|
||||||
|
|
||||||
For NATS services, see:
|
For NATS services, see:
|
||||||
- [Building a Service](https://docs.nats.io/using-nats/nex/getting-started/building-service)
|
- [Building a PlugisServiceIntf](https://docs.nats.io/using-nats/nex/getting-started/building-service)
|
||||||
- [NATS micro on Github](https://github.com/nats-io/nats.go/tree/main/micro)
|
- [NATS micro on Github](https://github.com/nats-io/nats.go/tree/main/micro)
|
||||||
|
|
||||||
# Files
|
# Files
|
||||||
|
|
@ -18,12 +18,4 @@ The `plugis.go` file defines the `PlugisIntf` interface, which specifies the fun
|
||||||
|
|
||||||
The `example/echoService/echoService.go` file is a minimal service implementation sample.
|
The `example/echoService/echoService.go` file is a minimal service implementation sample.
|
||||||
|
|
||||||
The `example/echoService/cmd/main.go` file is a sample of ServiceRunner usage.
|
The `example/echoService/cmd/main.go` file is a sample of ServiceRunner usage.
|
||||||
|
|
||||||
# How to declare a service
|
|
||||||
|
|
||||||
```go
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
```
|
|
||||||
|
|
@ -24,7 +24,7 @@ func main() {
|
||||||
}
|
}
|
||||||
defer nc.Close()
|
defer nc.Close()
|
||||||
|
|
||||||
runner := plugisservice.NewServiceRunner(nc, logger, "integrator.customer.instance")
|
runner := plugisservice.NewServiceRunner(nc, logger)
|
||||||
|
|
||||||
runner.Start(ctx, echoservice.NewEchoService())
|
runner.Start(ctx, echoservice.NewEchoService())
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,9 @@ package echoservice
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"github.com/nats-io/nats.go/micro"
|
||||||
"github.com/telemac/plugisservice"
|
"github.com/telemac/plugisservice"
|
||||||
|
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -19,7 +21,7 @@ func NewEchoService() *EchoService {
|
||||||
}
|
}
|
||||||
|
|
||||||
// ExecuteCommand sends a command
|
// ExecuteCommand sends a command
|
||||||
func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) ([]byte, error) {
|
func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) error {
|
||||||
subject := "ism.homelab.service.plugis.command"
|
subject := "ism.homelab.service.plugis.command"
|
||||||
|
|
||||||
svc.Logger().Info("sending command",
|
svc.Logger().Info("sending command",
|
||||||
|
|
@ -33,7 +35,7 @@ func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) ([]b
|
||||||
"error", err,
|
"error", err,
|
||||||
"command", command,
|
"command", command,
|
||||||
)
|
)
|
||||||
return nil, err
|
return err
|
||||||
} else {
|
} else {
|
||||||
svc.Logger().Info("command executed successfully",
|
svc.Logger().Info("command executed successfully",
|
||||||
"command", command,
|
"command", command,
|
||||||
|
|
@ -41,7 +43,7 @@ func (svc *EchoService) ExecuteCommand(ctx context.Context, command string) ([]b
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
return msg.Data, nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run is the main function of the service.
|
// Run is the main function of the service.
|
||||||
|
|
@ -57,15 +59,36 @@ func (svc *EchoService) Run(ctx context.Context) error {
|
||||||
|
|
||||||
svc.ExecuteCommand(ctx, "sleep 3")
|
svc.ExecuteCommand(ctx, "sleep 3")
|
||||||
|
|
||||||
service, err := svc.StartService(svc)
|
service, err := nats_service.NewNatsService(svc.Nats(), svc.Prefix(), micro.Config{
|
||||||
|
Name: svc.Name(),
|
||||||
|
Endpoint: nil,
|
||||||
|
Version: svc.Version(),
|
||||||
|
Description: svc.Description(),
|
||||||
|
Metadata: svc.Metadata(),
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer func() {
|
defer func() {
|
||||||
service.Stop()
|
service.Stop()
|
||||||
}()
|
}()
|
||||||
pingEndpoint.UserData = svc
|
|
||||||
err = service.AddEndpoint(ctx, pingEndpoint)
|
echoEndpoint := nats_service.EndpointConfig{
|
||||||
|
Name: "ping",
|
||||||
|
Handler: func(ctx context.Context, request micro.Request) (any, error) {
|
||||||
|
data := request.Data()
|
||||||
|
_ = data
|
||||||
|
return "ping: " + string(data), err
|
||||||
|
},
|
||||||
|
MaxConcurrency: 10,
|
||||||
|
RequestTimeout: 2 * time.Second,
|
||||||
|
Metadata: map[string]string{
|
||||||
|
"description": "ping",
|
||||||
|
"version": "0.0.1",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
err = service.AddEndpoint(ctx, echoEndpoint)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
@ -94,9 +117,9 @@ func (svc *EchoService) Version() string {
|
||||||
|
|
||||||
// Metadata returns the metadata of the service.
|
// Metadata returns the metadata of the service.
|
||||||
func (svc *EchoService) Metadata() plugisservice.Metadata {
|
func (svc *EchoService) Metadata() plugisservice.Metadata {
|
||||||
serviceMetadata, err := plugisservice.NewServiceMetadata(svc.Prefix(), time.Now())
|
meta, err := plugisservice.NewServiceMetadata(svc.Prefix(), time.Now())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
svc.Logger().Error("NewServiceMetadata", "error", err)
|
svc.Logger().Error("NewServiceMetadata", "error", err)
|
||||||
}
|
}
|
||||||
return serviceMetadata.Meta()
|
return meta
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,31 +0,0 @@
|
||||||
package echoservice
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"github.com/nats-io/nats.go/micro"
|
|
||||||
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
var pingEndpoint = nats_service.EndpointConfig{
|
|
||||||
Name: "ping",
|
|
||||||
Handler: func(ctx context.Context, request micro.Request, ec nats_service.EndpointConfig) (any, error) {
|
|
||||||
data := request.Data()
|
|
||||||
_ = data
|
|
||||||
// get plugis from EndpointConfigt
|
|
||||||
echoService, ok := ec.UserData.(*EchoService)
|
|
||||||
if ok {
|
|
||||||
echoService.Logger().Info("plugis ping received")
|
|
||||||
res, err := echoService.ExecuteCommand(ctx, "hostnamectl")
|
|
||||||
return string(res), err
|
|
||||||
}
|
|
||||||
return ec, nil
|
|
||||||
return "ping: " + string(data), nil
|
|
||||||
},
|
|
||||||
MaxConcurrency: 10,
|
|
||||||
RequestTimeout: 2 * time.Second,
|
|
||||||
Metadata: map[string]string{
|
|
||||||
"description": "ping",
|
|
||||||
"version": "0.0.1",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
@ -1,11 +0,0 @@
|
||||||
package model
|
|
||||||
|
|
||||||
import "time"
|
|
||||||
|
|
||||||
type Variable struct {
|
|
||||||
Name string `json:"name"`
|
|
||||||
VarType string `json:"type,omitempty"`
|
|
||||||
Value any `json:"value,omitempty"`
|
|
||||||
Created *time.Time `json:"created,omitempty"`
|
|
||||||
Updated *time.Time `json:"updated,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
@ -18,7 +18,6 @@ type EndpointConfig struct {
|
||||||
Metadata map[string]string `json:"metadata,omitempty"`
|
Metadata map[string]string `json:"metadata,omitempty"`
|
||||||
QueueGroup string `json:"queue_group,omitempty"`
|
QueueGroup string `json:"queue_group,omitempty"`
|
||||||
Subject string `json:"subject,omitempty"`
|
Subject string `json:"subject,omitempty"`
|
||||||
UserData any `json:"-"`
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// setDefaults applies default values to endpoint configuration
|
// setDefaults applies default values to endpoint configuration
|
||||||
|
|
@ -91,7 +90,7 @@ type PlugisServiceReply struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// PlugisServiceHandler is the function signature for endpoint handlers
|
// PlugisServiceHandler is the function signature for endpoint handlers
|
||||||
type PlugisServiceHandler func(ctx context.Context, request micro.Request, endpoint EndpointConfig) (any, error)
|
type PlugisServiceHandler func(ctx context.Context, request micro.Request) (any, error)
|
||||||
|
|
||||||
// PlugisHandler manages concurrency and timeouts for a single endpoint
|
// PlugisHandler manages concurrency and timeouts for a single endpoint
|
||||||
type PlugisHandler struct {
|
type PlugisHandler struct {
|
||||||
|
|
@ -155,7 +154,7 @@ func (ph PlugisHandler) handleWithTimeout(req micro.Request) {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
data, err := ph.plugisServiceHandler(ctx, req, ph.config)
|
data, err := ph.plugisServiceHandler(ctx, req)
|
||||||
resultChan <- result{data: data, err: err}
|
resultChan <- result{data: data, err: err}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
|
|
||||||
98
plugis.go
98
plugis.go
|
|
@ -10,13 +10,10 @@ import (
|
||||||
"runtime"
|
"runtime"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"github.com/nats-io/nats.go/micro"
|
|
||||||
"github.com/telemac/plugisservice/model"
|
|
||||||
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
|
|
||||||
|
|
||||||
"github.com/telemac/goutils/net"
|
"github.com/telemac/goutils/net"
|
||||||
"github.com/telemac/goutils/task"
|
"github.com/telemac/goutils/task"
|
||||||
|
|
||||||
|
"github.com/go-viper/mapstructure/v2"
|
||||||
"github.com/nats-io/nats.go"
|
"github.com/nats-io/nats.go"
|
||||||
"github.com/synadia-io/orbit.go/natsext"
|
"github.com/synadia-io/orbit.go/natsext"
|
||||||
)
|
)
|
||||||
|
|
@ -29,7 +26,6 @@ var _ PlugisIntf = (*Plugis)(nil)
|
||||||
type Plugis struct {
|
type Plugis struct {
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
nc *nats.Conn
|
nc *nats.Conn
|
||||||
prefix string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
|
|
@ -37,11 +33,6 @@ var (
|
||||||
ErrNatsConnectionNil = errors.New("nats connection is nil")
|
ErrNatsConnectionNil = errors.New("nats connection is nil")
|
||||||
)
|
)
|
||||||
|
|
||||||
type Event[T any] struct {
|
|
||||||
Type string `json:"type,omitempty"`
|
|
||||||
Data T `json:"data,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// SetLogger sets the logger for the Plugis instance.
|
// SetLogger sets the logger for the Plugis instance.
|
||||||
func (plugis *Plugis) SetLogger(log *slog.Logger) {
|
func (plugis *Plugis) SetLogger(log *slog.Logger) {
|
||||||
plugis.logger = log
|
plugis.logger = log
|
||||||
|
|
@ -87,12 +78,7 @@ func (plugis *Plugis) Publish(topic string, payload []byte) error {
|
||||||
|
|
||||||
// Prefix returns the prefix for the Plugis instance.
|
// Prefix returns the prefix for the Plugis instance.
|
||||||
func (plugis *Plugis) Prefix() string {
|
func (plugis *Plugis) Prefix() string {
|
||||||
return plugis.prefix
|
return "integrator.customer.location"
|
||||||
}
|
|
||||||
|
|
||||||
// SetPrefix sets the prefix for the Plugis instance.
|
|
||||||
func (plugis *Plugis) SetPrefix(prefix string) {
|
|
||||||
plugis.prefix = prefix
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Request sends a request to the nats connection.
|
// Request sends a request to the nats connection.
|
||||||
|
|
@ -196,7 +182,7 @@ func isRunningInDockerContainer() bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServiceMetadata creates and fills a ServiceMetadata structure
|
// NewServiceMetadata creates and fills a ServiceMetadata structure
|
||||||
func NewServiceMetadata(prefix string, startedAt time.Time) (*ServiceMetadata, error) {
|
func NewServiceMetadata(prefix string, startedAt time.Time) (Metadata, error) {
|
||||||
var err error
|
var err error
|
||||||
var meta ServiceMetadata
|
var meta ServiceMetadata
|
||||||
meta.Prefix = prefix
|
meta.Prefix = prefix
|
||||||
|
|
@ -205,7 +191,7 @@ func NewServiceMetadata(prefix string, startedAt time.Time) (*ServiceMetadata, e
|
||||||
meta.Platform += " (docker)"
|
meta.Platform += " (docker)"
|
||||||
}
|
}
|
||||||
|
|
||||||
meta.StartedAt = startedAt.Format(time.RFC3339)
|
meta.StartedAt = startedAt.String()
|
||||||
meta.Hostname, err = os.Hostname()
|
meta.Hostname, err = os.Hostname()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
@ -215,76 +201,8 @@ func NewServiceMetadata(prefix string, startedAt time.Time) (*ServiceMetadata, e
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
return &meta, nil
|
// Convert struct to map using mapstructure
|
||||||
}
|
var result Metadata
|
||||||
|
err = mapstructure.Decode(meta, &result)
|
||||||
//func (smd *ServiceMetadata) Meta() Metadata {
|
return result, err
|
||||||
// var meta Metadata
|
|
||||||
// err := mapstructure.Decode(smd, &meta)
|
|
||||||
// if err != nil {
|
|
||||||
// return Metadata{}
|
|
||||||
// }
|
|
||||||
// return meta
|
|
||||||
//}
|
|
||||||
|
|
||||||
// Meta returns ServiceMetaData as map[string]string
|
|
||||||
func (smd *ServiceMetadata) Meta() Metadata {
|
|
||||||
data, err := json.Marshal(smd)
|
|
||||||
if err != nil {
|
|
||||||
return Metadata{}
|
|
||||||
}
|
|
||||||
var meta Metadata
|
|
||||||
if err := json.Unmarshal(data, &meta); err != nil {
|
|
||||||
return Metadata{}
|
|
||||||
}
|
|
||||||
return meta
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartService initializes and starts a NATS service for the given PlugisServiceIntf implementation.
|
|
||||||
// It returns the created NatsService and any error encountered during the creation process.
|
|
||||||
func (plugis *Plugis) StartService(svc PlugisServiceIntf) (*nats_service.NatsService, error) {
|
|
||||||
service, err := nats_service.NewNatsService(plugis.Nats(), plugis.Prefix(), micro.Config{
|
|
||||||
Name: svc.Name(),
|
|
||||||
Endpoint: nil,
|
|
||||||
Version: svc.Version(),
|
|
||||||
Description: svc.Description(),
|
|
||||||
Metadata: svc.Metadata(),
|
|
||||||
})
|
|
||||||
return service, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// VariableSet sets a variable with the given name, value, and type, then publishes it to a corresponding topic.
|
|
||||||
func (plugis *Plugis) VariableSet(name string, value any, varType string) error {
|
|
||||||
variable := model.Variable{
|
|
||||||
Name: name,
|
|
||||||
Value: value,
|
|
||||||
VarType: varType,
|
|
||||||
}
|
|
||||||
event := Event[model.Variable]{
|
|
||||||
Type: "variable.set",
|
|
||||||
Data: variable,
|
|
||||||
}
|
|
||||||
topic := "variable.set." + name
|
|
||||||
payload, err := json.Marshal(event)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return plugis.Publish(topic, payload)
|
|
||||||
}
|
|
||||||
|
|
||||||
// VariableUnset unsets a variable with the given name, then publishes it to a corresponding topic.
|
|
||||||
func (plugis *Plugis) VariableUnset(name string) error {
|
|
||||||
variable := model.Variable{
|
|
||||||
Name: name,
|
|
||||||
}
|
|
||||||
event := Event[model.Variable]{
|
|
||||||
Type: "variable.unset",
|
|
||||||
Data: variable,
|
|
||||||
}
|
|
||||||
topic := "variable.unset." + name
|
|
||||||
payload, err := json.Marshal(event)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return plugis.Publish(topic, payload)
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -2,7 +2,6 @@ package plugisservice
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
nats_service "github.com/telemac/plugisservice/pkg/nats-service"
|
|
||||||
"iter"
|
"iter"
|
||||||
"log/slog"
|
"log/slog"
|
||||||
"time"
|
"time"
|
||||||
|
|
@ -30,15 +29,11 @@ type PlugisIntf interface {
|
||||||
Logger() *slog.Logger
|
Logger() *slog.Logger
|
||||||
SetNats(*nats.Conn)
|
SetNats(*nats.Conn)
|
||||||
Nats() *nats.Conn
|
Nats() *nats.Conn
|
||||||
SetPrefix(prefix string)
|
|
||||||
Prefix() string
|
Prefix() string
|
||||||
Publish(topic string, payload []byte) error
|
Publish(topic string, payload []byte) error
|
||||||
Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error)
|
Request(subj string, data []byte, timeout time.Duration) (*nats.Msg, error)
|
||||||
RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error)
|
RequestMany(ctx context.Context, subject string, data []byte, opts ...natsext.RequestManyOpt) (iter.Seq2[*nats.Msg, error], error)
|
||||||
GetServices(ctx context.Context) ([]ServiceInfo, error)
|
GetServices(ctx context.Context) ([]ServiceInfo, error)
|
||||||
StartService(svc PlugisServiceIntf) (*nats_service.NatsService, error)
|
|
||||||
VariableSet(name string, value any, varType string) error
|
|
||||||
VariableUnset(name string) error
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ServiceInfo is the information about a service.
|
// ServiceInfo is the information about a service.
|
||||||
|
|
@ -48,7 +43,7 @@ type ServiceInfo struct {
|
||||||
Description string `json:"description"`
|
Description string `json:"description"`
|
||||||
Version string `json:"version"`
|
Version string `json:"version"`
|
||||||
Type string `json:"type"`
|
Type string `json:"type"`
|
||||||
Metadata Metadata `json:"metadata"` // contains at least ServiceMetadata fields
|
Metadata Metadata `json:"metadata"`
|
||||||
Endpoints []Endpoint `json:"endpoints"`
|
Endpoints []Endpoint `json:"endpoints"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -11,13 +11,12 @@ import (
|
||||||
|
|
||||||
// ServiceRunner is a struct that runs one or more services.
|
// ServiceRunner is a struct that runs one or more services.
|
||||||
type ServiceRunner struct {
|
type ServiceRunner struct {
|
||||||
wg sync.WaitGroup
|
wg sync.WaitGroup
|
||||||
log *slog.Logger
|
log *slog.Logger
|
||||||
nc *nats.Conn
|
nc *nats.Conn
|
||||||
prefix string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewServiceRunner(nc *nats.Conn, log *slog.Logger, prefix string) *ServiceRunner {
|
func NewServiceRunner(nc *nats.Conn, log *slog.Logger) *ServiceRunner {
|
||||||
if log == nil {
|
if log == nil {
|
||||||
log = slog.Default()
|
log = slog.Default()
|
||||||
}
|
}
|
||||||
|
|
@ -25,11 +24,9 @@ func NewServiceRunner(nc *nats.Conn, log *slog.Logger, prefix string) *ServiceRu
|
||||||
log.Error("failed to connect to nats")
|
log.Error("failed to connect to nats")
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// TODO : handle empty prefix
|
|
||||||
return &ServiceRunner{
|
return &ServiceRunner{
|
||||||
nc: nc,
|
nc: nc,
|
||||||
log: log,
|
log: log,
|
||||||
prefix: prefix,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -40,8 +37,6 @@ func (sr *ServiceRunner) Start(ctx context.Context, svc PlugisServiceIntf) {
|
||||||
defer sr.wg.Done()
|
defer sr.wg.Done()
|
||||||
svc.SetLogger(sr.log)
|
svc.SetLogger(sr.log)
|
||||||
svc.SetNats(sr.nc)
|
svc.SetNats(sr.nc)
|
||||||
// TODO : handle empty prefix
|
|
||||||
svc.SetPrefix(sr.prefix)
|
|
||||||
serviceType := fmt.Sprintf("%T", svc)
|
serviceType := fmt.Sprintf("%T", svc)
|
||||||
err := svc.Run(ctx)
|
err := svc.Run(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue