blob: 15751619b30b7e5efa224bb7170afb16b4069fd0 [file] [log] [blame]
package notifications
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"sync"
"time"
)
// httpSink implements a single-flight, http notification endpoint. This is
// very lightweight in that it only makes an attempt at an http request.
// Reliability should be provided by the caller.
type httpSink struct {
url string
mu sync.Mutex
closed bool
client *http.Client
listeners []httpStatusListener
// TODO(stevvooe): Allow one to configure the media type accepted by this
// sink and choose the serialization based on that.
}
// newHTTPSink returns an unreliable, single-flight http sink. Wrap in other
// sinks for increased reliability.
func newHTTPSink(u string, timeout time.Duration, headers http.Header, transport *http.Transport, listeners ...httpStatusListener) *httpSink {
if transport == nil {
transport = http.DefaultTransport.(*http.Transport)
}
return &httpSink{
url: u,
listeners: listeners,
client: &http.Client{
Transport: &headerRoundTripper{
Transport: transport,
headers: headers,
},
Timeout: timeout,
},
}
}
// httpStatusListener is called on various outcomes of sending notifications.
type httpStatusListener interface {
success(status int, events ...Event)
failure(status int, events ...Event)
err(err error, events ...Event)
}
// Accept makes an attempt to notify the endpoint, returning an error if it
// fails. It is the caller's responsibility to retry on error. The events are
// accepted or rejected as a group.
func (hs *httpSink) Write(events ...Event) error {
hs.mu.Lock()
defer hs.mu.Unlock()
defer hs.client.Transport.(*headerRoundTripper).CloseIdleConnections()
if hs.closed {
return ErrSinkClosed
}
envelope := Envelope{
Events: events,
}
// TODO(stevvooe): It is not ideal to keep re-encoding the request body on
// retry but we are going to do it to keep the code simple. It is likely
// we could change the event struct to manage its own buffer.
p, err := json.MarshalIndent(envelope, "", " ")
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error marshaling event envelope: %v", hs, err)
}
body := bytes.NewReader(p)
resp, err := hs.client.Post(hs.url, EventsMediaType, body)
if err != nil {
for _, listener := range hs.listeners {
listener.err(err, events...)
}
return fmt.Errorf("%v: error posting: %v", hs, err)
}
defer resp.Body.Close()
// The notifier will treat any 2xx or 3xx response as accepted by the
// endpoint.
switch {
case resp.StatusCode >= 200 && resp.StatusCode < 400:
for _, listener := range hs.listeners {
listener.success(resp.StatusCode, events...)
}
// TODO(stevvooe): This is a little accepting: we may want to support
// unsupported media type responses with retries using the correct
// media type. There may also be cases that will never work.
return nil
default:
for _, listener := range hs.listeners {
listener.failure(resp.StatusCode, events...)
}
return fmt.Errorf("%v: response status %v unaccepted", hs, resp.Status)
}
}
// Close the endpoint
func (hs *httpSink) Close() error {
hs.mu.Lock()
defer hs.mu.Unlock()
if hs.closed {
return fmt.Errorf("httpsink: already closed")
}
hs.closed = true
return nil
}
func (hs *httpSink) String() string {
return fmt.Sprintf("httpSink{%s}", hs.url)
}
type headerRoundTripper struct {
*http.Transport // must be transport to support CancelRequest
headers http.Header
}
func (hrt *headerRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
var nreq http.Request
nreq = *req
nreq.Header = make(http.Header)
merge := func(headers http.Header) {
for k, v := range headers {
nreq.Header[k] = append(nreq.Header[k], v...)
}
}
merge(req.Header)
merge(hrt.headers)
return hrt.Transport.RoundTrip(&nreq)
}