blob: ff20cbe7f908c76c0af5235ccba590b85c641e25 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package endpoint
import (
"bytes"
"context"
"crypto/tls"
"fmt"
"math/rand"
"net"
"net/http"
"os"
"sort"
"strconv"
"strings"
"time"
)
import (
"github.com/google/uuid"
"github.com/gorilla/websocket"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/test/echo"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/common"
"github.com/apache/dubbo-go-pixiu/pkg/test/util/retry"
)
const (
readyTimeout = 10 * time.Second
readyInterval = 2 * time.Second
)
var webSocketUpgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
// allow all connections by default
return true
},
}
var _ Instance = &httpInstance{}
type httpInstance struct {
Config
server *http.Server
}
func newHTTP(config Config) Instance {
return &httpInstance{
Config: config,
}
}
func (s *httpInstance) GetConfig() Config {
return s.Config
}
func (s *httpInstance) Start(onReady OnReadyFunc) error {
h2s := &http2.Server{}
s.server = &http.Server{
Handler: h2c.NewHandler(&httpHandler{
Config: s.Config,
}, h2s),
}
var listener net.Listener
var port int
var err error
if s.isUDS() {
port = 0
listener, err = listenOnUDS(s.UDSServer)
} else if s.Port.TLS {
cert, cerr := tls.LoadX509KeyPair(s.TLSCert, s.TLSKey)
if cerr != nil {
return fmt.Errorf("could not load TLS keys: %v", cerr)
}
nextProtos := []string{"h2", "http/1.1", "http/1.0"}
if s.DisableALPN {
nextProtos = nil
}
config := &tls.Config{
Certificates: []tls.Certificate{cert},
NextProtos: nextProtos,
GetConfigForClient: func(info *tls.ClientHelloInfo) (*tls.Config, error) {
// There isn't a way to pass through all ALPNs presented by the client down to the
// HTTP server to return in the response. However, for debugging, we can at least log
// them at this level.
epLog.Infof("TLS connection with alpn: %v", info.SupportedProtos)
return nil, nil
},
}
// Listen on the given port and update the port if it changed from what was passed in.
listener, port, err = listenOnAddressTLS(s.ListenerIP, s.Port.Port, config)
// Store the actual listening port back to the argument.
s.Port.Port = port
} else {
// Listen on the given port and update the port if it changed from what was passed in.
listener, port, err = listenOnAddress(s.ListenerIP, s.Port.Port)
// Store the actual listening port back to the argument.
s.Port.Port = port
}
if err != nil {
return err
}
if s.isUDS() {
fmt.Printf("Listening HTTP/1.1 on %v\n", s.UDSServer)
} else if s.Port.TLS {
s.server.Addr = fmt.Sprintf(":%d", port)
fmt.Printf("Listening HTTPS/1.1 on %v\n", port)
} else {
s.server.Addr = fmt.Sprintf(":%d", port)
fmt.Printf("Listening HTTP/1.1 on %v\n", port)
}
// Start serving HTTP traffic.
go func() {
err := s.server.Serve(listener)
epLog.Warnf("Port %d listener terminated with error: %v", port, err)
}()
// Notify the WaitGroup once the port has transitioned to ready.
go s.awaitReady(onReady, listener.Addr().String())
return nil
}
func (s *httpInstance) isUDS() bool {
return s.UDSServer != ""
}
func (s *httpInstance) awaitReady(onReady OnReadyFunc, address string) {
defer onReady()
client := http.Client{}
var url string
if s.isUDS() {
url = "http://unix/" + s.UDSServer
client.Transport = &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial("unix", s.UDSServer)
},
}
} else if s.Port.TLS {
url = fmt.Sprintf("https://%s", address)
client.Transport = &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true}}
} else {
url = fmt.Sprintf("http://%s", address)
}
err := retry.UntilSuccess(func() error {
resp, err := client.Get(url)
if err != nil {
return err
}
defer resp.Body.Close()
// The handler applies server readiness when handling HTTP requests. Since the
// server won't become ready until all endpoints (including this one) report
// ready, the handler will return 503. This means that the endpoint is now ready.
if resp.StatusCode != http.StatusServiceUnavailable {
return fmt.Errorf("unexpected status code %d", resp.StatusCode)
}
// Server is up now, we're ready.
return nil
}, retry.Timeout(readyTimeout), retry.Delay(readyInterval))
if err != nil {
epLog.Errorf("readiness failed for endpoint %s: %v", url, err)
} else {
epLog.Infof("ready for HTTP endpoint %s", url)
}
}
func (s *httpInstance) Close() error {
if s.server != nil {
return s.server.Close()
}
return nil
}
type httpHandler struct {
Config
}
// Imagine a pie of different flavors.
// The flavors are the HTTP response codes.
// The chance of a particular flavor is ( slices / sum of slices ).
type codeAndSlices struct {
httpResponseCode int
slices int
}
func (h *httpHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
id := uuid.New()
epLog.WithLabels("method", r.Method, "url", r.URL, "host", r.Host, "headers", r.Header, "id", id).Infof("HTTP Request")
if h.Port == nil {
defer common.Metrics.HTTPRequests.With(common.PortLabel.Value("uds")).Increment()
} else {
defer common.Metrics.HTTPRequests.With(common.PortLabel.Value(strconv.Itoa(h.Port.Port))).Increment()
}
if !h.IsServerReady() {
// Handle readiness probe failure.
epLog.Infof("HTTP service not ready, returning 503")
w.WriteHeader(http.StatusServiceUnavailable)
return
}
if common.IsWebSocketRequest(r) {
h.webSocketEcho(w, r)
} else {
h.echo(w, r, id)
}
}
// nolint: interfacer
func writeError(out *bytes.Buffer, msg string) {
epLog.Warn(msg)
_, _ = out.WriteString(msg + "\n")
}
func (h *httpHandler) echo(w http.ResponseWriter, r *http.Request, id uuid.UUID) {
body := bytes.Buffer{}
if err := r.ParseForm(); err != nil {
writeError(&body, "ParseForm() error: "+err.Error())
}
// If the request has form ?delay=[:duration] wait for duration
// For example, ?delay=10s will cause the response to wait 10s before responding
if err := delayResponse(r); err != nil {
writeError(&body, "error delaying response error: "+err.Error())
}
// If the request has form ?headers=name:value[,name:value]* return those headers in response
if err := setHeaderResponseFromHeaders(r, w); err != nil {
writeError(&body, "response headers error: "+err.Error())
}
// If the request has form ?codes=code[:chance][,code[:chance]]* return those codes, rather than 200
// For example, ?codes=500:1,200:1 returns 500 1/2 times and 200 1/2 times
// For example, ?codes=500:90,200:10 returns 500 90% of times and 200 10% of times
code, err := setResponseFromCodes(r, w)
if err != nil {
writeError(&body, "codes error: "+err.Error())
}
h.addResponsePayload(r, &body)
w.Header().Set("Content-Type", "application/text")
if _, err := w.Write(body.Bytes()); err != nil {
epLog.Warn(err)
}
epLog.WithLabels("code", code, "headers", w.Header(), "id", id).Infof("HTTP Response")
}
func (h *httpHandler) webSocketEcho(w http.ResponseWriter, r *http.Request) {
// adapted from https://github.com/gorilla/websocket/blob/master/examples/echo/server.go
// First send upgrade headers
c, err := webSocketUpgrader.Upgrade(w, r, nil)
if err != nil {
epLog.Warn("websocket-echo upgrade failed: " + err.Error())
return
}
defer func() { _ = c.Close() }()
// ping
mt, message, err := c.ReadMessage()
if err != nil {
epLog.Warn("websocket-echo read failed: " + err.Error())
return
}
body := bytes.Buffer{}
h.addResponsePayload(r, &body)
body.Write(message)
writeField(&body, echo.StatusCodeField, strconv.Itoa(http.StatusOK))
// pong
err = c.WriteMessage(mt, body.Bytes())
if err != nil {
writeError(&body, "websocket-echo write failed: "+err.Error())
return
}
}
// nolint: interfacer
func (h *httpHandler) addResponsePayload(r *http.Request, body *bytes.Buffer) {
port := ""
if h.Port != nil {
port = strconv.Itoa(h.Port.Port)
}
writeField(body, echo.ServiceVersionField, h.Version)
writeField(body, echo.ServicePortField, port)
writeField(body, echo.HostField, r.Host)
// Use raw path, we don't want golang normalizing anything since we use this for testing purposes
writeField(body, echo.URLField, r.RequestURI)
writeField(body, echo.ClusterField, h.Cluster)
writeField(body, echo.IstioVersionField, h.IstioVersion)
writeField(body, echo.MethodField, r.Method)
writeField(body, echo.ProtocolField, r.Proto)
ip, _, _ := net.SplitHostPort(r.RemoteAddr)
writeField(body, echo.IPField, ip)
// Note: since this is the NegotiatedProtocol, it will be set to empty if the client sends an ALPN
// not supported by the server (ie one of h2,http/1.1,http/1.0)
var alpn string
if r.TLS != nil {
alpn = r.TLS.NegotiatedProtocol
}
writeField(body, echo.AlpnField, alpn)
var keys []string
for k := range r.Header {
keys = append(keys, k)
}
sort.Strings(keys)
for _, key := range keys {
values := r.Header[key]
for _, value := range values {
writeRequestHeader(body, key, value)
}
}
if hostname, err := os.Hostname(); err == nil {
writeField(body, echo.HostnameField, hostname)
}
}
func delayResponse(request *http.Request) error {
d := request.FormValue("delay")
if len(d) == 0 {
return nil
}
t, err := time.ParseDuration(d)
if err != nil {
return err
}
time.Sleep(t)
return nil
}
func setHeaderResponseFromHeaders(request *http.Request, response http.ResponseWriter) error {
s := request.FormValue("headers")
if len(s) == 0 {
return nil
}
responseHeaders := strings.Split(s, ",")
for _, responseHeader := range responseHeaders {
parts := strings.Split(responseHeader, ":")
// require name:value format
if len(parts) != 2 {
return fmt.Errorf("invalid %q (want name:value)", responseHeader)
}
name := parts[0]
value := parts[1]
// Avoid using .Set() to allow users to pass non-canonical forms
response.Header()[name] = []string{value}
}
return nil
}
func setResponseFromCodes(request *http.Request, response http.ResponseWriter) (int, error) {
responseCodes := request.FormValue("codes")
codes, err := validateCodes(responseCodes)
if err != nil {
return 0, err
}
// Choose a random "slice" from a pie
totalSlices := 0
for _, flavor := range codes {
totalSlices += flavor.slices
}
slice := rand.Intn(totalSlices)
// What flavor is that slice?
responseCode := codes[len(codes)-1].httpResponseCode // Assume the last slice
position := 0
for n, flavor := range codes {
if position > slice {
responseCode = codes[n-1].httpResponseCode // No, use an earlier slice
break
}
position += flavor.slices
}
response.WriteHeader(responseCode)
return responseCode, nil
}
// codes must be comma-separated HTTP response code, colon, positive integer
func validateCodes(codestrings string) ([]codeAndSlices, error) {
if codestrings == "" {
// Consider no codes to be "200:1" -- return HTTP 200 100% of the time.
codestrings = strconv.Itoa(http.StatusOK) + ":1"
}
aCodestrings := strings.Split(codestrings, ",")
codes := make([]codeAndSlices, len(aCodestrings))
for i, codestring := range aCodestrings {
codeAndSlice, err := validateCodeAndSlices(codestring)
if err != nil {
return []codeAndSlices{{http.StatusBadRequest, 1}}, err
}
codes[i] = codeAndSlice
}
return codes, nil
}
// code must be HTTP response code
func validateCodeAndSlices(codecount string) (codeAndSlices, error) {
flavor := strings.Split(codecount, ":")
// Demand code or code:number
if len(flavor) == 0 || len(flavor) > 2 {
return codeAndSlices{http.StatusBadRequest, 9999},
fmt.Errorf("invalid %q (want code or code:count)", codecount)
}
n, err := strconv.Atoi(flavor[0])
if err != nil {
return codeAndSlices{http.StatusBadRequest, 9999}, err
}
if n < http.StatusOK || n >= 600 {
return codeAndSlices{http.StatusBadRequest, 9999},
fmt.Errorf("invalid HTTP response code %v", n)
}
count := 1
if len(flavor) > 1 {
count, err = strconv.Atoi(flavor[1])
if err != nil {
return codeAndSlices{http.StatusBadRequest, 9999}, err
}
if count < 0 {
return codeAndSlices{http.StatusBadRequest, 9999},
fmt.Errorf("invalid count %v", count)
}
}
return codeAndSlices{n, count}, nil
}