blob: 6b602cae45b72bdc2c2df62cb7a4351fc1165026 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package validation
import (
"context"
"fmt"
"io"
"net"
"strconv"
"strings"
"time"
)
import (
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/tools/istio-iptables/pkg/config"
)
var istioLocalIPv6 = net.IP{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 6}
type ReturnCode int
const (
DONE ReturnCode = iota
)
type Validator struct {
Config *Config
}
type Config struct {
ServerListenAddress []string
ServerOriginalPort uint16
ServerOriginalIP net.IP
ServerReadyBarrier chan ReturnCode
ProbeTimeout time.Duration
}
type Service struct {
Config *Config
}
type Client struct {
Config *Config
}
func (validator *Validator) Run() error {
s := Service{
validator.Config,
}
sError := make(chan error, 1)
sTimer := time.NewTimer(s.Config.ProbeTimeout)
defer sTimer.Stop()
go func() {
sError <- s.Run()
}()
// infinite loop
go func() {
c := Client{Config: validator.Config}
<-c.Config.ServerReadyBarrier
for {
_ = c.Run()
// Avoid spamming the request to the validation server.
// Since the TIMEWAIT socket is cleaned up in 60 second,
// it's maintaining 60 TIMEWAIT sockets. Not big deal.
time.Sleep(time.Second)
}
}()
select {
case <-sTimer.C:
return fmt.Errorf("validation timeout")
case err := <-sError:
if err == nil {
log.Info("Validation passed")
} else {
log.Errorf("Validation failed: %v", err)
}
return err
}
}
// TODO(lambdai): remove this if iptables only need to redirect to outbound proxy port on A call A
func genListenerAddress(ip net.IP, ports []string) []string {
addresses := make([]string, 0, len(ports))
for _, port := range ports {
addresses = append(addresses, net.JoinHostPort(ip.String(), port))
}
return addresses
}
func NewValidator(config *config.Config, hostIP net.IP) *Validator {
log.Infof("in new validator: %v", hostIP.String())
// It's tricky here:
// Connect to 127.0.0.6 will redirect to 127.0.0.1
// Connect to ::6 will redirect to ::1
isIpv6 := hostIP.To4() == nil
listenIP := net.IPv4(127, 0, 0, 1)
serverIP := net.IPv4(127, 0, 0, 6)
if isIpv6 {
listenIP = net.IPv6loopback
serverIP = istioLocalIPv6
}
return &Validator{
Config: &Config{
ServerListenAddress: genListenerAddress(listenIP, []string{config.ProxyPort, config.InboundCapturePort}),
ServerOriginalPort: config.IptablesProbePort,
ServerOriginalIP: serverIP,
ServerReadyBarrier: make(chan ReturnCode, 1),
ProbeTimeout: config.ProbeTimeout,
},
}
}
// Write human readable response
func echo(conn io.WriteCloser, echo []byte) {
_, _ = conn.Write(echo)
_ = conn.Close()
}
func restoreOriginalAddress(l net.Listener, config *Config, c chan<- ReturnCode) {
defer l.Close()
for {
conn, err := l.Accept()
if err != nil {
log.Errorf("Listener failed to accept connection: %v", err)
continue
}
_, port, err := GetOriginalDestination(conn)
if err != nil {
log.Errorf("Error getting original dst: %v", err)
conn.Close()
continue
}
// echo original port for debugging.
// Since the write amount is small it should fit in sock buffer and never blocks.
echo(conn, []byte(strconv.Itoa(int(port))))
// Handle connections
// Since the write amount is small it should fit in sock buffer and never blocks.
if port != config.ServerOriginalPort {
// This could be probe request from no where
continue
}
// Server recovers the magical original port
c <- DONE
return
}
}
func (s *Service) Run() error {
// at most 2 message: ipv4 and ipv6
c := make(chan ReturnCode, 2)
hasAtLeastOneListener := false
for _, addr := range s.Config.ServerListenAddress {
log.Infof("Listening on %v", addr)
config := &net.ListenConfig{Control: reuseAddr}
l, err := config.Listen(context.Background(), "tcp", addr) // bind to the address:port
if err != nil {
log.Errorf("Error on listening: %v", err)
continue
}
hasAtLeastOneListener = true
go restoreOriginalAddress(l, s.Config, c)
}
if hasAtLeastOneListener {
s.Config.ServerReadyBarrier <- DONE
// bump at least one since we currently support either v4 or v6
<-c
return nil
}
return fmt.Errorf("no listener available: %s", strings.Join(s.Config.ServerListenAddress, ","))
}
func (c *Client) Run() error {
laddr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:0")
if err != nil {
return err
}
if c.Config.ServerOriginalIP.To4() == nil {
laddr, err = net.ResolveTCPAddr("tcp", "[::1]:0")
if err != nil {
return err
}
}
sOriginalPort := fmt.Sprintf("%d", c.Config.ServerOriginalPort)
serverOriginalAddress := net.JoinHostPort(c.Config.ServerOriginalIP.String(), sOriginalPort)
raddr, err := net.ResolveTCPAddr("tcp", serverOriginalAddress)
if err != nil {
return err
}
conn, err := net.DialTCP("tcp", laddr, raddr)
if err != nil {
log.Errorf("Error connecting to %s: %v", serverOriginalAddress, err)
return err
}
conn.Close()
return nil
}