blob: 289eee46d2bdc88b4b7726c42bdd0583df9af8c7 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package main
import (
"context"
"fmt"
"net"
"sync"
"time"
"github.com/apache/dubbo-kubernetes/api/mesh"
gvks "github.com/apache/dubbo-kubernetes/pkg/core/schema/gvk"
"github.com/apache/dubbo-kubernetes/api/dds"
api "github.com/apache/dubbo-kubernetes/api/resource/v1alpha1"
"github.com/apache/dubbo-kubernetes/pkg/core/logger"
"github.com/cenkalti/backoff"
"github.com/gogo/protobuf/proto"
"google.golang.org/grpc"
)
var (
// use plain server to test
grpcAddr = "127.0.0.1:30060"
grpcUpstreamAddr = grpcAddr
)
type Config struct {
// InitialDiscoveryRequests is a list of resources to watch at first, represented as URLs (for new DDS resource naming)
// or type URLs.
InitialDiscoveryRequest []*dds.ObserveRequest
// BackoffPolicy determines the reconnect policy. Based on ddsclient.
BackoffPolicy backoff.BackOff
GrpcOpts []grpc.DialOption
Namespace string
// It is sent by ddsclient, must match a known endpoint IP.
IP string
}
// DDSC implements a basic ddsclient for DDS, for use in stress tests and tools
// or libraries that need to connect to Dubbo admin or other DDS servers.
// Currently only for testing!
type DDSC struct {
// Stream is the GRPC connection stream, allowing direct GRPC send operations.
// Set after Dial is called.
stream dds.RuleService_ObserveClient
// dds ddsclient used to create a stream
ddsclient dds.RuleServiceClient
snpclient mesh.ServiceNameMappingServiceClient
conn *grpc.ClientConn
// Indicates if the DDSC ddsclient is closed
closed bool
// NodeID is the node identity sent to Admin
nodeID string
url string
authentication []*api.AuthenticationPolicyToClient
authorization []*api.AuthorizationPolicyToClient
conditionRoute []*api.ConditionRouteToClient
tagRoute []*api.TagRouteToClient
dynamicConfig []*api.DynamicConfigToClient
serviceMapping []*api.ServiceNameMappingToClient
// Last received message, by type
Received map[string]*dds.ObserveResponse
mutex sync.RWMutex
// RecvWg is for letting goroutines know when the goroutine handling the DDS stream finishes.
RecvWg sync.WaitGroup
cfg *Config
}
func New(discoveryAddr string, opts *Config) (*DDSC, error) {
if opts == nil {
opts = &Config{}
}
// We want to recreate stream
if opts.BackoffPolicy == nil {
opts.BackoffPolicy = backoff.NewExponentialBackOff()
}
ddsc := &DDSC{
url: discoveryAddr,
cfg: opts,
Received: map[string]*dds.ObserveResponse{},
RecvWg: sync.WaitGroup{},
}
if opts.IP == "" {
opts.IP = getPrivateIPIfAvailable().String()
}
ddsc.nodeID = fmt.Sprintf("%s~%s", opts.IP, opts.Namespace)
if err := ddsc.Dial(); err != nil {
return nil, err
}
return ddsc, nil
}
// Dial connects to a dds server
// nolint
func (a *DDSC) Dial() error {
opts := a.cfg
var err error
grpcDialOptions := opts.GrpcOpts
if len(grpcDialOptions) == 0 {
// Only disable transport security if the user didn't supply custom dial options
grpcDialOptions = append(grpcDialOptions, grpc.WithInsecure())
}
a.conn, err = grpc.Dial(a.url, grpcDialOptions...)
if err != nil {
return err
}
return nil
}
func getPrivateIPIfAvailable() net.IP {
addrs, _ := net.InterfaceAddrs()
for _, addr := range addrs {
var ip net.IP
switch v := addr.(type) {
case *net.IPNet:
ip = v.IP
case *net.IPAddr:
ip = v.IP
default:
continue
}
if !ip.IsLoopback() {
return ip
}
}
return net.IPv4zero
}
// reconnect will create a new stream
func (a *DDSC) reconnect() {
a.mutex.RLock()
if a.closed {
a.mutex.RUnlock()
return
}
a.mutex.RUnlock()
err := a.Run()
if err == nil {
a.cfg.BackoffPolicy.Reset()
} else {
time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
}
}
func (a *DDSC) Run() error {
var err error
a.ddsclient = dds.NewRuleServiceClient(a.conn)
a.snpclient = mesh.NewServiceNameMappingServiceClient(a.conn)
a.stream, err = a.ddsclient.Observe(context.Background())
if err != nil {
return err
}
// Send the snp message
a.sendSnp()
// Send the initial requests
for _, r := range a.cfg.InitialDiscoveryRequest {
err := a.Send(r)
if err != nil {
return err
}
}
// by default, we assume 1 goroutine decrements the waitgroup (go a.handleRecv()).
// for synchronizing when the goroutine finishes reading from the gRPC stream.
a.RecvWg.Add(1)
go a.handleRecv()
return nil
}
func (a *DDSC) sendSnp() {
res, err := a.snpclient.RegisterServiceAppMapping(context.Background(), &mesh.ServiceMappingRequest{
Namespace: "dubbo-system",
ApplicationName: "test-app",
InterfaceNames: []string{
"test-interface1",
"test-interface2",
},
})
if err != nil || !res.Success {
a.sendSnp()
}
}
// Send Raw send of request
func (a *DDSC) Send(req *dds.ObserveRequest) error {
return a.stream.Send(req)
}
func (a *DDSC) handleRecv() {
for {
var err error
msg, err := a.stream.Recv()
if err != nil {
a.RecvWg.Done()
logger.Sugar().Infof("Connection closed for node %v with err: %v", a.nodeID, err)
// if 'reconnect' enabled - schedule a new Run
if a.cfg.BackoffPolicy != nil {
time.AfterFunc(a.cfg.BackoffPolicy.NextBackOff(), a.reconnect)
} else {
a.Close()
}
return
}
logger.Sugar().Info("Received ", a.url, " type ", msg.Type,
"nonce= ", msg.Nonce)
// Process the resources
var authentication []*api.AuthenticationPolicyToClient
var authorization []*api.AuthorizationPolicyToClient
var serviceMapping []*api.ServiceNameMappingToClient
var conditionRoute []*api.ConditionRouteToClient
var tagRoute []*api.TagRouteToClient
var dynamicConfig []*api.DynamicConfigToClient
switch msg.Type {
case gvks.AuthenticationPolicy:
for _, d := range msg.Data {
valBytes := d.Value
auth := &api.AuthenticationPolicyToClient{}
err := proto.Unmarshal(valBytes, auth)
if err != nil {
return
}
authentication = append(authentication, auth)
a.handleAuthentication(authentication)
}
case gvks.AuthorizationPolicy:
for _, d := range msg.Data {
valBytes := d.Value
auth := &api.AuthorizationPolicyToClient{}
err := proto.Unmarshal(valBytes, auth)
if err != nil {
return
}
authorization = append(authorization, auth)
a.handleAuthorization(authorization)
}
case gvks.ServiceNameMapping:
for _, d := range msg.Data {
valBytes := d.Value
auth := &api.ServiceNameMappingToClient{}
err := proto.Unmarshal(valBytes, auth)
if err != nil {
return
}
serviceMapping = append(serviceMapping, auth)
a.handleServiceNameMapping(serviceMapping)
}
case gvks.ConditionRoute:
for _, d := range msg.Data {
valBytes := d.Value
auth := &api.ConditionRouteToClient{}
err := proto.Unmarshal(valBytes, auth)
if err != nil {
return
}
conditionRoute = append(conditionRoute, auth)
a.handleConditionRoute(conditionRoute)
}
case gvks.DynamicConfig:
for _, d := range msg.Data {
valBytes := d.Value
auth := &api.DynamicConfigToClient{}
err := proto.Unmarshal(valBytes, auth)
if err != nil {
return
}
dynamicConfig = append(dynamicConfig, auth)
a.handleDynamicConfig(dynamicConfig)
}
case gvks.TagRoute:
for _, d := range msg.Data {
valBytes := d.Value
auth := &api.TagRouteToClient{}
err := proto.Unmarshal(valBytes, auth)
if err != nil {
return
}
tagRoute = append(tagRoute, auth)
a.handleTagRoute(tagRoute)
}
}
a.mutex.Lock()
a.Received[msg.Type] = msg
err = a.ack(msg)
if err != nil {
return
}
a.mutex.Unlock()
}
}
func (a *DDSC) ack(msg *dds.ObserveResponse) error {
return a.stream.Send(&dds.ObserveRequest{
Nonce: msg.Nonce,
Type: msg.Type,
})
}
// Close the stream
func (a *DDSC) Close() {
a.mutex.Lock()
err := a.conn.Close()
if err != nil {
return
}
a.closed = true
a.mutex.Unlock()
}
func (a *DDSC) handleAuthentication(ll []*api.AuthenticationPolicyToClient) {
a.authentication = ll
logger.Sugar().Info(ll)
}
func (a *DDSC) handleAuthorization(ll []*api.AuthorizationPolicyToClient) {
a.authorization = ll
logger.Sugar().Info(ll)
}
func (a *DDSC) handleServiceNameMapping(ll []*api.ServiceNameMappingToClient) {
a.serviceMapping = ll
logger.Sugar().Info(ll)
}
func (a *DDSC) handleConditionRoute(ll []*api.ConditionRouteToClient) {
a.conditionRoute = ll
logger.Sugar().Info(ll)
}
func (a *DDSC) handleTagRoute(ll []*api.TagRouteToClient) {
a.tagRoute = ll
logger.Sugar().Info(ll)
}
func (a *DDSC) handleDynamicConfig(ll []*api.DynamicConfigToClient) {
a.dynamicConfig = ll
logger.Sugar().Info(ll)
}
func main() {
initialWatch := []*dds.ObserveRequest{
{
Nonce: "",
Type: gvks.AuthorizationPolicy,
},
{
Nonce: "",
Type: gvks.AuthenticationPolicy,
},
{
Nonce: "",
Type: gvks.DynamicConfig,
},
{
Nonce: "",
Type: gvks.TagRoute,
},
{
Nonce: "",
Type: gvks.ConditionRoute,
},
{
Nonce: "",
Type: gvks.ServiceNameMapping,
},
}
ddscConn, err := New(grpcUpstreamAddr, &Config{
InitialDiscoveryRequest: initialWatch,
Namespace: "dubbo-system",
})
if err != nil {
panic(err)
}
err = ddscConn.Run()
if err != nil {
panic("DDSC: failed running")
}
ddscConn.RecvWg.Wait()
}