blob: 0db73728065286cd636e8cb1511a065075d2d80f [file] [log] [blame]
// Licensed to the Apache Software Foundation (ASF) under one or more
// contributor license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright ownership.
// The ASF licenses this file to You under the Apache License, Version 2.0
// (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package apisix
import (
"context"
"fmt"
"strconv"
"strings"
"go.uber.org/zap"
"github.com/apache/apisix-ingress-controller/pkg/apisix/cache"
"github.com/apache/apisix-ingress-controller/pkg/log"
v1 "github.com/apache/apisix-ingress-controller/pkg/types/apisix/v1"
)
// to do: Delete one of the upstreams. Currently, only service is deleted. There will be some
// redundant upstream objects, but the results will not be affected. It is hoped that the service controller
// can complete the update nodes logic to avoid the intrusion of relation modules into more code.
// Maintain relationships only when resolveGranularity is endpoint
// There is no need to ensure the consistency between the upstream to services, only need to ensure that the upstream-node can be delete after deleting the service
type upstreamService struct {
cluster *cluster
}
func newUpstreamServiceRelation(c *cluster) *upstreamService {
return &upstreamService{
cluster: c,
}
}
func (u *upstreamService) Get(ctx context.Context, serviceName string) (*v1.UpstreamServiceRelation, error) {
log.Debugw("try to get upstreamService in cache",
zap.String("service_name", serviceName),
zap.String("cluster", u.cluster.name),
)
us, err := u.cluster.cache.GetUpstreamServiceRelation(serviceName)
if err != nil && err != cache.ErrNotFound {
log.Error("failed to find upstreamService in cache",
zap.String("service_name", serviceName), zap.Error(err))
return nil, err
}
return us, err
}
func (u *upstreamService) Delete(ctx context.Context, serviceName string) error {
log.Debugw("try to delete upstreamService in cache",
zap.String("cluster", u.cluster.name),
)
relation, err := u.Get(ctx, serviceName)
if err != nil {
if err == cache.ErrNotFound {
return nil
}
return err
}
_ = u.cluster.cache.DeleteUpstreamServiceRelation(relation)
for upsName := range relation.UpstreamNames {
ups, err := u.cluster.upstream.Get(ctx, upsName)
if err != nil {
continue
}
ups.Nodes = make(v1.UpstreamNodes, 0)
log.Debugw("try to update upstream in cluster",
zap.Any("upstream", ups),
)
_, err = u.cluster.upstream.Update(ctx, ups)
if err != nil {
log.Error(err)
continue
}
}
return nil
}
func (u *upstreamService) Create(ctx context.Context, upstreamName string) error {
log.Debugw("try to create upstreamService in cache",
zap.String("cluster", u.cluster.name),
)
args := strings.Split(upstreamName, "_")
if len(args) < 2 {
return fmt.Errorf("wrong upstream name %s, must contains namespace_name", upstreamName)
}
// The last part of upstreanName should be a port number.
// Please refer to apisixv1.ComposeUpstreamName to see the detailed format.
_, err := strconv.Atoi(args[len(args)-1])
if err != nil {
return nil
}
serviceName := args[0] + "_" + args[1]
relation, err := u.Get(ctx, serviceName)
if err != nil && err != cache.ErrNotFound {
return err
}
if relation == nil {
relation = &v1.UpstreamServiceRelation{
ServiceName: serviceName,
UpstreamNames: map[string]struct{}{
upstreamName: {},
},
}
} else {
relation.UpstreamNames[upstreamName] = struct{}{}
}
if err := u.cluster.cache.InsertUpstreamServiceRelation(relation); err != nil {
log.Errorf("failed to reflect upstreamService create to cache: %s", err)
return err
}
return nil
}
func (u *upstreamService) List(ctx context.Context) ([]*v1.UpstreamServiceRelation, error) {
log.Debugw("try to create upstreamService in cache",
zap.String("cluster", u.cluster.name),
)
usrs, err := u.cluster.cache.ListUpstreamServiceRelation()
if err != nil {
log.Errorw("failed to list upstream in cache",
zap.Error(err),
)
return nil, err
}
return usrs, nil
}