blob: 50cfcf495347f866e7560dd9bed81d61a8d38fe8 [file] [log] [blame]
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package logs
import (
"compress/gzip"
"fmt"
"io"
"os"
"path/filepath"
"sort"
"strings"
"time"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/clock"
"k8s.io/apimachinery/pkg/util/wait"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/runtime/v1alpha2"
)
const (
// logMonitorPeriod is the period container log manager monitors
// container logs and performs log rotation.
logMonitorPeriod = 10 * time.Second
// timestampFormat is format of the timestamp suffix for rotated log.
// See https://golang.org/pkg/time/#Time.Format.
timestampFormat = "20060102-150405"
// compressSuffix is the suffix for compressed log.
compressSuffix = ".gz"
// tmpSuffix is the suffix for temporary file.
tmpSuffix = ".tmp"
)
// ContainerLogManager manages lifecycle of all container logs.
//
// Implementation is thread-safe.
type ContainerLogManager interface {
// TODO(random-liu): Add RotateLogs function and call it under disk pressure.
// Start container log manager.
Start()
}
// LogRotatePolicy is a policy for container log rotation. The policy applies to all
// containers managed by kubelet.
type LogRotatePolicy struct {
// MaxSize in bytes of the container log file before it is rotated. Negative
// number means to disable container log rotation.
MaxSize int64
// MaxFiles is the maximum number of log files that can be present.
// If rotating the logs creates excess files, the oldest file is removed.
MaxFiles int
}
// GetAllLogs gets all inuse (rotated/compressed) logs for a specific container log.
// Returned logs are sorted in oldest to newest order.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func GetAllLogs(log string) ([]string, error) {
// pattern is used to match all rotated files.
pattern := fmt.Sprintf("%s.*", log)
logs, err := filepath.Glob(pattern)
if err != nil {
return nil, fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
}
inuse, _ := filterUnusedLogs(logs)
sort.Strings(inuse)
return append(inuse, log), nil
}
// compressReadCloser wraps gzip.Reader with a function to close file handler.
type compressReadCloser struct {
f *os.File
*gzip.Reader
}
func (rc *compressReadCloser) Close() error {
ferr := rc.f.Close()
rerr := rc.Reader.Close()
if ferr != nil {
return ferr
}
if rerr != nil {
return rerr
}
return nil
}
// UncompressLog compresses a compressed log and return a readcloser for the
// stream of the uncompressed content.
// TODO(#59902): Leverage this function to support log rotation in `kubectl logs`.
func UncompressLog(log string) (_ io.ReadCloser, retErr error) {
if !strings.HasSuffix(log, compressSuffix) {
return nil, fmt.Errorf("log is not compressed")
}
f, err := os.Open(log)
if err != nil {
return nil, fmt.Errorf("failed to open log: %v", err)
}
defer func() {
if retErr != nil {
f.Close()
}
}()
r, err := gzip.NewReader(f)
if err != nil {
return nil, fmt.Errorf("failed to create gzip reader: %v", err)
}
return &compressReadCloser{f: f, Reader: r}, nil
}
// parseMaxSize parses quantity string to int64 max size in bytes.
func parseMaxSize(size string) (int64, error) {
quantity, err := resource.ParseQuantity(size)
if err != nil {
return 0, err
}
maxSize, ok := quantity.AsInt64()
if !ok {
return 0, fmt.Errorf("invalid max log size")
}
if maxSize < 0 {
return 0, fmt.Errorf("negative max log size %d", maxSize)
}
return maxSize, nil
}
type containerLogManager struct {
runtimeService internalapi.RuntimeService
policy LogRotatePolicy
clock clock.Clock
}
// NewContainerLogManager creates a new container log manager.
func NewContainerLogManager(runtimeService internalapi.RuntimeService, maxSize string, maxFiles int) (ContainerLogManager, error) {
if maxFiles <= 1 {
return nil, fmt.Errorf("invalid MaxFiles %d, must be > 1", maxFiles)
}
parsedMaxSize, err := parseMaxSize(maxSize)
if err != nil {
return nil, fmt.Errorf("failed to parse container log max size %q: %v", maxSize, err)
}
// policy LogRotatePolicy
return &containerLogManager{
runtimeService: runtimeService,
policy: LogRotatePolicy{
MaxSize: parsedMaxSize,
MaxFiles: maxFiles,
},
clock: clock.RealClock{},
}, nil
}
// Start the container log manager.
func (c *containerLogManager) Start() {
// Start a goroutine periodically does container log rotation.
go wait.Forever(func() {
if err := c.rotateLogs(); err != nil {
klog.Errorf("Failed to rotate container logs: %v", err)
}
}, logMonitorPeriod)
}
func (c *containerLogManager) rotateLogs() error {
// TODO(#59998): Use kubelet pod cache.
containers, err := c.runtimeService.ListContainers(&runtimeapi.ContainerFilter{})
if err != nil {
return fmt.Errorf("failed to list containers: %v", err)
}
// NOTE(random-liu): Figure out whether we need to rotate container logs in parallel.
for _, container := range containers {
// Only rotate logs for running containers. Non-running containers won't
// generate new output, it doesn't make sense to keep an empty latest log.
if container.GetState() != runtimeapi.ContainerState_CONTAINER_RUNNING {
continue
}
id := container.GetId()
// Note that we should not block log rotate for an error of a single container.
status, err := c.runtimeService.ContainerStatus(id)
if err != nil {
klog.Errorf("Failed to get container status for %q: %v", id, err)
continue
}
path := status.GetLogPath()
info, err := os.Stat(path)
if err != nil {
if !os.IsNotExist(err) {
klog.Errorf("Failed to stat container log %q: %v", path, err)
continue
}
// In rotateLatestLog, there are several cases that we may
// lose original container log after ReopenContainerLog fails.
// We try to recover it by reopening container log.
if err := c.runtimeService.ReopenContainerLog(id); err != nil {
klog.Errorf("Container %q log %q doesn't exist, reopen container log failed: %v", id, path, err)
continue
}
// The container log should be recovered.
info, err = os.Stat(path)
if err != nil {
klog.Errorf("Failed to stat container log %q after reopen: %v", path, err)
continue
}
}
if info.Size() < c.policy.MaxSize {
continue
}
// Perform log rotation.
if err := c.rotateLog(id, path); err != nil {
klog.Errorf("Failed to rotate log %q for container %q: %v", path, id, err)
continue
}
}
return nil
}
func (c *containerLogManager) rotateLog(id, log string) error {
// pattern is used to match all rotated files.
pattern := fmt.Sprintf("%s.*", log)
logs, err := filepath.Glob(pattern)
if err != nil {
return fmt.Errorf("failed to list all log files with pattern %q: %v", pattern, err)
}
logs, err = c.cleanupUnusedLogs(logs)
if err != nil {
return fmt.Errorf("failed to cleanup logs: %v", err)
}
logs, err = c.removeExcessLogs(logs)
if err != nil {
return fmt.Errorf("failed to remove excess logs: %v", err)
}
// Compress uncompressed log files.
for _, l := range logs {
if strings.HasSuffix(l, compressSuffix) {
continue
}
if err := c.compressLog(l); err != nil {
return fmt.Errorf("failed to compress log %q: %v", l, err)
}
}
if err := c.rotateLatestLog(id, log); err != nil {
return fmt.Errorf("failed to rotate log %q: %v", log, err)
}
return nil
}
// cleanupUnusedLogs cleans up temporary or unused log files generated by previous log rotation
// failure.
func (c *containerLogManager) cleanupUnusedLogs(logs []string) ([]string, error) {
inuse, unused := filterUnusedLogs(logs)
for _, l := range unused {
if err := os.Remove(l); err != nil {
return nil, fmt.Errorf("failed to remove unused log %q: %v", l, err)
}
}
return inuse, nil
}
// filterUnusedLogs splits logs into 2 groups, the 1st group is in used logs,
// the second group is unused logs.
func filterUnusedLogs(logs []string) (inuse []string, unused []string) {
for _, l := range logs {
if isInUse(l, logs) {
inuse = append(inuse, l)
} else {
unused = append(unused, l)
}
}
return inuse, unused
}
// isInUse checks whether a container log file is still inuse.
func isInUse(l string, logs []string) bool {
// All temporary files are not in use.
if strings.HasSuffix(l, tmpSuffix) {
return false
}
// All compresed logs are in use.
if strings.HasSuffix(l, compressSuffix) {
return true
}
// Files has already been compressed are not in use.
for _, another := range logs {
if l+compressSuffix == another {
return false
}
}
return true
}
// removeExcessLogs removes old logs to make sure there are only at most MaxFiles log files.
func (c *containerLogManager) removeExcessLogs(logs []string) ([]string, error) {
// Sort log files in oldest to newest order.
sort.Strings(logs)
// Container will create a new log file, and we'll rotate the latest log file.
// Other than those 2 files, we can have at most MaxFiles-2 rotated log files.
// Keep MaxFiles-2 files by removing old files.
// We should remove from oldest to newest, so as not to break ongoing `kubectl logs`.
maxRotatedFiles := c.policy.MaxFiles - 2
if maxRotatedFiles < 0 {
maxRotatedFiles = 0
}
i := 0
for ; i < len(logs)-maxRotatedFiles; i++ {
if err := os.Remove(logs[i]); err != nil {
return nil, fmt.Errorf("failed to remove old log %q: %v", logs[i], err)
}
}
logs = logs[i:]
return logs, nil
}
// compressLog compresses a log to log.gz with gzip.
func (c *containerLogManager) compressLog(log string) error {
r, err := os.Open(log)
if err != nil {
return fmt.Errorf("failed to open log %q: %v", log, err)
}
defer r.Close()
tmpLog := log + tmpSuffix
f, err := os.OpenFile(tmpLog, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
if err != nil {
return fmt.Errorf("failed to create temporary log %q: %v", tmpLog, err)
}
defer func() {
// Best effort cleanup of tmpLog.
os.Remove(tmpLog)
}()
defer f.Close()
w := gzip.NewWriter(f)
defer w.Close()
if _, err := io.Copy(w, r); err != nil {
return fmt.Errorf("failed to compress %q to %q: %v", log, tmpLog, err)
}
compressedLog := log + compressSuffix
if err := os.Rename(tmpLog, compressedLog); err != nil {
return fmt.Errorf("failed to rename %q to %q: %v", tmpLog, compressedLog, err)
}
// Remove old log file.
if err := os.Remove(log); err != nil {
return fmt.Errorf("failed to remove log %q after compress: %v", log, err)
}
return nil
}
// rotateLatestLog rotates latest log without compression, so that container can still write
// and fluentd can finish reading.
func (c *containerLogManager) rotateLatestLog(id, log string) error {
timestamp := c.clock.Now().Format(timestampFormat)
rotated := fmt.Sprintf("%s.%s", log, timestamp)
if err := os.Rename(log, rotated); err != nil {
return fmt.Errorf("failed to rotate log %q to %q: %v", log, rotated, err)
}
if err := c.runtimeService.ReopenContainerLog(id); err != nil {
// Rename the rotated log back, so that we can try rotating it again
// next round.
// If kubelet gets restarted at this point, we'll lose original log.
if renameErr := os.Rename(rotated, log); renameErr != nil {
// This shouldn't happen.
// Report an error if this happens, because we will lose original
// log.
klog.Errorf("Failed to rename rotated log %q back to %q: %v, reopen container log error: %v", rotated, log, renameErr, err)
}
return fmt.Errorf("failed to reopen container log %q: %v", id, err)
}
return nil
}