blob: 9265cd90669b2cab38f3731dd27bfee3aad868cd [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package state
import (
"context"
"errors"
"sync"
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/id"
"github.com/apache/apisix-ingress-controller/pkg/log"
"github.com/apache/apisix-ingress-controller/pkg/seven/conf"
"github.com/apache/apisix-ingress-controller/pkg/seven/utils"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
const (
ApisixUpstream = "ApisixUpstream"
WatchFromKind = "watch"
)
// paddingRoute fills route through currentRoute, it returns a boolean
// value to indicate whether the route is a new created one.
func paddingRoute(route *v1.Route, currentRoute *v1.Route) bool {
if currentRoute == nil {
route.ID = id.GenID(route.FullName)
return true
}
route.ID = currentRoute.ID
return false
}
// paddingService fills service through currentService, it returns a boolean
// value to indicate whether the service is a new created one.
func paddingService(service *v1.Service, currentService *v1.Service) bool {
if currentService == nil {
service.ID = id.GenID(service.FullName)
return true
}
service.ID = currentService.ID
return false
}
// paddingUpstream fills upstream through currentUpstream, it returns a boolean
// value to indicate whether the upstream is a new created one.
func paddingUpstream(upstream *v1.Upstream, currentUpstream *v1.Upstream) bool {
if currentUpstream == nil {
upstream.ID = id.GenID(upstream.FullName)
return true
}
upstream.ID = currentUpstream.ID
return false
}
// NewRouteWorkers make routeWorkers group by service per CRD
// 1.make routes group by (1_2_3) it may be a map like map[1_2_3][]Route;
// 2.route is listening Event from the ready of 1_2_3;
func NewRouteWorkers(ctx context.Context,
routes []*v1.Route, wg *sync.WaitGroup, errorChan chan CRDStatus) RouteWorkerGroup {
rwg := make(RouteWorkerGroup)
for _, r := range routes {
rw := &routeWorker{Route: r, Ctx: ctx, Wg: wg, ErrorChan: errorChan}
rw.start()
rwg.Add(r.UpstreamName, rw)
}
return rwg
}
// 3.route get the Event and trigger a padding for object,then diff,sync;
func (r *routeWorker) trigger() {
var (
op string
errNotify error
)
defer func() {
if errNotify != nil {
r.ErrorChan <- CRDStatus{Id: "", Status: "failure", Err: errNotify}
}
r.Wg.Done()
}()
// padding
var cluster string
if r.Route.Group != "" {
cluster = r.Route.Group
}
currentRoute, err := conf.Client.Cluster(cluster).Route().Get(context.TODO(), r.Route.FullName)
if err != nil && !errors.Is(err, cache.ErrNotFound) {
errNotify = err
return
}
if paddingRoute(r.Route, currentRoute) {
op = Create
} else {
op = Update
}
hasDiff, err := utils.HasDiff(r.Route, currentRoute)
if err != nil {
errNotify = err
return
}
if hasDiff {
err := r.sync(op)
if err != nil {
errNotify = err
return
}
}
}
// sync
func (r *routeWorker) sync(op string) error {
var cluster string
if r.Group != "" {
cluster = r.Group
}
if op == Update {
if _, err := conf.Client.Cluster(cluster).Route().Update(context.TODO(), r.Route); err != nil {
log.Errorf("failed to update route %s: %s, ", r.Name, err)
return err
}
log.Infof("update route %s, %s", r.Name, r.ServiceId)
} else {
route, err := conf.Client.Cluster(cluster).Route().Create(context.TODO(), r.Route)
if err != nil {
log.Errorf("failed to create route: %s", err.Error())
return err
}
r.ID = route.ID
}
log.Infof("create route %s, %s", r.Name, r.ServiceId)
return nil
}
// upstream
func SolverUpstream(upstreams []*v1.Upstream, rwg RouteWorkerGroup, wg *sync.WaitGroup, errorChan chan CRDStatus) {
for _, u := range upstreams {
go SolverSingleUpstream(u, rwg, wg, errorChan)
}
}
func SolverSingleUpstream(u *v1.Upstream, rwg RouteWorkerGroup, wg *sync.WaitGroup, errorChan chan CRDStatus) {
var (
op string
errNotify error
)
defer func() {
if errNotify != nil {
errorChan <- CRDStatus{Id: "", Status: "failure", Err: errNotify}
}
wg.Done()
}()
var cluster string
if u.Group != "" {
cluster = u.Group
}
if currentUpstream, err := conf.Client.Cluster(cluster).Upstream().Get(context.TODO(), u.FullName); err != nil && err != cache.ErrNotFound {
log.Errorf("failed to find upstream %s: %s", u.FullName, err)
errNotify = err
return
} else {
if paddingUpstream(u, currentUpstream) {
op = Create
} else {
op = Update
}
if op == Create {
if u.FromKind == WatchFromKind {
// We don't have a pre-defined upstream and the current upstream updating from
// endpoints.
return
}
if _, err := conf.Client.Cluster(cluster).Upstream().Create(context.TODO(), u); err != nil {
log.Errorf("failed to create upstream %s: %s", u.FullName, err)
return
}
} else {
// diff
hasDiff, err := utils.HasDiff(u, currentUpstream)
if err != nil {
errNotify = err
return
}
if hasDiff {
op = Update
// 0.field check
needToUpdate := true
if currentUpstream.FromKind == ApisixUpstream { // update from ApisixUpstream
if u.FromKind != ApisixUpstream {
// currentUpstream > u
// set lb && health check
needToUpdate = false
}
}
if needToUpdate || u.FromKind == WatchFromKind {
if u.FromKind == WatchFromKind {
currentUpstream.Nodes = u.Nodes
} else { // due to CRD update
currentUpstream = u
}
if _, err = conf.Client.Cluster(cluster).Upstream().Update(context.TODO(), currentUpstream); err != nil {
log.Errorf("failed to update upstream %s: %s", u.FullName, err)
return
}
}
}
}
}
log.Infof("solver upstream %s:%s", op, u.Name)
// anyway, broadcast to route
for _, sw := range rwg[u.Name] {
sw.Event <- Event{}
}
}