// blob: 69a89203c4ea67b3e3cf39efcdd0722183355866 [file] [log] [blame]
/*
Copyright Istio Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package file
import (
"bufio"
"bytes"
"crypto/sha256"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"sync"
)
import (
"github.com/hashicorp/go-multierror"
yamlv3 "gopkg.in/yaml.v3"
"istio.io/pkg/log"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"
kubeJson "k8s.io/apimachinery/pkg/runtime/serializer/json"
"k8s.io/apimachinery/pkg/util/yaml"
"k8s.io/client-go/tools/cache"
)
import (
kubeyaml2 "github.com/apache/dubbo-go-pixiu/pilot/pkg/config/file/util/kubeyaml"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/config/memory"
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/config"
kube2 "github.com/apache/dubbo-go-pixiu/pkg/config/legacy/source/kube"
"github.com/apache/dubbo-go-pixiu/pkg/config/resource"
"github.com/apache/dubbo-go-pixiu/pkg/config/schema/collection"
schemaresource "github.com/apache/dubbo-go-pixiu/pkg/config/schema/resource"
"github.com/apache/dubbo-go-pixiu/pkg/kube"
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
var (
// inMemoryKubeNameDiscriminator is the numeric suffix NewKubeSource uses when naming a
// new source; NewKubeSource increments it after each use.
inMemoryKubeNameDiscriminator int64
// scope is the logging scope used throughout this file.
scope = log.RegisterScope("file", "File client messages", 0)
)
// KubeSource is an in-memory source implementation that can handle K8s style resources.
// Resources are parsed from named chunks of YAML content and kept in an inner
// model.ConfigStore, to which the ConfigStore methods of this type delegate.
type KubeSource struct {
// mu is held by ContentNames, ApplyContent and RemoveContent while they run.
mu sync.Mutex
// name is assigned by NewKubeSource and has the form "kube-inmemory-<n>".
name string
// schemas is the set of schemas used to recognize resources while parsing content.
schemas *collection.Schemas
// inner is the store that holds the parsed resources.
inner model.ConfigStore
// defaultNs, when non-empty, is applied to namespaced resources that specify no namespace.
defaultNs resource.Namespace
// versionCtr is incremented each time ApplyContent stores a resource and is used to
// build that resource's ResourceVersion ("v<n>").
versionCtr int64
// shas maps a resource key to the SHA-256 of the YAML chunk it was last stored from.
shas map[kubeResourceKey]resourceSha
// byFile maps a content name to the resources (key -> GroupVersionKind) that the
// content contributed the last time it was applied.
byFile map[string]map[kubeResourceKey]config.GroupVersionKind
}
// Schemas returns a copy of the schemas value this source points to.
func (s *KubeSource) Schemas() collection.Schemas {
return *s.schemas
}
// Get forwards the lookup to the inner store and returns its result unchanged.
func (s *KubeSource) Get(typ config.GroupVersionKind, name, namespace string) *config.Config {
return s.inner.Get(typ, name, namespace)
}
// List forwards the query to the inner store and returns its result unchanged.
func (s *KubeSource) List(typ config.GroupVersionKind, namespace string) ([]config.Config, error) {
return s.inner.List(typ, namespace)
}
// Create forwards the config to the inner store's Create and returns its result unchanged.
// The content-tracking state (shas, byFile) of this source is not touched.
func (s *KubeSource) Create(config config.Config) (revision string, err error) {
return s.inner.Create(config)
}
// Update forwards the config to the inner store's Update and returns its result unchanged.
// The content-tracking state (shas, byFile) of this source is not touched.
func (s *KubeSource) Update(config config.Config) (newRevision string, err error) {
return s.inner.Update(config)
}
// UpdateStatus forwards the config to the inner store's UpdateStatus and returns its
// result unchanged.
func (s *KubeSource) UpdateStatus(config config.Config) (newRevision string, err error) {
return s.inner.UpdateStatus(config)
}
// Patch forwards the original config and patch function to the inner store's Patch and
// returns its result unchanged.
func (s *KubeSource) Patch(orig config.Config, patchFn config.PatchFunc) (string, error) {
return s.inner.Patch(orig, patchFn)
}
// Delete forwards the request to the inner store's Delete and returns its result unchanged.
// The content-tracking state (shas, byFile) of this source is not touched.
func (s *KubeSource) Delete(typ config.GroupVersionKind, name, namespace string, resourceVersion *string) error {
return s.inner.Delete(typ, name, namespace, resourceVersion)
}
// RegisterEventHandler is not implemented for KubeSource: it always panics.
func (s *KubeSource) RegisterEventHandler(kind config.GroupVersionKind, handler model.EventHandler) {
panic("implement me")
}
// Run does nothing; the stop channel is ignored.
func (s *KubeSource) Run(stop <-chan struct{}) {
}
// SetWatchErrorHandler is not implemented for KubeSource: it always panics.
func (s *KubeSource) SetWatchErrorHandler(f func(r *cache.Reflector, err error)) error {
panic("implement me")
}
// HasSynced always reports true.
func (s *KubeSource) HasSynced() bool {
return true
}
// resourceSha is a SHA-256 digest. parseChunk computes it over the YAML chunk a resource
// was parsed from, and ApplyContent compares it to detect changed resources.
type resourceSha [sha256.Size]byte
// kubeResource is a single resource parsed from one YAML chunk.
type kubeResource struct {
	// config is the parsed resource.
	config *config.Config
	// schema is the schema the chunk's group/version/kind resolved to.
	schema collection.Schema
	// sha is the digest of the YAML chunk the resource was parsed from.
	sha resourceSha
}

// newKey builds the key (kind plus full name) under which this resource is tracked.
func (r *kubeResource) newKey() kubeResourceKey {
	var key kubeResourceKey
	key.kind = r.schema.Resource().Kind()
	key.fullName = r.fullName()
	return key
}

// fullName returns the namespace/name pair of the resource's config.
func (r *kubeResource) fullName() resource.FullName {
	ns := resource.Namespace(r.config.Namespace)
	local := resource.LocalName(r.config.Name)
	return resource.NewFullName(ns, local)
}
// kubeResourceKey identifies a tracked resource by its full name and kind. It is used
// as the key of KubeSource.shas and of the per-content maps in KubeSource.byFile.
type kubeResourceKey struct {
// fullName is the namespace/name pair of the resource.
fullName resource.FullName
// kind is the kind reported by the resource's schema.
kind string
}
// Compile-time check that *KubeSource satisfies model.ConfigStore.
var _ model.ConfigStore = (*KubeSource)(nil)
// NewKubeSource returns a new in-memory Source that works with Kubernetes resources.
// The source is named "kube-inmemory-<n>", where <n> is the current value of a
// package-level counter that is bumped on every call. The counter is read and
// incremented without any synchronization.
func NewKubeSource(schemas collection.Schemas) *KubeSource {
	src := &KubeSource{
		name:    fmt.Sprintf("kube-inmemory-%d", inMemoryKubeNameDiscriminator),
		schemas: &schemas,
		inner:   memory.MakeSkipValidation(schemas),
		shas:    make(map[kubeResourceKey]resourceSha),
		byFile:  make(map[string]map[kubeResourceKey]config.GroupVersionKind),
	}
	inMemoryKubeNameDiscriminator++
	return src
}
// SetDefaultNamespace enables injecting a default namespace for resources where none is already specified.
// The namespace only affects content applied after this call; resources that are already
// stored are left as they are.
func (s *KubeSource) SetDefaultNamespace(defaultNs resource.Namespace) {
	// parseChunk reads defaultNs while ApplyContent holds mu, so the write must take the
	// same lock to avoid a data race with a concurrent ApplyContent.
	s.mu.Lock()
	defer s.mu.Unlock()
	s.defaultNs = defaultNs
}
// Clear the contents of this source: the version counter is reset, all tracked hashes
// and content names are forgotten, and the inner store is replaced with a fresh one
// built from the same schemas.
func (s *KubeSource) Clear() {
	// ApplyContent, RemoveContent and ContentNames all touch this state under mu; take
	// the same lock so a concurrent call never observes a half-reset source.
	s.mu.Lock()
	defer s.mu.Unlock()

	s.versionCtr = 0
	s.shas = make(map[kubeResourceKey]resourceSha)
	s.byFile = make(map[string]map[kubeResourceKey]config.GroupVersionKind)
	s.inner = memory.MakeSkipValidation(*s.schemas)
}
// ContentNames returns the set of content names currently tracked by this source,
// i.e. every name that has been applied and not removed since the last Clear.
func (s *KubeSource) ContentNames() map[string]struct{} {
	s.mu.Lock()
	defer s.mu.Unlock()

	names := sets.New()
	for contentName := range s.byFile {
		names.Insert(contentName)
	}
	return names
}
// ApplyContent applies the given yamltext to this source. The content is tracked with the given name. If ApplyContent
// gets called multiple times with the same name, the contents applied by the previous incarnation will be overwritten
// or removed, depending on the new content.
//
// Resources whose YAML chunk hash is unchanged since they were last stored are not written
// again. Resources that the previous incarnation of name contributed but the new content no
// longer contains are deleted from the store, and their hashes are forgotten.
//
// Returns an error if any were encountered, but that still may represent a partial success:
// parse errors are reported only after every parsable resource has been applied. A failure
// to store a resource aborts immediately.
func (s *KubeSource) ApplyContent(name, yamlText string) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// We hold off on dealing with parseErrs until the end, since partial success is possible.
	resources, parseErrs := s.parseContent(s.schemas, name, yamlText)

	oldKeys := s.byFile[name]
	newKeys := make(map[kubeResourceKey]config.GroupVersionKind, len(resources))

	for _, r := range resources {
		key := r.newKey()

		oldSha, found := s.shas[key]
		if !found || oldSha != r.sha {
			s.versionCtr++
			r.config.ResourceVersion = fmt.Sprintf("v%d", s.versionCtr)
			scope.Debug("KubeSource.ApplyContent: Set: ", r.schema.Name(), r.fullName())
			// apply is idempotent, but configstore is not, thus the odd logic here
			_, err := s.inner.Update(*r.config)
			if err != nil {
				_, err = s.inner.Create(*r.config)
				if err != nil {
					return fmt.Errorf("cannot store config %s/%s %s from reader: %w",
						r.schema.Resource().Version(), r.schema.Resource().Kind(), r.fullName(), err)
				}
			}
			s.shas[key] = r.sha
		}
		newKeys[key] = r.schema.Resource().GroupVersionKind()
		if oldKeys != nil {
			// The key is still present in the new content, so take it out of the set of
			// keys that will be deleted below.
			scope.Debug("KubeSource.ApplyContent: Delete: ", r.schema.Name(), key)
			delete(oldKeys, key)
		}
	}

	// Whatever is left in oldKeys was contributed by the previous incarnation only.
	for k, col := range oldKeys {
		empty := ""
		err := s.inner.Delete(col, k.fullName.Name.String(), k.fullName.Namespace.String(), &empty)
		if err != nil {
			scope.Errorf("encountered unexpected error removing resource from filestore: %s", err)
		}
		// Forget the hash as RemoveContent does. Otherwise re-adding the same, unchanged
		// resource later would match the stale hash and never be written back to the store.
		delete(s.shas, k)
	}
	s.byFile[name] = newKeys

	if parseErrs != nil {
		return fmt.Errorf("errors parsing content %q: %v", name, parseErrs)
	}
	return nil
}
// RemoveContent removes the content for the given name: every resource tracked under
// the name is deleted from the inner store and its hash forgotten, then the name itself
// is dropped. Deletion failures are logged and do not stop the removal. Unknown names
// are ignored.
func (s *KubeSource) RemoveContent(name string) {
	s.mu.Lock()
	defer s.mu.Unlock()

	tracked := s.byFile[name]
	if tracked == nil {
		return
	}

	for key, gvk := range tracked {
		// The store's Delete takes a resource version pointer; an empty one is passed.
		noVersion := ""
		if err := s.inner.Delete(gvk, key.fullName.Name.String(), key.fullName.Namespace.String(), &noVersion); err != nil {
			scope.Errorf("encountered unexpected error removing resource from filestore: %s", err)
		}
		delete(s.shas, key)
	}
	delete(s.byFile, name)
}
// parseContent splits yamlText into documents and parses each one with parseChunk.
//
// It returns every resource that parsed successfully together with the accumulated
// errors (nil when there were none). A failure to read the next document stops the scan.
// A chunk that fails to parse is skipped. Chunks rejected with an unknownSchemaError are
// only noted in the debug log and do not contribute to the returned error.
func (s *KubeSource) parseContent(r *collection.Schemas, name, yamlText string) ([]kubeResource, error) {
	var (
		parsed []kubeResource
		errs   error
	)

	decoder := kubeyaml2.NewYAMLReader(bufio.NewReader(strings.NewReader(yamlText)))

	// chunkCount is the zero-based index of the document being processed.
	for chunkCount := 0; ; chunkCount++ {
		doc, lineNum, err := decoder.Read()
		if err == io.EOF {
			break
		}
		if err != nil {
			readErr := fmt.Errorf("error reading documents in %s[%d]: %v", name, chunkCount, err)
			scope.Warnf("%v - skipping", readErr)
			scope.Debugf("Failed to parse yamlText chunk: %v", yamlText)
			errs = multierror.Append(errs, readErr)
			break
		}

		chunk := bytes.TrimSpace(doc)
		res, err := s.parseChunk(r, name, lineNum, chunk)
		if err == nil {
			parsed = append(parsed, res)
			continue
		}

		var uerr *unknownSchemaError
		if errors.As(err, &uerr) {
			// Note the error to the debug log but continue
			scope.Debugf("skipping unknown yaml chunk %s: %s", name, uerr.Error())
			continue
		}

		chunkErr := fmt.Errorf("error processing %s[%d]: %v", name, chunkCount, err)
		scope.Warnf("%v - skipping", chunkErr)
		scope.Debugf("Failed to parse yaml chunk: %v", string(chunk))
		errs = multierror.Append(errs, chunkErr)
	}

	return parsed, errs
}
// unknownSchemaError reports that no schema was found for a group+version+kind.
type unknownSchemaError struct {
	group, version, kind string
}

// Error implements the error interface, rendering the offending group/version/kind.
func (e unknownSchemaError) Error() string {
	const format = "failed finding schema for group/version/kind: %s/%s/%s"
	return fmt.Sprintf(format, e.group, e.version, e.kind)
}
// parseChunk converts a single YAML document into a kubeResource.
//
// r is the set of schemas used to resolve the document's group/version/kind, name is the
// content name recorded in the resource's source position, lineNum is the line recorded in
// that position and used as the offset for the field map, and yamlChunk is the document itself.
//
// A *unknownSchemaError is returned when no schema matches the document's
// group/version/kind, or when the matching schema cannot create a new instance. Any other
// failure is returned as a plain error.
func (s *KubeSource) parseChunk(r *collection.Schemas, name string, lineNum int, yamlChunk []byte) (kubeResource, error) {
// Convert to JSON
jsonChunk, err := yaml.ToJSON(yamlChunk)
if err != nil {
return kubeResource{}, fmt.Errorf("failed converting YAML to JSON: %v", err)
}
// Peek at the beginning of the JSON to determine the group, version and kind
groupVersionKind, err := kubeJson.DefaultMetaFactory.Interpret(jsonChunk)
if err != nil {
return kubeResource{}, fmt.Errorf("failed interpreting jsonChunk: %v", err)
}
if groupVersionKind.Empty() {
return kubeResource{}, fmt.Errorf("unable to parse resource with no group, version and kind")
}
schema, found := r.FindByGroupVersionKind(schemaresource.FromKubernetesGVK(groupVersionKind))
if !found {
return kubeResource{}, &unknownSchemaError{
group: groupVersionKind.Group,
version: groupVersionKind.Version,
kind: groupVersionKind.Kind,
}
}
// Cannot create new instance. This occurs because while newer types do not implement proto.Message,
// this legacy code only supports proto.Messages.
// Note: while NewInstance can be slightly modified to not return error here, the rest of the code
// still requires a proto.Message so it won't work without completely refactoring galley/
// The instance itself is discarded; the call is made only to see whether it fails.
_, e := schema.Resource().NewInstance()
cannotHandleProto := e != nil
if cannotHandleProto {
return kubeResource{}, &unknownSchemaError{
group: groupVersionKind.Group,
version: groupVersionKind.Version,
kind: groupVersionKind.Kind,
}
}
// Decode the JSON into a fresh object of the schema's Kubernetes type. The object comes
// from kube.IstioScheme; the deserializer is built from a new, empty runtime scheme.
runtimeScheme := runtime.NewScheme()
codecs := serializer.NewCodecFactory(runtimeScheme)
deserializer := codecs.UniversalDeserializer()
obj, err := kube.IstioScheme.New(schema.Resource().GroupVersionKind().Kubernetes())
if err != nil {
return kubeResource{}, fmt.Errorf("failed to initialize interface for built-in type: %v", err)
}
_, _, err = deserializer.Decode(jsonChunk, nil, obj)
if err != nil {
return kubeResource{}, fmt.Errorf("failed parsing JSON for built-in type: %v", err)
}
objMeta, ok := obj.(metav1.Object)
if !ok {
return kubeResource{}, errors.New("failed to assert type of object metadata")
}
// If namespace is blank and we have a default set, fill in the default
// (This mirrors the behavior if you kubectl apply a resource without a namespace defined)
// Don't do this for cluster scoped resources
if !schema.Resource().IsClusterScoped() {
if objMeta.GetNamespace() == "" && s.defaultNs != "" {
scope.Debugf("KubeSource.parseChunk: namespace not specified for %q, using %q", objMeta.GetName(), s.defaultNs)
objMeta.SetNamespace(string(s.defaultNs))
}
} else {
// Clear the namespace if there is any specified.
objMeta.SetNamespace("")
}
// Build the flat field-path -> line-number map for analyzers. The map is always allocated;
// if the YAML text cannot be parsed into a single document node it is simply left empty.
fieldMap := make(map[string]int)
// yamlv3.Node contains information like line number of the node, which will be used with its name to construct the field map
yamlChunkNode := yamlv3.Node{}
err = yamlv3.Unmarshal(yamlChunk, &yamlChunkNode)
if err == nil && len(yamlChunkNode.Content) == 1 {
// Get the Node that contains all the YAML chunk information
yamlNode := yamlChunkNode.Content[0]
BuildFieldPathMap(yamlNode, lineNum, "", fieldMap)
}
pos := kube2.Position{Filename: name, Line: lineNum}
c, err := ToConfig(objMeta, schema, &pos, fieldMap)
if err != nil {
return kubeResource{}, err
}
// The hash is taken over the raw YAML chunk; ApplyContent compares it against the hash
// recorded when the resource was last stored.
return kubeResource{
schema: schema,
sha: sha256.Sum256(yamlChunk),
config: c,
}, nil
}
const (
// FieldMapKey is the annotation key under which ToConfig stores the JSON-encoded
// field-path -> line-number map.
FieldMapKey = "istiofilefieldmap"
// ReferenceKey is the annotation key under which ToConfig stores the JSON-encoded
// source reference of the resource.
ReferenceKey = "istiosource"
)
// ToConfig converts the given object into a config.Config for the given schema.
//
// The object is first converted to its unstructured form. When fieldMap is non-empty or
// source is non-nil, both values are JSON-encoded and attached to the object as the
// FieldMapKey and ReferenceKey annotations. The result is then produced by
// TranslateObject with an empty domain suffix.
//
// An error is returned if the unstructured conversion or either JSON encoding fails.
func ToConfig(object metav1.Object, schema collection.Schema, source resource.Reference, fieldMap map[string]int) (*config.Config, error) {
	content, err := runtime.DefaultUnstructuredConverter.ToUnstructured(object)
	if err != nil {
		return nil, err
	}
	u := &unstructured.Unstructured{Object: content}

	// Nothing to annotate: translate the object as it is.
	if len(fieldMap) == 0 && source == nil {
		return TranslateObject(u, "", schema), nil
	}

	annotations := u.GetAnnotations()
	if annotations == nil {
		annotations = make(map[string]string)
	}

	encodedFieldMap, err := json.Marshal(fieldMap)
	if err != nil {
		return nil, err
	}
	annotations[FieldMapKey] = string(encodedFieldMap)

	encodedSource, err := json.Marshal(source)
	if err != nil {
		return nil, err
	}
	annotations[ReferenceKey] = string(encodedSource)

	u.SetAnnotations(annotations)
	return TranslateObject(u, "", schema), nil
}
// TranslateObject builds a config.Config from an unstructured object.
//
// The spec is a new instance created from the schema and populated from the object's
// "spec" field when present, otherwise from the whole object. The metadata is copied from
// the object, and domainSuffix is stored as the config's Domain.
//
// TranslateObject panics if the schema cannot create a new instance, if "spec" is present
// but is not a map, or if the content cannot be converted into the instance.
func TranslateObject(obj *unstructured.Unstructured, domainSuffix string, schema collection.Schema) *config.Config {
	spec, err := schema.Resource().NewInstance()
	if err != nil {
		panic(err)
	}

	// Prefer the nested "spec" map; fall back to the whole object when there is none.
	source := obj.UnstructuredContent()
	if rawSpec, hasSpec := source["spec"]; hasSpec {
		source = rawSpec.(map[string]interface{})
	}
	if err := runtime.DefaultUnstructuredConverter.FromUnstructured(source, spec); err != nil {
		panic(err)
	}

	gvk := obj.GetObjectKind().GroupVersionKind()
	meta := config.Meta{
		GroupVersionKind: config.GroupVersionKind{
			Group:   gvk.Group,
			Version: gvk.Version,
			Kind:    gvk.Kind,
		},
		UID:               string(obj.GetUID()),
		Name:              obj.GetName(),
		Namespace:         obj.GetNamespace(),
		Labels:            obj.GetLabels(),
		Annotations:       obj.GetAnnotations(),
		ResourceVersion:   obj.GetResourceVersion(),
		CreationTimestamp: obj.GetCreationTimestamp().Time,
		OwnerReferences:   obj.GetOwnerReferences(),
		Generation:        obj.GetGeneration(),
		Domain:            domainSuffix,
	}
	return &config.Config{Meta: meta, Spec: spec}
}
// BuildFieldPathMap builds the flat map for each field of the YAML resource.
//
// yamlNode's content is read as alternating key and value nodes. For every scalar value
// an entry "{<path>}" -> line number is written into fieldPathMap, where <path> is
// curPath extended with ".<key>" for mapping keys and "[<index>]" for sequence elements.
// Nested mappings and non-scalar sequence elements are visited recursively.
// startLineNum is the 1-based line at which the YAML chunk starts; it is added to each
// node's own 1-based line (minus one) to produce the recorded line number.
func BuildFieldPathMap(yamlNode *yamlv3.Node, startLineNum int, curPath string, fieldPathMap map[string]int) {
	// record stores the line of a scalar node under its braced path.
	// Both startLineNum and yamlv3.Node.Line start at 1, hence the minus one.
	record := func(path string, n *yamlv3.Node) {
		fieldPathMap["{"+path+"}"] = n.Line + startLineNum - 1
	}

	children := yamlNode.Content
	// Values sit right after their keys, so walk the value positions (1, 3, 5, ...).
	// A node without content yields no iterations, which ends the depth-first search.
	for v := 1; v < len(children); v += 2 {
		key, value := children[v-1], children[v]
		path := curPath + "." + key.Value

		switch value.Kind {
		case yamlv3.ScalarNode:
			// "key: value" - the value has no further content, so record it.
			record(path, value)
		case yamlv3.MappingNode:
			// "key:\n  nested..." - descend into the nested mapping.
			BuildFieldPathMap(value, startLineNum, path, fieldPathMap)
		case yamlv3.SequenceNode:
			// Array with values or array with maps
			for idx, elem := range value.Content {
				elemPath := fmt.Sprintf("%s[%d]", path, idx)
				if elem.Kind != yamlv3.ScalarNode {
					BuildFieldPathMap(elem, startLineNum, elemPath, fieldPathMap)
					continue
				}
				record(elemPath, elem)
			}
		}
	}
}