244 lines
6.6 KiB
Go
244 lines
6.6 KiB
Go
package nats_service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/nats-io/nats.go"
|
|
"github.com/nats-io/nats.go/micro"
|
|
)
|
|
|
|
// EndpointConfig defines configuration for a single NATS endpoint
|
|
type EndpointConfig struct {
|
|
Name string `json:"name"`
|
|
Handler PlugisServiceHandler `json:"-"` // Non-serializable
|
|
MaxConcurrency int `json:"max_concurrency"`
|
|
RequestTimeout time.Duration `json:"request_timeout"`
|
|
Metadata map[string]string `json:"metadata,omitempty"`
|
|
QueueGroup string `json:"queue_group,omitempty"`
|
|
Subject string `json:"subject,omitempty"`
|
|
UserData any `json:"-"`
|
|
}
|
|
|
|
// setDefaults applies default values to endpoint configuration
|
|
func (config *EndpointConfig) setDefaults(serviceName, prefix string) {
|
|
if config.MaxConcurrency <= 0 {
|
|
config.MaxConcurrency = 10
|
|
}
|
|
if config.RequestTimeout <= 0 {
|
|
config.RequestTimeout = 30 * time.Second
|
|
}
|
|
if config.Subject == "" {
|
|
if prefix == "" {
|
|
config.Subject = serviceName + "." + config.Name
|
|
} else {
|
|
config.Subject = prefix + "." + serviceName + "." + config.Name
|
|
}
|
|
}
|
|
}
|
|
|
|
// NatsService wraps a NATS micro.Service with endpoint management
|
|
type NatsService struct {
|
|
nc *nats.Conn
|
|
prefix string
|
|
svc micro.Service
|
|
}
|
|
|
|
// NewNatsService creates a new NATS service with the given prefix and configuration
|
|
func NewNatsService(conn *nats.Conn, prefix string, config micro.Config) (*NatsService, error) {
|
|
natsService := &NatsService{
|
|
nc: conn,
|
|
prefix: prefix,
|
|
}
|
|
var err error
|
|
natsService.svc, err = micro.AddService(natsService.nc, config)
|
|
return natsService, err
|
|
}
|
|
|
|
// Svc returns the underlying NATS micro.Service
|
|
func (ns *NatsService) Svc() micro.Service {
|
|
return ns.svc
|
|
}
|
|
|
|
// Stop stops the NATS service
|
|
func (ns *NatsService) Stop() error {
|
|
return ns.svc.Stop()
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the service with context cancellation
|
|
func (ns *NatsService) Shutdown(ctx context.Context) error {
|
|
done := make(chan struct{})
|
|
go func() {
|
|
defer close(done)
|
|
ns.svc.Stop()
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
return nil
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
|
|
// PlugisServiceReply defines the standard response format for all endpoints
|
|
type PlugisServiceReply struct {
|
|
Error string `json:"error,omitempty"`
|
|
Result any `json:"result,omitempty"`
|
|
Timestamp time.Time `json:"timestamp"`
|
|
Duration string `json:"duration,omitempty"`
|
|
}
|
|
|
|
// PlugisServiceHandler is the function signature for endpoint handlers
|
|
type PlugisServiceHandler func(ctx context.Context, request micro.Request, endpoint EndpointConfig) (any, error)
|
|
|
|
// PlugisHandler manages concurrency and timeouts for a single endpoint
|
|
type PlugisHandler struct {
|
|
plugisServiceHandler PlugisServiceHandler
|
|
ctx context.Context
|
|
semaphore chan struct{}
|
|
config EndpointConfig
|
|
}
|
|
|
|
// NewPlugisHandler creates a new handler with concurrency control
|
|
func NewPlugisHandler(ctx context.Context, config EndpointConfig) *PlugisHandler {
|
|
return &PlugisHandler{
|
|
ctx: ctx,
|
|
plugisServiceHandler: config.Handler,
|
|
semaphore: make(chan struct{}, config.MaxConcurrency),
|
|
config: config,
|
|
}
|
|
}
|
|
|
|
// Handle processes incoming requests with concurrency limiting
|
|
func (ph PlugisHandler) Handle(req micro.Request) {
|
|
// Try to acquire semaphore with non-blocking select
|
|
select {
|
|
case ph.semaphore <- struct{}{}:
|
|
// Successfully acquired semaphore, process request
|
|
go ph.handleWithTimeout(req)
|
|
default:
|
|
// Endpoint overloaded, reject request immediately
|
|
ph.sendErrorResponse(req, "503", "Endpoint overloaded - too many concurrent requests", time.Now())
|
|
}
|
|
}
|
|
|
|
// handleWithTimeout executes the handler with timeout and panic recovery
|
|
func (ph PlugisHandler) handleWithTimeout(req micro.Request) {
|
|
start := time.Now()
|
|
|
|
// Always release semaphore when done
|
|
defer func() {
|
|
<-ph.semaphore
|
|
}()
|
|
|
|
// Create timeout context
|
|
ctx, cancel := context.WithTimeout(ph.ctx, ph.config.RequestTimeout)
|
|
defer cancel()
|
|
|
|
// Channel to receive result
|
|
type result struct {
|
|
data any
|
|
err error
|
|
}
|
|
resultChan := make(chan result, 1)
|
|
|
|
// Execute handler in separate goroutine
|
|
go func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
resultChan <- result{
|
|
data: nil,
|
|
err: fmt.Errorf("handler panicked: %v", r),
|
|
}
|
|
}
|
|
}()
|
|
|
|
data, err := ph.plugisServiceHandler(ctx, req, ph.config)
|
|
resultChan <- result{data: data, err: err}
|
|
}()
|
|
|
|
// Wait for either completion or timeout
|
|
select {
|
|
case res := <-resultChan:
|
|
ph.sendResponse(req, res.data, res.err, start)
|
|
case <-ctx.Done():
|
|
if ctx.Err() == context.DeadlineExceeded {
|
|
ph.sendErrorResponse(req, "408", "Request timeout", start)
|
|
} else {
|
|
ph.sendErrorResponse(req, "499", "Request cancelled", start)
|
|
}
|
|
}
|
|
}
|
|
|
|
// sendResponse sends a successful response with timing information
|
|
func (ph PlugisHandler) sendResponse(req micro.Request, result any, err error, start time.Time) {
|
|
duration := time.Since(start)
|
|
|
|
reply := PlugisServiceReply{
|
|
Result: result,
|
|
Timestamp: time.Now(),
|
|
Duration: duration.String(),
|
|
}
|
|
|
|
if err != nil {
|
|
reply.Error = err.Error()
|
|
}
|
|
|
|
if err := req.RespondJSON(reply); err != nil {
|
|
// Failed to send response, but we can't do much about it
|
|
// In a production system, this would be logged
|
|
}
|
|
}
|
|
|
|
// sendErrorResponse sends an error response with status code and timing
|
|
func (ph PlugisHandler) sendErrorResponse(req micro.Request, code, message string, start time.Time) {
|
|
duration := time.Since(start)
|
|
|
|
reply := PlugisServiceReply{
|
|
Error: fmt.Sprintf("%s: %s", code, message),
|
|
Timestamp: time.Now(),
|
|
Duration: duration.String(),
|
|
}
|
|
|
|
if err := req.RespondJSON(reply); err != nil {
|
|
// Failed to send error response
|
|
}
|
|
}
|
|
|
|
// AddEndpoint adds a new endpoint to the service with the given configuration
|
|
func (ns *NatsService) AddEndpoint(ctx context.Context, config EndpointConfig) error {
|
|
if config.Name == "" {
|
|
return fmt.Errorf("endpoint name cannot be empty")
|
|
}
|
|
if config.Handler == nil {
|
|
return fmt.Errorf("handler cannot be nil")
|
|
}
|
|
|
|
// Set defaults
|
|
config.setDefaults(ns.svc.Info().Name, ns.prefix)
|
|
|
|
// Build micro options
|
|
options := []micro.EndpointOpt{
|
|
micro.WithEndpointSubject(config.Subject),
|
|
}
|
|
|
|
if config.QueueGroup != "" {
|
|
options = append(options, micro.WithEndpointQueueGroup(config.QueueGroup))
|
|
} else {
|
|
options = append(options, micro.WithEndpointQueueGroupDisabled())
|
|
}
|
|
|
|
if len(config.Metadata) > 0 {
|
|
options = append(options, micro.WithEndpointMetadata(config.Metadata))
|
|
}
|
|
|
|
handler := NewPlugisHandler(ctx, config)
|
|
|
|
return ns.svc.AddEndpoint(config.Name, handler, options...)
|
|
}
|
|
|
|
func (ns *NatsService) Prefix() string {
|
|
return ns.prefix
|
|
}
|