blob: d414501335dd18283af45f38c29950a0c8cec21d [file] [log] [blame]
// Copyright 2015 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package httpproxy
import (
"math/rand"
"net/url"
"sync"
"time"
)
// defaultRefreshInterval is the default proxyRefreshIntervalMs value
// as in etcdmain/config.go.
const defaultRefreshInterval = 30000 * time.Millisecond
var once sync.Once
func init() {
rand.Seed(time.Now().UnixNano())
}
func newDirector(urlsFunc GetProxyURLs, failureWait time.Duration, refreshInterval time.Duration) *director {
d := &director{
uf: urlsFunc,
failureWait: failureWait,
}
d.refresh()
go func() {
// In order to prevent missing proxy endpoints in the first try:
// when given refresh interval of defaultRefreshInterval or greater
// and whenever there is no available proxy endpoints,
// give 1-second refreshInterval.
for {
es := d.endpoints()
ri := refreshInterval
if ri >= defaultRefreshInterval {
if len(es) == 0 {
ri = time.Second
}
}
if len(es) > 0 {
once.Do(func() {
var sl []string
for _, e := range es {
sl = append(sl, e.URL.String())
}
plog.Infof("endpoints found %q", sl)
})
}
time.Sleep(ri)
d.refresh()
}
}()
return d
}
type director struct {
sync.Mutex
ep []*endpoint
uf GetProxyURLs
failureWait time.Duration
}
func (d *director) refresh() {
urls := d.uf()
d.Lock()
defer d.Unlock()
var endpoints []*endpoint
for _, u := range urls {
uu, err := url.Parse(u)
if err != nil {
plog.Printf("upstream URL invalid: %v", err)
continue
}
endpoints = append(endpoints, newEndpoint(*uu, d.failureWait))
}
// shuffle array to avoid connections being "stuck" to a single endpoint
for i := range endpoints {
j := rand.Intn(i + 1)
endpoints[i], endpoints[j] = endpoints[j], endpoints[i]
}
d.ep = endpoints
}
func (d *director) endpoints() []*endpoint {
d.Lock()
defer d.Unlock()
filtered := make([]*endpoint, 0)
for _, ep := range d.ep {
if ep.Available {
filtered = append(filtered, ep)
}
}
return filtered
}
func newEndpoint(u url.URL, failureWait time.Duration) *endpoint {
ep := endpoint{
URL: u,
Available: true,
failFunc: timedUnavailabilityFunc(failureWait),
}
return &ep
}
type endpoint struct {
sync.Mutex
URL url.URL
Available bool
failFunc func(ep *endpoint)
}
func (ep *endpoint) Failed() {
ep.Lock()
if !ep.Available {
ep.Unlock()
return
}
ep.Available = false
ep.Unlock()
plog.Printf("marked endpoint %s unavailable", ep.URL.String())
if ep.failFunc == nil {
plog.Printf("no failFunc defined, endpoint %s will be unavailable forever.", ep.URL.String())
return
}
ep.failFunc(ep)
}
func timedUnavailabilityFunc(wait time.Duration) func(*endpoint) {
return func(ep *endpoint) {
time.AfterFunc(wait, func() {
ep.Available = true
plog.Printf("marked endpoint %s available, to retest connectivity", ep.URL.String())
})
}
}