blob: e94dec02debc34ee966e93ae9d9abf1d9762cd4a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package data_loader
import (
"bytes"
"context"
"fmt"
"path"
"reflect"
"github.com/gin-gonic/gin"
"github.com/juliangruber/go-intersect"
"github.com/pkg/errors"
"github.com/shiningrush/droplet"
"github.com/shiningrush/droplet/wrapper"
wgin "github.com/shiningrush/droplet/wrapper/gin"
"github.com/apisix/manager-api/internal/conf"
"github.com/apisix/manager-api/internal/core/entity"
"github.com/apisix/manager-api/internal/core/store"
"github.com/apisix/manager-api/internal/handler"
loader "github.com/apisix/manager-api/internal/handler/data_loader/loader"
"github.com/apisix/manager-api/internal/handler/data_loader/loader/openapi3"
)
type ImportHandler struct {
routeStore store.Interface
upstreamStore store.Interface
serviceStore store.Interface
consumerStore store.Interface
sslStore store.Interface
streamRouteStore store.Interface
globalPluginStore store.Interface
pluginConfigStore store.Interface
protoStore store.Interface
}
func NewImportHandler() (handler.RouteRegister, error) {
return &ImportHandler{
routeStore: store.GetStore(store.HubKeyRoute),
upstreamStore: store.GetStore(store.HubKeyUpstream),
serviceStore: store.GetStore(store.HubKeyService),
consumerStore: store.GetStore(store.HubKeyConsumer),
sslStore: store.GetStore(store.HubKeySsl),
streamRouteStore: store.GetStore(store.HubKeyStreamRoute),
globalPluginStore: store.GetStore(store.HubKeyGlobalRule),
pluginConfigStore: store.GetStore(store.HubKeyPluginConfig),
protoStore: store.GetStore(store.HubKeyProto),
}, nil
}
func (h *ImportHandler) ApplyRoute(r *gin.Engine) {
r.POST("/apisix/admin/import/routes", wgin.Wraps(h.Import,
wrapper.InputType(reflect.TypeOf(ImportInput{}))))
}
type ImportResult struct {
Total int `json:"total"`
Failed int `json:"failed"`
Errors []string `json:"errors"`
}
type LoaderType string
type ImportInput struct {
Type string `auto_read:"type"`
TaskName string `auto_read:"task_name"`
FileName string `auto_read:"_file"`
FileContent []byte `auto_read:"file"`
MergeMethod string `auto_read:"merge_method"`
}
const (
LoaderTypeOpenAPI3 LoaderType = "openapi3"
)
func (h *ImportHandler) Import(c droplet.Context) (interface{}, error) {
input := c.Input().(*ImportInput)
// input file content check
suffix := path.Ext(input.FileName)
if suffix != ".json" && suffix != ".yaml" && suffix != ".yml" {
return nil, errors.Errorf("required file type is .yaml, .yml or .json but got: %s", suffix)
}
contentLen := bytes.Count(input.FileContent, nil) - 1
if contentLen <= 0 {
return nil, errors.New("uploaded file is empty")
}
if contentLen > conf.ImportSizeLimit {
return nil, errors.Errorf("uploaded file size exceeds the limit, limit is %d", conf.ImportSizeLimit)
}
var l loader.Loader
switch LoaderType(input.Type) {
case LoaderTypeOpenAPI3:
l = &openapi3.Loader{
MergeMethod: input.MergeMethod == "true",
TaskName: input.TaskName,
}
break
default:
return nil, fmt.Errorf("unsupported data loader type: %s", input.Type)
}
dataSets, err := l.Import(input.FileContent)
if err != nil {
return nil, err
}
// Pre-checking for route duplication
preCheckErrs := h.preCheck(c.Context(), dataSets)
if _, ok := preCheckErrs[store.HubKeyRoute]; ok && len(preCheckErrs[store.HubKeyRoute]) > 0 {
return h.convertToImportResult(dataSets, preCheckErrs), nil
}
// Create APISIX resources
createErrs := h.createEntities(c.Context(), dataSets)
return h.convertToImportResult(dataSets, createErrs), nil
}
// Pre-check imported data for duplicates
// The main problem facing duplication is routing, so here
// we mainly check the duplication of routes, based on
// domain name and uri.
func (h *ImportHandler) preCheck(ctx context.Context, data *loader.DataSets) map[store.HubKey][]string {
errs := make(map[store.HubKey][]string)
for _, route := range data.Routes {
errs[store.HubKeyRoute] = make([]string, 0)
o, err := h.routeStore.List(ctx, store.ListInput{
// The check logic here is that if when a duplicate HOST or URI
// has been found, the HTTP method is checked for overlap, and
// if there is overlap it is determined to be a duplicate route
// and the import is rejected.
Predicate: func(obj interface{}) bool {
r := obj.(*entity.Route)
// Check URI and host duplication
isURIDuplicated := r.URI != "" && route.URI != "" && r.URI == route.URI
isURIsDuplicated := len(r.Uris) > 0 && len(route.Uris) > 0 &&
len(intersect.Hash(r.Uris, route.Uris)) > 0
isMethodDuplicated := len(intersect.Hash(r.Methods, route.Methods)) > 0
// First check for duplicate URIs
if isURIDuplicated || isURIsDuplicated {
// Then check if the host field exists, and if it does, check for duplicates
if r.Host != "" && route.Host != "" {
return r.Host == route.Host && isMethodDuplicated
} else if len(r.Hosts) > 0 && len(route.Hosts) > 0 {
return len(intersect.Hash(r.Hosts, route.Hosts)) > 0 && isMethodDuplicated
}
// If the host field does not exist, only the presence or absence
// of HTTP method duplication is returned by default.
return isMethodDuplicated
}
return false
},
PageSize: 0,
PageNumber: 0,
})
if err != nil {
// When a special storage layer error occurs, return directly.
return map[store.HubKey][]string{
store.HubKeyRoute: {err.Error()},
}
}
// Duplicate routes found
if o.TotalSize > 0 {
for _, row := range o.Rows {
r, ok := row.(*entity.Route)
if ok {
errs[store.HubKeyRoute] = append(errs[store.HubKeyRoute],
errors.Errorf("%s is duplicated with route %s",
route.Uris[0],
r.Name).
Error())
}
}
}
}
return errs
}
// Create parsed resources
func (h *ImportHandler) createEntities(ctx context.Context, data *loader.DataSets) map[store.HubKey][]string {
errs := make(map[store.HubKey][]string)
for _, route := range data.Routes {
_, err := h.routeStore.Create(ctx, &route)
if err != nil {
errs[store.HubKeyRoute] = append(errs[store.HubKeyRoute], err.Error())
}
}
for _, upstream := range data.Upstreams {
_, err := h.upstreamStore.Create(ctx, &upstream)
if err != nil {
errs[store.HubKeyUpstream] = append(errs[store.HubKeyUpstream], err.Error())
}
}
for _, service := range data.Services {
_, err := h.serviceStore.Create(ctx, &service)
if err != nil {
errs[store.HubKeyService] = append(errs[store.HubKeyService], err.Error())
}
}
for _, consumer := range data.Consumers {
_, err := h.consumerStore.Create(ctx, &consumer)
if err != nil {
errs[store.HubKeyConsumer] = append(errs[store.HubKeyConsumer], err.Error())
}
}
for _, ssl := range data.SSLs {
_, err := h.sslStore.Create(ctx, &ssl)
if err != nil {
errs[store.HubKeySsl] = append(errs[store.HubKeySsl], err.Error())
}
}
for _, route := range data.StreamRoutes {
_, err := h.streamRouteStore.Create(ctx, &route)
if err != nil {
errs[store.HubKeyStreamRoute] = append(errs[store.HubKeyStreamRoute], err.Error())
}
}
for _, plugin := range data.GlobalPlugins {
_, err := h.globalPluginStore.Create(ctx, &plugin)
if err != nil {
errs[store.HubKeyGlobalRule] = append(errs[store.HubKeyGlobalRule], err.Error())
}
}
for _, config := range data.PluginConfigs {
_, err := h.pluginConfigStore.Create(ctx, &config)
if err != nil {
errs[store.HubKeyPluginConfig] = append(errs[store.HubKeyPluginConfig], err.Error())
}
}
for _, proto := range data.Protos {
_, err := h.protoStore.Create(ctx, &proto)
if err != nil {
errs[store.HubKeyProto] = append(errs[store.HubKeyProto], err.Error())
}
}
return errs
}
// Convert import errors to response result
func (ImportHandler) convertToImportResult(data *loader.DataSets, errs map[store.HubKey][]string) map[store.HubKey]ImportResult {
return map[store.HubKey]ImportResult{
store.HubKeyRoute: {
Total: len(data.Routes),
Failed: len(errs[store.HubKeyRoute]),
Errors: errs[store.HubKeyRoute],
},
store.HubKeyUpstream: {
Total: len(data.Upstreams),
Failed: len(errs[store.HubKeyUpstream]),
Errors: errs[store.HubKeyUpstream],
},
store.HubKeyService: {
Total: len(data.Services),
Failed: len(errs[store.HubKeyService]),
Errors: errs[store.HubKeyService],
},
store.HubKeyConsumer: {
Total: len(data.Consumers),
Failed: len(errs[store.HubKeyConsumer]),
Errors: errs[store.HubKeyConsumer],
},
store.HubKeySsl: {
Total: len(data.SSLs),
Failed: len(errs[store.HubKeySsl]),
Errors: errs[store.HubKeySsl],
},
store.HubKeyStreamRoute: {
Total: len(data.StreamRoutes),
Failed: len(errs[store.HubKeyStreamRoute]),
Errors: errs[store.HubKeyStreamRoute],
},
store.HubKeyGlobalRule: {
Total: len(data.GlobalPlugins),
Failed: len(errs[store.HubKeyGlobalRule]),
Errors: errs[store.HubKeyGlobalRule],
},
store.HubKeyPluginConfig: {
Total: len(data.PluginConfigs),
Failed: len(errs[store.HubKeyPluginConfig]),
Errors: errs[store.HubKeyPluginConfig],
},
store.HubKeyProto: {
Total: len(data.Protos),
Failed: len(errs[store.HubKeyProto]),
Errors: errs[store.HubKeyProto],
},
}
}