blob: f36988c7ce1b4912709a699d5207c66d904e5a58 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package distribution
import (
"context"
"fmt"
"sync"
"time"
)
import (
"gopkg.in/yaml.v2"
"istio.io/pkg/ledger"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
v1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/utils/clock"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/status"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/xds"
"github.com/apache/dubbo-go-pixiu/pkg/config"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
func GenStatusReporterMapKey(conID string, distributionType xds.EventType) string {
key := conID + "~" + distributionType
return key
}
type inProgressEntry struct {
// the resource, including resourceVersion, we are currently tracking
status.Resource
// the number of reports we have written with this resource at 100%
completedIterations int
}
type Reporter struct {
mu sync.RWMutex
// map from connection id to latest nonce
status map[string]string
// map from nonce to connection ids for which it is current
// using map[string]struct to approximate a hashset
reverseStatus map[string]sets.Set
inProgressResources map[string]*inProgressEntry
client v1.ConfigMapInterface
cm *corev1.ConfigMap
UpdateInterval time.Duration
PodName string
clock clock.Clock
ledger ledger.Ledger
distributionEventQueue chan distributionEvent
controller *Controller
}
var _ xds.DistributionStatusCache = &Reporter{}
const (
labelKey = "internal.istio.io/distribution-report"
dataField = "distribution-report"
)
// Init starts all the read only features of the reporter, used for nonce generation
// and responding to istioctl wait.
func (r *Reporter) Init(ledger ledger.Ledger, stop <-chan struct{}) {
r.ledger = ledger
if r.clock == nil {
r.clock = clock.RealClock{}
}
r.distributionEventQueue = make(chan distributionEvent, 100_000)
r.status = make(map[string]string)
r.reverseStatus = make(map[string]sets.Set)
r.inProgressResources = make(map[string]*inProgressEntry)
go r.readFromEventQueue(stop)
}
// Start starts the reporter, which watches dataplane ack's and resource changes so that it can update status leader
// with distribution information.
func (r *Reporter) Start(clientSet kubernetes.Interface, namespace string, podname string, stop <-chan struct{}) {
scope.Info("Starting status follower controller")
r.client = clientSet.CoreV1().ConfigMaps(namespace)
r.cm = &corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: r.PodName + "-distribution",
Labels: map[string]string{labelKey: "true"},
},
Data: make(map[string]string),
}
t := r.clock.Tick(r.UpdateInterval)
ctx := status.NewIstioContext(stop)
x, err := clientSet.CoreV1().Pods(namespace).Get(ctx, podname, metav1.GetOptions{})
if err != nil {
scope.Errorf("can't identify pod %s context: %s", podname, err)
} else {
r.cm.OwnerReferences = []metav1.OwnerReference{
*metav1.NewControllerRef(x, metav1.SchemeGroupVersion.WithKind("Pod")),
}
}
go func() {
for {
select {
case <-ctx.Done():
if r.cm != nil {
// TODO: is the use of a cancelled context here a problem? Maybe set a short timeout context?
if err := r.client.Delete(context.Background(), r.cm.Name, metav1.DeleteOptions{}); err != nil {
scope.Errorf("failed to properly clean up distribution report: %v", err)
}
}
close(r.distributionEventQueue)
return
case <-t:
// TODO, check if report is necessary? May already be handled by client
r.writeReport(ctx)
}
}
}()
}
// build a distribution report to send to status leader
func (r *Reporter) buildReport() (Report, []status.Resource) {
r.mu.RLock()
defer r.mu.RUnlock()
var finishedResources []status.Resource
out := Report{
Reporter: r.PodName,
DataPlaneCount: len(r.status),
InProgressResources: map[string]int{},
}
// for every resource in flight
for _, ipr := range r.inProgressResources {
res := ipr.Resource
key := res.String()
// for every version (nonce) of the config currently in play
for nonce, dataplanes := range r.reverseStatus {
// check to see if this version of the config contains this version of the resource
// it might be more optimal to provide for a full dump of the config at a certain version?
dpVersion, err := r.ledger.GetPreviousValue(nonce, res.ToModelKey())
if err == nil && dpVersion == res.Generation {
if _, ok := out.InProgressResources[key]; !ok {
out.InProgressResources[key] = len(dataplanes)
} else {
out.InProgressResources[key] += len(dataplanes)
}
} else if err != nil {
scope.Errorf("Encountered error retrieving version %s of key %s from Store: %v", nonce, key, err)
continue
} else if nonce == r.ledger.RootHash() {
scope.Warnf("Cache appears to be missing latest version of %s", key)
}
if out.InProgressResources[key] >= out.DataPlaneCount {
// if this resource is done reconciling, let's not worry about it anymore
finishedResources = append(finishedResources, res)
// deleting it here doesn't work because we have a read lock and are inside an iterator.
// TODO: this will leak when a resource never reaches 100% before it is replaced.
// TODO: do deletes propagate through this thing?
}
}
}
return out, finishedResources
}
// For efficiency, we don't want to be checking on resources that have already reached 100% distribution.
// When this happens, we remove them from our watch list.
func (r *Reporter) removeCompletedResource(completedResources []status.Resource) {
r.mu.Lock()
defer r.mu.Unlock()
var toDelete []status.Resource
for _, item := range completedResources {
// TODO: handle cache miss
// if cache miss, need to skip current loop, otherwise is will cause errors like
// invalid memory address or nil pointer dereference
if _, ok := r.inProgressResources[item.ToModelKey()]; !ok {
continue
}
total := r.inProgressResources[item.ToModelKey()].completedIterations + 1
if int64(total) > (time.Minute.Milliseconds() / r.UpdateInterval.Milliseconds()) {
// remove from inProgressResources
// TODO: cleanup completedResources
toDelete = append(toDelete, item)
} else {
r.inProgressResources[item.ToModelKey()].completedIterations = total
}
}
for _, resource := range toDelete {
delete(r.inProgressResources, resource.ToModelKey())
}
}
// AddInProgressResource must be called every time a resource change is detected by pilot. This allows us to lookup
// only the resources we expect to be in flight, not the ones that have already distributed
func (r *Reporter) AddInProgressResource(res config.Config) {
tryLedgerPut(r.ledger, res)
myRes := status.ResourceFromModelConfig(res)
if myRes == (status.Resource{}) {
scope.Errorf("Unable to locate schema for %v, will not update status.", res)
return
}
r.mu.Lock()
defer r.mu.Unlock()
r.inProgressResources[myRes.ToModelKey()] = &inProgressEntry{
Resource: myRes,
completedIterations: 0,
}
}
func (r *Reporter) DeleteInProgressResource(res config.Config) {
tryLedgerDelete(r.ledger, res)
if r.controller != nil {
r.controller.configDeleted(res)
}
r.mu.Lock()
defer r.mu.Unlock()
delete(r.inProgressResources, res.Key())
}
// generate a distribution report and write it to a ConfigMap for the leader to read.
func (r *Reporter) writeReport(ctx context.Context) {
report, finishedResources := r.buildReport()
go r.removeCompletedResource(finishedResources)
// write to kubernetes here.
reportbytes, err := yaml.Marshal(report)
if err != nil {
scope.Errorf("Error serializing Distribution Report: %v", err)
return
}
r.cm.Data[dataField] = string(reportbytes)
// TODO: short circuit this write in the leader
_, err = CreateOrUpdateConfigMap(ctx, r.cm, r.client)
if err != nil {
scope.Errorf("Error writing Distribution Report: %v", err)
}
}
// CreateOrUpdateConfigMap is lifted with few modifications from kubeadm's apiclient
func CreateOrUpdateConfigMap(ctx context.Context, cm *corev1.ConfigMap, client v1.ConfigMapInterface) (res *corev1.ConfigMap, err error) {
if res, err = client.Create(ctx, cm, metav1.CreateOptions{}); err != nil {
if !apierrors.IsAlreadyExists(err) {
scope.Errorf("%v", err)
return nil, fmt.Errorf("unable to create ConfigMap: %w", err)
}
if res, err = client.Update(context.TODO(), cm, metav1.UpdateOptions{}); err != nil {
return nil, fmt.Errorf("unable to update ConfigMap: %w", err)
}
}
return res, nil
}
type distributionEvent struct {
conID string
distributionType xds.EventType
nonce string
}
func (r *Reporter) QueryLastNonce(conID string, distributionType xds.EventType) (noncePrefix string) {
key := GenStatusReporterMapKey(conID, distributionType)
r.mu.RLock()
defer r.mu.RUnlock()
return r.status[key]
}
// Register that a dataplane has acknowledged a new version of the config.
// Theoretically, we could use the ads connections themselves to harvest this data,
// but the mutex there is pretty hot, and it seems best to trade memory for time.
func (r *Reporter) RegisterEvent(conID string, distributionType xds.EventType, nonce string) {
// Skip unsupported event types. This ensures we do not leak memory for types
// which may not be handled properly. For example, a type not in AllEventTypes
// will not be properly unregistered.
if _, f := xds.AllEventTypes[distributionType]; !f {
return
}
d := distributionEvent{nonce: nonce, distributionType: distributionType, conID: conID}
select {
case r.distributionEventQueue <- d:
return
default:
scope.Errorf("Distribution Event Queue overwhelmed, status will be invalid.")
}
}
func (r *Reporter) readFromEventQueue(stop <-chan struct{}) {
for {
select {
case ev := <-r.distributionEventQueue:
// TODO might need to batch this to prevent lock contention
r.processEvent(ev.conID, ev.distributionType, ev.nonce)
case <-stop:
return
}
}
}
func (r *Reporter) processEvent(conID string, distributionType xds.EventType, nonce string) {
r.mu.Lock()
defer r.mu.Unlock()
key := GenStatusReporterMapKey(conID, distributionType)
r.deleteKeyFromReverseMap(key)
var version string
if len(nonce) > 12 {
version = nonce[:xds.VersionLen]
} else {
version = nonce
}
// touch
r.status[key] = version
if _, ok := r.reverseStatus[version]; !ok {
r.reverseStatus[version] = sets.New()
}
r.reverseStatus[version].Insert(key)
}
// This is a helper function for keeping our reverseStatus map in step with status.
// must have write lock before calling.
func (r *Reporter) deleteKeyFromReverseMap(key string) {
if old, ok := r.status[key]; ok {
if keys, ok := r.reverseStatus[old]; ok {
keys.Delete(key)
if keys.IsEmpty() {
delete(r.reverseStatus, old)
}
}
}
}
// RegisterDisconnect : when a dataplane disconnects, we should no longer count it, nor expect it to ack config.
func (r *Reporter) RegisterDisconnect(conID string, types []xds.EventType) {
r.mu.Lock()
defer r.mu.Unlock()
for _, xdsType := range types {
key := GenStatusReporterMapKey(conID, xdsType)
r.deleteKeyFromReverseMap(key)
delete(r.status, key)
}
}
func (r *Reporter) SetController(controller *Controller) {
r.controller = controller
}