blob: 17eaa5b5473322bff774fced9510aeac8d24dc33 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package server
import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"strings"
"sync"
"sync/atomic"
)
import (
ocprom "contrib.go.opencensus.io/exporter/prometheus"
"github.com/hashicorp/go-multierror"
"github.com/prometheus/client_golang/prometheus"
"go.opencensus.io/stats/view"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/util/network"
"github.com/apache/dubbo-go-pixiu/pkg/config/protocol"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/common"
"github.com/apache/dubbo-go-pixiu/pkg/test/echo/server/endpoint"
)
// Config for an echo server Instance.
//
// The struct is embedded in Instance, so every field is reachable directly on
// the server. Note that String() does not print Dialer or DisableALPN.
type Config struct {
// Ports lists the ports to serve. Start creates one endpoint per bind
// address for each entry, and validate rejects unsupported protocols.
Ports common.PortList
// BindIPPortsMap is the set of port numbers whose listener address is read
// from the INSTANCE_IP environment variable (an error is returned by
// getListenerIP when that variable is not set).
BindIPPortsMap map[int]struct{}
// BindLocalhostPortsMap is the set of port numbers that listen on localhost.
// It is consulted before BindIPPortsMap.
BindLocalhostPortsMap map[int]struct{}
// Metrics is the port of the metrics HTTP server. The server is only
// started when the value is greater than zero.
Metrics int
// TLSCert is forwarded unchanged to every endpoint's configuration.
TLSCert string
// TLSKey is forwarded unchanged to every endpoint's configuration.
TLSKey string
// Version is forwarded unchanged to every endpoint's configuration.
Version string
// UDSServer, when non-empty, makes Start create one additional endpoint
// configured with this value and no port.
// NOTE(review): presumably a Unix domain socket path — confirm in endpoint.
UDSServer string
// Cluster is forwarded unchanged to every endpoint's configuration.
Cluster string
// Dialer is forwarded to every endpoint; New replaces it with the result of
// Dialer.FillInDefaults().
Dialer common.Dialer
// IstioVersion is forwarded unchanged to every endpoint's configuration.
IstioVersion string
// DisableALPN is forwarded unchanged to every endpoint's configuration.
DisableALPN bool
}
// String renders the configuration as one "Name: value" line per field, in a
// fixed order, for logging. Dialer and DisableALPN are not included.
func (c Config) String() string {
	var out strings.Builder
	fmt.Fprintf(&out, "Ports: %v\n", c.Ports)
	fmt.Fprintf(&out, "BindIPPortsMap: %v\n", c.BindIPPortsMap)
	fmt.Fprintf(&out, "BindLocalhostPortsMap: %v\n", c.BindLocalhostPortsMap)
	fmt.Fprintf(&out, "Metrics: %v\n", c.Metrics)
	fmt.Fprintf(&out, "TLSCert: %v\n", c.TLSCert)
	fmt.Fprintf(&out, "TLSKey: %v\n", c.TLSKey)
	fmt.Fprintf(&out, "Version: %v\n", c.Version)
	fmt.Fprintf(&out, "UDSServer: %v\n", c.UDSServer)
	fmt.Fprintf(&out, "Cluster: %v\n", c.Cluster)
	fmt.Fprintf(&out, "IstioVersion: %v\n", c.IstioVersion)
	return out.String()
}
// Compile-time check that *Instance satisfies io.Closer.
var _ io.Closer = (*Instance)(nil)
// Instance of the Echo server.
//
// It embeds Config, so all configuration fields are accessible directly on
// the instance.
type Instance struct {
Config
// endpoints holds the endpoints created by Start; Close closes each of them.
endpoints []endpoint.Instance
// metricsServer is the HTTP server object created by startMetricsServer.
metricsServer *http.Server
// ready is accessed atomically: waitUntilReady stores 1 once every endpoint
// has started, and isReady reports whether the value is 1.
ready uint32
}
// New creates a new server instance from config.
//
// The configuration is logged as received, and the instance's Dialer is set
// to the result of config.Dialer.FillInDefaults(). The server is not started;
// call Start for that.
func New(config Config) *Instance {
	log.Infof("Creating Server with config:\n%s", config)

	instance := &Instance{Config: config}
	instance.Dialer = config.Dialer.FillInDefaults()
	return instance
}
// Start validates the configuration, launches the metrics server (when
// Metrics > 0), creates an endpoint for every bind address of every configured
// port plus an optional UDS endpoint, and then blocks until all endpoints
// report ready.
//
// If any step fails, the endpoints created so far are closed and the error is
// returned.
func (s *Instance) Start() (err error) {
	// The named result lets this deferred cleanup observe the returned error.
	defer func() {
		if err == nil {
			return
		}
		_ = s.Close()
	}()

	if err = s.validate(); err != nil {
		return err
	}

	if s.Metrics > 0 {
		go s.startMetricsServer()
	}

	s.endpoints = make([]endpoint.Instance, 0)
	for _, port := range s.Ports {
		listenerIP, ipErr := s.getListenerIP(port)
		if ipErr != nil {
			return ipErr
		}
		for _, bindAddr := range getBindAddresses(listenerIP) {
			portEndpoint, epErr := s.newEndpoint(port, bindAddr, "")
			if epErr != nil {
				return epErr
			}
			s.endpoints = append(s.endpoints, portEndpoint)
		}
	}

	if s.UDSServer != "" {
		udsEndpoint, udsErr := s.newEndpoint(nil, "", s.UDSServer)
		if udsErr != nil {
			return udsErr
		}
		s.endpoints = append(s.endpoints, udsEndpoint)
	}

	return s.waitUntilReady()
}
// getBindAddresses expands a listener IP into the concrete addresses to bind.
//
// Any value other than "" or "localhost" is returned unchanged as the only
// address. For "" (wildcard) and "localhost", the node's private IPs are
// inspected to learn which IP families are present, and one address per
// family is returned: 127.0.0.1 / ::1 for localhost, 0.0.0.0 / :: for the
// wildcard. If the private IPs cannot be obtained, the input is returned
// as-is.
func getBindAddresses(ip string) []string {
	isLocalhost := ip == "localhost"
	if ip != "" && !isLocalhost {
		return []string{ip}
	}

	// Binding to "localhost" only binds a single address (v4 or v6). Both are
	// wanted, so each family is listed explicitly below.
	nodeIPs, ok := network.GetPrivateIPs(context.Background())
	if !ok {
		return []string{ip}
	}

	var hasV4, hasV6 bool
	for _, raw := range nodeIPs {
		parsed := net.ParseIP(raw)
		switch {
		case parsed == nil:
			// Should not happen; ignore entries that do not parse.
		case parsed.To4() != nil:
			hasV4 = true
		default:
			hasV6 = true
		}
	}

	v4Addr, v6Addr := "0.0.0.0", "::"
	if isLocalhost {
		v4Addr, v6Addr = "127.0.0.1", "::1"
	}

	addrs := make([]string, 0, 2)
	if hasV4 {
		addrs = append(addrs, v4Addr)
	}
	if hasV6 {
		addrs = append(addrs, v6Addr)
	}
	return addrs
}
// Close closes every endpoint owned by the server; it implements io.Closer.
//
// All non-nil endpoints are closed even if some of them fail, and their errors
// are aggregated into a single returned error. nil is returned when there are
// no endpoints or when none of them reported an error.
func (s *Instance) Close() error {
	var errs *multierror.Error
	for _, ep := range s.endpoints {
		if ep != nil {
			errs = multierror.Append(errs, ep.Close())
		}
	}
	// multierror.Append always yields a non-nil *multierror.Error, even when
	// nothing failed. ErrorOrNil converts the "no errors" case to a true nil
	// so callers checking `err != nil` are not misled.
	return errs.ErrorOrNil()
}
// getListenerIP decides which address the listener for port should bind to.
//
// It returns "" (wildcard bind, deliberately not 0.0.0.0 so IPv6 stays
// possible) for a nil port or a port in neither bind map, "localhost" for a
// port in BindLocalhostPortsMap, and the INSTANCE_IP environment variable for
// a port in BindIPPortsMap. An error is returned when the port is in
// BindIPPortsMap but INSTANCE_IP is not defined.
func (s *Instance) getListenerIP(port *common.Port) (string, error) {
	if port == nil {
		return "", nil
	}

	// Localhost binding takes precedence over IP binding.
	if _, bindLocalhost := s.BindLocalhostPortsMap[port.Port]; bindLocalhost {
		return "localhost", nil
	}

	_, bindIP := s.BindIPPortsMap[port.Port]
	if !bindIP {
		return "", nil
	}

	instanceIP, defined := os.LookupEnv("INSTANCE_IP")
	if !defined {
		return "", fmt.Errorf("--bind-ip set but INSTANCE_IP undefined")
	}
	return instanceIP, nil
}
// newEndpoint builds an endpoint for the given port, listener IP and UDS
// server value, copying the remaining settings from the server configuration
// and wiring the endpoint's readiness check to s.isReady.
func (s *Instance) newEndpoint(port *common.Port, listenerIP string, udsServer string) (endpoint.Instance, error) {
	cfg := endpoint.Config{
		// Per-endpoint settings.
		Port:       port,
		UDSServer:  udsServer,
		ListenerIP: listenerIP,

		// Settings shared by every endpoint of this server.
		IsServerReady: s.isReady,
		Version:       s.Version,
		Cluster:       s.Cluster,
		TLSCert:       s.TLSCert,
		TLSKey:        s.TLSKey,
		Dialer:        s.Dialer,
		DisableALPN:   s.DisableALPN,
		IstioVersion:  s.IstioVersion,
	}
	return endpoint.New(cfg)
}
// isReady reports whether the ready flag has been set to 1, which
// waitUntilReady does once all endpoints have started. The flag is read
// atomically.
func (s *Instance) isReady() bool {
	const readyFlag = 1
	current := atomic.LoadUint32(&s.ready)
	return current == readyFlag
}
// waitUntilReady starts every endpoint, blocks until each one has invoked its
// ready callback, and then marks the server as ready.
//
// It returns the first error from an endpoint's Start without waiting for the
// endpoints that were already started.
func (s *Instance) waitUntilReady() error {
	var pending sync.WaitGroup

	// Start the servers; each endpoint signals readiness through pending.Done.
	for i := range s.endpoints {
		pending.Add(1)
		if err := s.endpoints[i].Start(pending.Done); err != nil {
			return err
		}
	}

	// Block until every endpoint has reported ready.
	pending.Wait()

	// Flip the flag read by isReady.
	atomic.StoreUint32(&s.ready, 1)
	log.Info("Echo server is now ready")
	return nil
}
// validate checks that every configured port uses a supported protocol (TCP,
// HTTP, HTTPS, HTTP2 or GRPC) and returns an error for the first port that
// does not.
func (s *Instance) validate() error {
	for _, p := range s.Ports {
		switch p.Protocol {
		case protocol.TCP, protocol.HTTP, protocol.HTTPS, protocol.HTTP2, protocol.GRPC:
			continue
		default:
			return fmt.Errorf("protocol %v not currently supported", p.Protocol)
		}
	}
	return nil
}
// startMetricsServer registers an OpenCensus Prometheus exporter, exposes it
// at /metrics, and serves it on the port given by s.Metrics.
//
// The call blocks for as long as the HTTP server is running, so it is meant to
// be run in its own goroutine. Failures are logged, not returned.
func (s *Instance) startMetricsServer() {
	mux := http.NewServeMux()

	exporter, err := ocprom.NewExporter(ocprom.Options{Registry: prometheus.DefaultRegisterer.(*prometheus.Registry)})
	if err != nil {
		log.Errorf("could not set up prometheus exporter: %v", err)
		return
	}
	view.RegisterExporter(exporter)
	mux.Handle("/metrics", exporter)

	// Serve through the server stored on the instance, so that
	// s.metricsServer refers to the server that is actually listening rather
	// than to an unused, address-less placeholder.
	s.metricsServer = &http.Server{
		Addr:    fmt.Sprintf(":%d", s.Metrics),
		Handler: mux,
	}
	if err := s.metricsServer.ListenAndServe(); err != nil {
		log.Errorf("metrics terminated with err: %v", err)
	}
}