blob: a1e230d49886510b7f3f20e4e63931400802efb1 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/*
*
* Copyright 2021 gRPC authors.
*
*/
package controller
import (
"context"
"fmt"
"time"
)
import (
"github.com/golang/protobuf/proto"
"google.golang.org/grpc"
)
import (
controllerversion "dubbo.apache.org/dubbo-go/v3/xds/client/controller/version"
resourceversion "dubbo.apache.org/dubbo-go/v3/xds/client/controller/version"
"dubbo.apache.org/dubbo-go/v3/xds/client/load"
"dubbo.apache.org/dubbo-go/v3/xds/client/resource"
)
// AddWatch adds a watch for an xDS resource given its type and name. The
// watch is not applied synchronously; it is queued for the send goroutine,
// which updates the watch map and sends the request on the active stream.
func (t *Controller) AddWatch(rType resource.ResourceType, resourceName string) {
	action := &watchAction{
		rType:    rType,
		remove:   false,
		resource: resourceName,
	}
	t.sendCh.Put(action)
}
// RemoveWatch cancels an already registered watch for an xDS resource
// given its type and name. Like AddWatch, it only queues the action; the
// send goroutine applies it to the watch map.
func (t *Controller) RemoveWatch(rType resource.ResourceType, resourceName string) {
	action := &watchAction{
		rType:    rType,
		remove:   true,
		resource: resourceName,
	}
	t.sendCh.Put(action)
}
// run starts an ADS stream (and backs off exponentially, if the previous
// stream failed without receiving a single reply) and runs the sender and
// receiver routines to send and receive data from the stream respectively.
func (t *Controller) run(ctx context.Context) {
go t.send(ctx)
// TODO: start a goroutine monitoring ClientConn's connectivity state, and
// report error (and log) when stats is transient failure.
retries := 0
for {
select {
case <-ctx.Done():
return
default:
}
// Back off before every attempt except the first, racing the backoff
// timer against ctx so cancellation is never delayed by the backoff.
if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
select {
case <-timer.C:
case <-ctx.Done():
// Stop() returning false means the timer already fired; drain the
// channel so the fired value doesn't linger.
if !timer.Stop() {
<-timer.C
}
return
}
}
retries++
stream, err := t.vClient.NewStream(ctx, t.cc)
if err != nil {
t.updateHandler.NewConnectionError(err)
t.logger.Warnf("xds: ADS stream creation failed: %v", err)
continue
}
t.logger.Infof("ADS stream created")
// Drain any stale, unconsumed stream before publishing the new one, so
// the send goroutine only ever observes the latest stream.
// NOTE(review): this drain-then-send pattern assumes streamCh is
// buffered with capacity 1 — confirm at the channel's declaration site.
select {
case <-t.streamCh:
default:
}
t.streamCh <- stream
// recv blocks until the stream breaks and reports whether at least one
// response was handled; if so, reset the backoff counter.
if t.recv(stream) {
retries = 0
}
}
}
// send is a separate goroutine for sending watch requests on the xds stream.
//
// It watches the stream channel for new streams, and the request channel for
// new requests to send on the stream.
//
// For each new request (watchAction), it's
// - processed and added to the watch map
// - so resend will pick them up when there are new streams
// - sent on the current stream if there's one
// - the current stream is cleared when any send on it fails
//
// For each new stream, all the existing requests will be resent.
//
// Note that this goroutine doesn't do anything to the old stream when there's a
// new one. In fact, there should be only one stream in progress, and new one
// should only be created when the old one fails (recv returns an error).
func (t *Controller) send(ctx context.Context) {
var stream grpc.ClientStream
for {
select {
case <-ctx.Done():
return
case stream = <-t.streamCh:
// A fresh stream arrived: replay all registered watches on it. If
// the replay fails, drop the stream so subsequent requests are
// skipped until run() delivers a replacement.
if !t.sendExisting(stream) {
// send failed, clear the current stream.
stream = nil
}
case u := <-t.sendCh.Get():
// NOTE(review): Load() appears to be the buffer's "item consumed"
// acknowledgement paired with Get() — confirm in the sendCh
// implementation.
t.sendCh.Load()
var (
target []string
rType resource.ResourceType
version, nonce, errMsg string
send bool
)
switch update := u.(type) {
case *watchAction:
target, rType, version, nonce = t.processWatchInfo(update)
case *ackAction:
// ACK/NACKs are dropped (send == false) when they belong to a
// dead stream or the watch was canceled in the meantime.
target, rType, version, nonce, send = t.processAckInfo(update, stream)
if !send {
continue
}
errMsg = update.errMsg
}
if stream == nil {
// There's no stream yet. Skip the request. This request
// will be resent to the new streams. If no stream is
// created, the watcher will timeout (same as server not
// sending response back).
continue
}
if err := t.vClient.SendRequest(stream, target, rType, version, nonce, errMsg); err != nil {
t.logger.Warnf("ADS request for {target: %q, type: %v, version: %q, nonce: %q} failed: %v", target, rType, version, nonce, err)
// send failed, clear the current stream.
stream = nil
}
}
}
}
// sendExisting sends out xDS requests for registered watchers when recovering
// from a broken stream.
//
// We call stream.Send() here with the lock being held. It should be OK to do
// that here because the stream has just started and Send() usually returns
// quickly (once it pushes the message onto the transport layer) and is only
// ever blocked if we don't have enough flow control quota.
//
// It returns false if any send fails, in which case the caller treats the
// stream as broken.
func (t *Controller) sendExisting(stream grpc.ClientStream) bool {
t.mu.Lock()
defer t.mu.Unlock()
// Reset the ack versions when the stream restarts.
t.versionMap = make(map[resource.ResourceType]string)
t.nonceMap = make(map[resource.ResourceType]string)
// One request per watched resource type, carrying every watched name of
// that type; version and nonce are empty because the stream is brand new.
for rType, s := range t.watchMap {
if err := t.vClient.SendRequest(stream, mapToSlice(s), rType, "", "", ""); err != nil {
t.logger.Warnf("ADS request failed: %v", err)
return false
}
}
return true
}
// recv receives xDS responses on the provided ADS stream and branches out to
// message specific handlers.
//
// It returns true iff at least one response was successfully handled (and
// ACKed) on this stream; the caller uses that to reset its backoff counter.
func (t *Controller) recv(stream grpc.ClientStream) bool {
success := false
for {
resp, err := t.vClient.RecvResponse(stream)
if err != nil {
t.updateHandler.NewConnectionError(err)
t.logger.Warnf("ADS stream is closed with error: %v", err)
return success
}
rType, version, nonce, err := t.handleResponse(resp)
// An unsupported resource type is logged and skipped rather than
// NACKed: there is no known type to NACK against.
if e, ok := err.(resourceversion.ErrResourceTypeUnsupported); ok {
t.logger.Warnf("%s", e.ErrStr)
continue
}
if err != nil {
// NACK: the empty version makes processAckInfo fall back to the
// previously acked version for this type.
t.sendCh.Put(&ackAction{
rType: rType,
version: "",
nonce: nonce,
errMsg: err.Error(),
stream: stream,
})
t.logger.Warnf("Sending NACK for response type: %v, version: %v, nonce: %v, reason: %v", rType, version, nonce, err)
continue
}
// ACK the response with the version/nonce the server sent.
t.sendCh.Put(&ackAction{
rType: rType,
version: version,
nonce: nonce,
stream: stream,
})
t.logger.Infof("Sending ACK for response type: %v, version: %v, nonce: %v", rType, version, nonce)
success = true
}
}
// handleResponse parses an ADS response, unmarshals the carried resources
// according to their type, and delivers the result to the update handler.
//
// It returns the resource type plus the response's version and nonce (needed
// by the caller to ACK/NACK), and any parse/unmarshal error. Unknown resource
// types yield an ErrResourceTypeUnsupported.
func (t *Controller) handleResponse(resp proto.Message) (resource.ResourceType, string, string, error) {
rType, resources, version, nonce, err := t.vClient.ParseResponse(resp)
if err != nil {
return rType, version, nonce, err
}
opts := &resource.UnmarshalOptions{
Version: version,
Resources: resources,
Logger: t.logger,
UpdateValidator: t.updateValidator,
}
var md resource.UpdateMetadata
// Note: the update handler is invoked even if unmarshalling reported an
// error — the (possibly partial) update and its metadata are delivered,
// and the error is returned so the caller NACKs the response.
switch rType {
case resource.ListenerResource:
var update map[string]resource.ListenerUpdateErrTuple
update, md, err = resource.UnmarshalListener(opts)
t.updateHandler.NewListeners(update, md)
case resource.RouteConfigResource:
var update map[string]resource.RouteConfigUpdateErrTuple
update, md, err = resource.UnmarshalRouteConfig(opts)
t.updateHandler.NewRouteConfigs(update, md)
case resource.ClusterResource:
var update map[string]resource.ClusterUpdateErrTuple
update, md, err = resource.UnmarshalCluster(opts)
t.updateHandler.NewClusters(update, md)
case resource.EndpointsResource:
var update map[string]resource.EndpointsUpdateErrTuple
update, md, err = resource.UnmarshalEndpoints(opts)
t.updateHandler.NewEndpoints(update, md)
default:
return rType, "", "", resourceversion.ErrResourceTypeUnsupported{
ErrStr: fmt.Sprintf("Resource type %v unknown in response from server", rType),
}
}
return rType, version, nonce, err
}
// mapToSlice returns the keys of m collected into a slice. The order of the
// returned keys is unspecified (Go map iteration order is random).
func mapToSlice(m map[string]bool) []string {
	keys := make([]string, 0, len(m))
	for key := range m {
		keys = append(keys, key)
	}
	return keys
}
// watchAction describes a pending change to the set of watched resources.
// Instances are queued on sendCh and applied by the send goroutine via
// processWatchInfo.
type watchAction struct {
rType resource.ResourceType
remove bool // Whether this is to remove watch for the resource.
resource string // Name of the resource to watch or unwatch.
}
// processWatchInfo pulls the fields needed by the request from a watchAction.
//
// It also updates the watch map.
//
// The returned target holds every resource name currently watched for the
// action's type (not just the changed one), since each xDS request carries
// the full state for its type.
func (t *Controller) processWatchInfo(w *watchAction) (target []string, rType resource.ResourceType, ver, nonce string) {
t.mu.Lock()
defer t.mu.Unlock()
var current map[string]bool
current, ok := t.watchMap[w.rType]
if !ok {
current = make(map[string]bool)
t.watchMap[w.rType] = current
}
if w.remove {
delete(current, w.resource)
// Drop the type entry entirely once its last watch is removed, so
// sendExisting and processAckInfo see the type as unwatched.
if len(current) == 0 {
delete(t.watchMap, w.rType)
}
} else {
current[w.resource] = true
}
rType = w.rType
target = mapToSlice(current)
// We don't reset version or nonce when a new watch is started. The version
// and nonce from previous response are carried by the request unless the
// stream is recreated.
ver = t.versionMap[rType]
nonce = t.nonceMap[rType]
return target, rType, ver, nonce
}
// ackAction describes an ACK or NACK to be sent for a previously received
// response. Instances are queued on sendCh and applied by the send goroutine
// via processAckInfo.
type ackAction struct {
rType resource.ResourceType
version string // NACK if version is an empty string.
nonce string
errMsg string // Empty unless it's a NACK.
// ACK/NACK are tagged with the stream it's for. When the stream is down,
// all the ACK/NACK for this stream will be dropped, and the version/nonce
// won't be updated.
stream grpc.ClientStream
}
// processAckInfo pulls the fields needed by the ack request from a ackAction.
//
// If no active watch is found for this ack, it returns false for send.
func (t *Controller) processAckInfo(ack *ackAction, stream grpc.ClientStream) (target []string, rType resource.ResourceType, version, nonce string, send bool) {
if ack.stream != stream {
// If ACK's stream isn't the current sending stream, this means the ACK
// was pushed to queue before the old stream broke, and a new stream has
// been started since. Return immediately here so we don't update the
// nonce for the new stream.
return nil, resource.UnknownResource, "", "", false
}
rType = ack.rType
t.mu.Lock()
defer t.mu.Unlock()
// Update the nonce no matter if we are going to send the ACK request on
// wire. We may not send the request if the watch is canceled. But the nonce
// needs to be updated so the next request will have the right nonce.
nonce = ack.nonce
t.nonceMap[rType] = nonce
s, ok := t.watchMap[rType]
if !ok || len(s) == 0 {
// We don't send the request ack if there's no active watch (this can be
// either the server sends responses before any request, or the watch is
// canceled while the ackAction is in queue), because there's no resource
// name. And if we send a request with empty resource name list, the
// server may treat it as a wild card and send us everything.
return nil, resource.UnknownResource, "", "", false
}
send = true
target = mapToSlice(s)
version = ack.version
if version == "" {
// This is a nack, get the previous acked version.
version = t.versionMap[rType]
// version will still be an empty string if rType isn't
// found in versionMap, this can happen if there wasn't any ack
// before.
} else {
// This is an ACK: record it as the latest acked version for the type.
t.versionMap[rType] = version
}
return target, rType, version, nonce, send
}
// reportLoad starts an LRS stream to report load data to the management server.
// It blocks until the context is canceled.
//
// Like run(), it retries with exponential backoff after failures, and only
// resets the backoff counter after a load-stats response has been handled.
func (t *Controller) reportLoad(ctx context.Context, cc *grpc.ClientConn, opts controllerversion.LoadReportingOptions) {
retries := 0
for {
if ctx.Err() != nil {
return
}
// Back off before every attempt except the first, racing the timer
// against ctx so cancellation is never delayed.
if retries != 0 {
timer := time.NewTimer(t.backoff(retries))
select {
case <-timer.C:
case <-ctx.Done():
// Stop() returning false means the timer already fired; drain it.
if !timer.Stop() {
<-timer.C
}
return
}
}
retries++
stream, err := t.vClient.NewLoadStatsStream(ctx, cc)
if err != nil {
t.logger.Warnf("lrs: failed to create stream: %v", err)
continue
}
t.logger.Infof("lrs: created LRS stream")
if err = t.vClient.SendFirstLoadStatsRequest(stream); err != nil {
t.logger.Warnf("lrs: failed to send first request: %v", err)
continue
}
// The server's first response dictates which clusters to report for and
// at what interval.
clusters, interval, err := t.vClient.HandleLoadStatsResponse(stream)
if err != nil {
t.logger.Warnf("%v", err)
continue
}
retries = 0
// sendLoads blocks until ctx is canceled or a send fails; on a send
// failure we loop around and recreate the stream.
t.sendLoads(ctx, stream, opts.LoadStore, clusters, interval)
}
}
// sendLoads periodically sends the load stats collected in store for the
// given clusters on the LRS stream, once per interval. It returns when ctx is
// canceled or when a send on the stream fails.
func (t *Controller) sendLoads(ctx context.Context, stream grpc.ClientStream, store *load.Store, clusterNames []string, interval time.Duration) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
		}
		err := t.vClient.SendLoadStatsRequest(stream, store.Stats(clusterNames))
		if err == nil {
			continue
		}
		t.logger.Warnf("%v", err)
		return
	}
}