| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| // Reads the pod configuration from file or a directory of files. |
| package config |
| |
| import ( |
| "fmt" |
| "io/ioutil" |
| "os" |
| "path/filepath" |
| "sort" |
| "strings" |
| "time" |
| |
| "k8s.io/klog" |
| |
| "k8s.io/api/core/v1" |
| "k8s.io/apimachinery/pkg/types" |
| "k8s.io/client-go/tools/cache" |
| api "k8s.io/kubernetes/pkg/apis/core" |
| kubetypes "k8s.io/kubernetes/pkg/kubelet/types" |
| ) |
| |
// podEventType identifies the kind of change reported by a watchEvent.
type podEventType int

const (
	// The event kinds are numbered from zero, in declaration order, via iota.
	// NOTE(review): presumably these correspond to a manifest file being
	// created, changed and removed; the code that produces the events is not
	// visible here, so confirm against the watcher implementation.
	podAdd podEventType = iota
	podModify
	podDelete

	// eventBufferLen is the capacity of the buffered watchEvents channel
	// allocated in newSourceFile.
	eventBufferLen = 10
)
| |
// watchEvent pairs a file name with the kind of event observed for it.
// Pointers to watchEvent travel through sourceFile.watchEvents and are handed
// to consumeWatchEvent by the run loop.
type watchEvent struct {
	fileName  string
	eventType podEventType
}
| |
// sourceFile reads pod manifests from a single file or from a directory of
// files and publishes the resulting pods on the updates channel.
type sourceFile struct {
	// path is the manifest file or directory that listConfig reads.
	path string

	// nodeName is forwarded to applyDefaults for every decoded pod.
	nodeName types.NodeName

	// period is the interval of the ticker that triggers a full re-read of
	// path in run.
	period time.Duration

	// store holds the current set of pods. newSourceFile creates it with
	// cache.NewUndeltaStore and a push function that sends the pods it is
	// given to updates as a single SET update.
	store cache.Store

	// fileKeyMapping maps a manifest file name to the store key of the pod
	// decoded from it; entries are written by extractFromFile.
	fileKeyMapping map[string]string

	// updates receives kubetypes.PodUpdate values produced by this source.
	updates chan<- interface{}

	// watchEvents carries events to the run loop; it is buffered with
	// eventBufferLen slots.
	watchEvents chan *watchEvent
}
| |
| func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { |
| // "golang.org/x/exp/inotify" requires a path without trailing "/" |
| path = strings.TrimRight(path, string(os.PathSeparator)) |
| |
| config := newSourceFile(path, nodeName, period, updates) |
| klog.V(1).Infof("Watching path %q", path) |
| config.run() |
| } |
| |
| func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile { |
| send := func(objs []interface{}) { |
| var pods []*v1.Pod |
| for _, o := range objs { |
| pods = append(pods, o.(*v1.Pod)) |
| } |
| updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource} |
| } |
| store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc) |
| return &sourceFile{ |
| path: path, |
| nodeName: nodeName, |
| period: period, |
| store: store, |
| fileKeyMapping: map[string]string{}, |
| updates: updates, |
| watchEvents: make(chan *watchEvent, eventBufferLen), |
| } |
| } |
| |
| func (s *sourceFile) run() { |
| listTicker := time.NewTicker(s.period) |
| |
| go func() { |
| // Read path immediately to speed up startup. |
| if err := s.listConfig(); err != nil { |
| klog.Errorf("Unable to read config path %q: %v", s.path, err) |
| } |
| for { |
| select { |
| case <-listTicker.C: |
| if err := s.listConfig(); err != nil { |
| klog.Errorf("Unable to read config path %q: %v", s.path, err) |
| } |
| case e := <-s.watchEvents: |
| if err := s.consumeWatchEvent(e); err != nil { |
| klog.Errorf("Unable to process watch event: %v", err) |
| } |
| } |
| } |
| }() |
| |
| s.startWatch() |
| } |
| |
| func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error { |
| return applyDefaults(pod, source, true, s.nodeName) |
| } |
| |
| func (s *sourceFile) listConfig() error { |
| path := s.path |
| statInfo, err := os.Stat(path) |
| if err != nil { |
| if !os.IsNotExist(err) { |
| return err |
| } |
| // Emit an update with an empty PodList to allow FileSource to be marked as seen |
| s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource} |
| return fmt.Errorf("path does not exist, ignoring") |
| } |
| |
| switch { |
| case statInfo.Mode().IsDir(): |
| pods, err := s.extractFromDir(path) |
| if err != nil { |
| return err |
| } |
| if len(pods) == 0 { |
| // Emit an update with an empty PodList to allow FileSource to be marked as seen |
| s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource} |
| return nil |
| } |
| return s.replaceStore(pods...) |
| |
| case statInfo.Mode().IsRegular(): |
| pod, err := s.extractFromFile(path) |
| if err != nil { |
| return err |
| } |
| return s.replaceStore(pod) |
| |
| default: |
| return fmt.Errorf("path is not a directory or file") |
| } |
| } |
| |
| // Get as many pod manifests as we can from a directory. Return an error if and only if something |
| // prevented us from reading anything at all. Do not return an error if only some files |
| // were problematic. |
| func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) { |
| dirents, err := filepath.Glob(filepath.Join(name, "[^.]*")) |
| if err != nil { |
| return nil, fmt.Errorf("glob failed: %v", err) |
| } |
| |
| pods := make([]*v1.Pod, 0) |
| if len(dirents) == 0 { |
| return pods, nil |
| } |
| |
| sort.Strings(dirents) |
| for _, path := range dirents { |
| statInfo, err := os.Stat(path) |
| if err != nil { |
| klog.Errorf("Can't get metadata for %q: %v", path, err) |
| continue |
| } |
| |
| switch { |
| case statInfo.Mode().IsDir(): |
| klog.Errorf("Not recursing into manifest path %q", path) |
| case statInfo.Mode().IsRegular(): |
| pod, err := s.extractFromFile(path) |
| if err != nil { |
| if !os.IsNotExist(err) { |
| klog.Errorf("Can't process manifest file %q: %v", path, err) |
| } |
| } else { |
| pods = append(pods, pod) |
| } |
| default: |
| klog.Errorf("Manifest path %q is not a directory or file: %v", path, statInfo.Mode()) |
| } |
| } |
| return pods, nil |
| } |
| |
| func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) { |
| klog.V(3).Infof("Reading config file %q", filename) |
| defer func() { |
| if err == nil && pod != nil { |
| objKey, keyErr := cache.MetaNamespaceKeyFunc(pod) |
| if keyErr != nil { |
| err = keyErr |
| return |
| } |
| s.fileKeyMapping[filename] = objKey |
| } |
| }() |
| |
| file, err := os.Open(filename) |
| if err != nil { |
| return pod, err |
| } |
| defer file.Close() |
| |
| data, err := ioutil.ReadAll(file) |
| if err != nil { |
| return pod, err |
| } |
| |
| defaultFn := func(pod *api.Pod) error { |
| return s.applyDefaults(pod, filename) |
| } |
| |
| parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn) |
| if parsed { |
| if podErr != nil { |
| return pod, podErr |
| } |
| return pod, nil |
| } |
| |
| return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file.\n", filename, podErr) |
| } |
| |
| func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) { |
| objs := []interface{}{} |
| for _, pod := range pods { |
| objs = append(objs, pod) |
| } |
| return s.store.Replace(objs, "") |
| } |