This document details the implementation of the HTTP inbound endpoint in Synapse Go, including the architecture, configuration, and request flow.
The HTTP inbound endpoint in Synapse Go implements the InboundEndpoint interface and handles incoming HTTP requests.
flowchart TD
    A[HTTP Request] -->|Received by| B[Router Service]
    B -->|Dispatched to| C[HTTP Inbound Handler]
    C -->|Creates| D[Message Context]
    D -->|Passed to| E[Mediation Engine]
    E -->|Processed by| F[Sequence]
    F -->|Returns| G[Response]
    G -->|Sent back to| A
    classDef request fill:#f9e8a0,stroke:#e6b800,stroke-width:1px
    classDef service fill:#a0d8f9,stroke:#0073e6,stroke-width:1px
    classDef handler fill:#a0f9a5,stroke:#00b33c,stroke-width:1px
    classDef context fill:#e2a0f9,stroke:#9900cc,stroke-width:1px
    classDef engine fill:#f9a0a0,stroke:#cc0000,stroke-width:1px
    classDef sequence fill:#a0e9f9,stroke:#00999e,stroke-width:1px
    class A,G request
    class B service
    class C handler
    class D context
    class E engine
    class F sequence
The HTTP Inbound adapter is implemented in internal/app/adapters/inbound/http/http_inbound.go and conforms to the InboundEndpoint port interface:
type HTTPInbound struct {
	config     domain.InboundConfig
	ctx        context.Context
	cancelFunc context.CancelFunc
	server     *http.Server
	port       int
	logger     *slog.Logger
	level      slog.Level
}
The HTTP Inbound is initialized with configuration parameters:
func NewHTTPInbound(config domain.InboundConfig) (*HTTPInbound, error) {
	// Extract port from parameters
	portStr, exists := config.Parameters["port"]
	if !exists {
		return nil, errors.New("port parameter is required for HTTP inbound")
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		return nil, fmt.Errorf("invalid port: %s", portStr)
	}
	return &HTTPInbound{
		config: config,
		port:   port,
		logger: loggerfactory.GetLogger("http-inbound"),
	}, nil
}
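The constructor accepts any value that parses as an integer. A minimal sketch of stricter validation follows, assuming the parameters arrive as a map[string]string as in config.Parameters. The helper parsePort is hypothetical and not part of Synapse Go.

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// parsePort is a hypothetical helper, not part of Synapse Go. It reads the
// "port" entry from an inbound parameter map and checks the TCP port range.
func parsePort(params map[string]string) (int, error) {
	portStr, ok := params["port"]
	if !ok {
		return 0, errors.New("port parameter is required for HTTP inbound")
	}
	port, err := strconv.Atoi(portStr)
	if err != nil {
		// %w keeps the underlying strconv error available to errors.Is/As.
		return 0, fmt.Errorf("invalid port %q: %w", portStr, err)
	}
	if port < 1 || port > 65535 {
		return 0, fmt.Errorf("port %d is outside the range 1-65535", port)
	}
	return port, nil
}

func main() {
	for _, params := range []map[string]string{
		{"port": "8080"},
		{"port": "http"},
		{"port": "70000"},
		{},
	} {
		port, err := parsePort(params)
		fmt.Println(port, err)
	}
}
```

Rejecting out-of-range values at construction time surfaces a configuration mistake before the server tries to bind.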
The key parameters for an HTTP Inbound are:
- port: The TCP port on which the server will listen
- sequence: The name of the mediation sequence to execute for incoming requests
When the Start method is called, the HTTP Inbound creates a new HTTP server and starts listening for connections:
func (h *HTTPInbound) Start(ctx context.Context, mediator ports.InboundMessageMediator) error {
	h.ctx, h.cancelFunc = context.WithCancel(ctx)
	// Create a new router
	router := mux.NewRouter()
	// Configure the handler
	router.HandleFunc("/{path:.*}", func(w http.ResponseWriter, r *http.Request) {
		h.handleRequest(w, r, mediator)
	})
	// Create the HTTP server
	h.server = &http.Server{
		Addr:    fmt.Sprintf(":%d", h.port),
		Handler: router,
	}
	// Start the server in a goroutine
	wg := ctx.Value(utils.WaitGroupKey).(*sync.WaitGroup)
	wg.Add(1)
	go func() {
		defer wg.Done()
		h.logger.Info("Starting HTTP inbound", "port", h.port, "sequence", h.config.SequenceName)
		if err := h.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			h.logger.Error("HTTP inbound server error", "error", err)
		}
	}()
	// Monitor context for cancellation
	go func() {
		<-h.ctx.Done()
		h.logger.Info("Shutting down HTTP inbound", "port", h.port)
		// Create a timeout context for graceful shutdown
		shutdownCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancel()
		if err := h.server.Shutdown(shutdownCtx); err != nil {
			h.logger.Error("HTTP inbound shutdown error", "error", err)
		}
		h.logger.Info("HTTP inbound shutdown complete", "port", h.port)
	}()
	return nil
}
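This implementation creates a cancellable context, registers a catch-all route on a new router, starts the server in a goroutine tracked by the shared wait group, and launches a second goroutine that shuts the server down with a 10-second timeout once the context is cancelled.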
Requests are handled by executing the configured mediation sequence:
func (h *HTTPInbound) handleRequest(w http.ResponseWriter, r *http.Request, mediator ports.InboundMessageMediator) {
	// Create a new message from the request
	message := &domain.Message{
		Payload:     r,
		Destination: h.config.SequenceName,
	}
	// Get the sequence from the configuration context
	ctx := r.Context()
	configContext := ctx.Value(utils.ConfigContextKey).(*artifacts.ConfigContext)
	sequence, exists := configContext.SequenceMap[h.config.SequenceName]
	if !exists {
		// Fail the request instead of calling Mediate on a missing sequence
		h.logger.Error("Sequence not found", "sequence", h.config.SequenceName)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	// Execute the sequence
	result, err := sequence.Mediate(ctx, message, mediator)
	if err != nil {
		h.logger.Error("Error executing sequence", "sequence", h.config.SequenceName, "error", err)
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return
	}
	// Process the result; the asserted value is discarded because an unused variable does not compile
	if _, ok := result.Payload.(http.ResponseWriter); ok {
		// The sequence handled the response directly
		return
	}
	// Convert the result to a response
	h.writeResponse(w, result)
}
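This method wraps the incoming request in a domain.Message, looks up the configured sequence in the configuration context, and executes it. If mediation fails, it logs the error and returns 500 Internal Server Error. Otherwise it writes the result as the HTTP response, unless the sequence has already handled the response itself.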
The HTTP Inbound implements graceful shutdown to ensure that in-flight requests can complete before the server exits:
func (h *HTTPInbound) Stop() error {
	if h.cancelFunc != nil {
		h.cancelFunc()
	}
	return nil
}
This method triggers the cancellation of the context, which initiates the graceful shutdown process.
To avoid port conflicts, Synapse Go supports port offsets that can be configured in the deployment.toml file:
[server]
hostname = "localhost"
offset = "100"
When an offset is configured, it affects all HTTP ports in the system. For example, with an offset of 100 the main HTTP server's default port 8290 becomes 8390.
The offset is applied during server initialization:
func Run(ctx context.Context) error {
	// ...
	// Define default port
	httpServerPort := 8290
	var hostname string
	if serverConfig, ok := conCtx.DeploymentConfig["server"].(map[string]string); ok {
		hostname = serverConfig["hostname"]
		if offsetStr, offsetExists := serverConfig["offset"]; offsetExists {
			if offsetInt, err := strconv.Atoi(offsetStr); err == nil {
				httpServerPort += offsetInt
				log.Printf("Using port offset: %d, final port: %d", offsetInt, httpServerPort)
			} else {
				log.Printf("Warning: Invalid offset value '%s', using default port", offsetStr)
			}
		}
	}
	// ...
}
For HTTP inbound endpoints, the offset is applied when creating the inbound endpoint:
func NewHTTPInbound(config domain.InboundConfig) (*HTTPInbound, error) {
	// ...
	// Apply port offset if configured
	if offsetStr, ok := config.Parameters["portOffset"]; ok {
		if offset, err := strconv.Atoi(offsetStr); err == nil {
			port += offset
		}
	}
	// ...
}
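Both snippets parse the offset inline, and the inbound variant silently ignores a value that fails to parse. One possible way to share the arithmetic is sketched below. The helper applyOffset is hypothetical, and returning an error for a bad offset is a design choice of this sketch, not the behavior of the code above.

```go
package main

import (
	"fmt"
	"strconv"
)

// applyOffset is a hypothetical helper: it adds a textual offset to a base port
// and reports an error instead of silently ignoring a bad value.
func applyOffset(basePort int, offsetStr string) (int, error) {
	if offsetStr == "" {
		return basePort, nil // no offset configured
	}
	offset, err := strconv.Atoi(offsetStr)
	if err != nil {
		return basePort, fmt.Errorf("invalid offset %q: %w", offsetStr, err)
	}
	port := basePort + offset
	if port < 1 || port > 65535 {
		return basePort, fmt.Errorf("port %d is outside the range 1-65535", port)
	}
	return port, nil
}

func main() {
	port, err := applyOffset(8290, "100")
	fmt.Println(port, err) // 8390 <nil>

	port, err = applyOffset(8080, "abc")
	fmt.Println(port, err != nil) // 8080 true
}
```

On failure the helper still returns the base port, so a caller may log the error and continue with the default, which matches the warning path in Run.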
A key aspect of the HTTP Inbound implementation is its coordination with the main HTTP server during startup and shutdown.
During startup, the main HTTP server and HTTP inbound servers are started independently but tracked using the same wait group.
During shutdown, the context cancellation signal propagates to all components, but the wait group ensures that the main process waits for all servers to complete their shutdown before exiting.
As a result, the process exits only after the main HTTP server and every HTTP inbound server have finished shutting down.
Synapse Go supports deploying multiple HTTP inbound endpoints, each listening on a different port:
<inbound name="FirstHTTPInbound">
    <parameters>
        <parameter name="port">8080</parameter>
    </parameters>
    <sequence>TestInHTTPInbound</sequence>
    <protocol>http</protocol>
</inbound>
<inbound name="SecondHTTPInbound">
    <parameters>
        <parameter name="port">8081</parameter>
    </parameters>
    <sequence>AnotherSequence</sequence>
    <protocol>http</protocol>
</inbound>
Each HTTP inbound creates its own server instance and participates in the shared wait group for coordinated startup and shutdown.
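The HTTP Inbound implementation in Synapse Go provides a dedicated server per endpoint, sequence-based request mediation, port offset support, and graceful shutdown coordinated through a shared wait group.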
This implementation ensures that HTTP inbound endpoints operate reliably, start and stop gracefully, and integrate seamlessly with the rest of the system while adhering to the hexagonal architecture principles.