blob: 6bac72b21b89b5b1898b271cbe6717fc8cf87b85 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wasm
import (
"bytes"
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
"time"
)
import (
"github.com/google/go-containerregistry/pkg/name"
extensions "istio.io/api/extensions/v1alpha1"
"istio.io/pkg/log"
)
import (
"github.com/apache/dubbo-go-pixiu/pkg/util/sets"
)
var wasmLog = log.RegisterScope("wasm", "", 0)
const (
// DefaultWasmModulePurgeInterval is the default interval for periodic stale Wasm module clean up.
DefaultWasmModulePurgeInterval = 10 * time.Minute
// DefaultWasmModuleExpiry is the default duration for least recently touched Wasm module to become stale.
DefaultWasmModuleExpiry = 24 * time.Hour
// oci URL prefix
ociURLPrefix = "oci://"
// sha256 scheme prefix
sha256SchemePrefix = "sha256:"
)
// Cache models a Wasm module cache.
type Cache interface {
Get(url, checksum, resourceName, resourceVersion string, timeout time.Duration, pullSecret []byte, pullPolicy extensions.PullPolicy) (string, error)
Cleanup()
}
// LocalFileCache for downloaded Wasm modules. Currently it stores the Wasm module as local file.
type LocalFileCache struct {
// Map from Wasm module checksum to cache entry.
modules map[moduleKey]*cacheEntry
// Map from tagged URL to checksum
checksums map[string]*checksumEntry
// http fetcher fetches Wasm module with HTTP get.
httpFetcher *HTTPFetcher
// directory path used to store Wasm module.
dir string
// mux is needed because stale Wasm module files will be purged periodically.
mux sync.Mutex
// Duration for stale Wasm module purging.
purgeInterval time.Duration
wasmModuleExpiry time.Duration
insecureRegistries sets.Set
allowAllInsecureRegistries bool
// stopChan currently is only used by test
stopChan chan struct{}
}
var _ Cache = &LocalFileCache{}
type checksumEntry struct {
checksum string
// Keeps the resource version per each resource for dealing with multiple resources which pointing the same image.
resourceVersionByResource map[string]string
}
type moduleKey struct {
// Identifier for the module. It should be neutral for the checksum.
// e.g.) oci://docker.io/test@sha256:0123456789 is not allowed.
// oci://docker.io/test:latest (tagged form) is allowed.
name string
checksum string
}
type cacheKey struct {
moduleKey
downloadURL string
// Resource name of WasmPlugin resource. This should be a fully-qualified name.
resourceName string
// Resource version of WasmPlugin resource. Even though PullPolicy is Always,
// if there is no change of resource state, a cached entry is used instead of pulling newly.
resourceVersion string
}
// cacheEntry contains information about a Wasm module cache entry.
type cacheEntry struct {
// File path to the downloaded wasm modules.
modulePath string
// Last time that this local Wasm module is referenced.
last time.Time
// set of URLs referencing this entry
referencingURLs sets.Set
}
// NewLocalFileCache create a new Wasm module cache which downloads and stores Wasm module files locally.
func NewLocalFileCache(dir string, purgeInterval, moduleExpiry time.Duration, insecureRegistries []string) *LocalFileCache {
ir := sets.New(insecureRegistries...)
cache := &LocalFileCache{
httpFetcher: NewHTTPFetcher(),
modules: make(map[moduleKey]*cacheEntry),
checksums: make(map[string]*checksumEntry),
dir: dir,
purgeInterval: purgeInterval,
wasmModuleExpiry: moduleExpiry,
stopChan: make(chan struct{}),
insecureRegistries: ir,
// If the set of the given insecure registries contains "*", then allow all the insecure registries.
allowAllInsecureRegistries: ir.Contains("*"),
}
go func() {
cache.purge()
}()
return cache
}
func urlAsResourceName(fullURLStr string) string {
if strings.HasPrefix(fullURLStr, ociURLPrefix) {
if tag, err := name.ParseReference(fullURLStr[len(ociURLPrefix):]); err == nil {
// remove tag or sha
return ociURLPrefix + tag.Context().Name()
}
}
return fullURLStr
}
func pullIfNotPresent(pullPolicy extensions.PullPolicy, u *url.URL) bool {
if u.Scheme == "oci" {
switch pullPolicy {
case extensions.PullPolicy_Always:
return false
case extensions.PullPolicy_IfNotPresent:
return true
default:
return !strings.HasSuffix(u.Path, ":latest")
}
}
// If http/https is used, it has `always` semantics at this time.
return false
}
// Get returns path the local Wasm module file.
func (c *LocalFileCache) Get(
downloadURL, checksum, resourceName, resourceVersion string,
timeout time.Duration, pullSecret []byte, pullPolicy extensions.PullPolicy) (string, error) {
// Construct Wasm cache key with downloading URL and provided checksum of the module.
key := cacheKey{
downloadURL: downloadURL,
moduleKey: moduleKey{
name: urlAsResourceName(downloadURL),
checksum: checksum,
},
resourceName: resourceName,
resourceVersion: resourceVersion,
}
u, err := url.Parse(downloadURL)
if err != nil {
return "", fmt.Errorf("fail to parse Wasm module fetch url: %s", downloadURL)
}
// First check if the cache entry is already downloaded and policy does not require to pull always.
var modulePath string
modulePath, key.checksum = c.getEntry(key, pullIfNotPresent(pullPolicy, u))
if modulePath != "" {
c.touchEntry(key)
return modulePath, nil
}
// If not, fetch images.
// Byte array of Wasm binary.
var b []byte
// Hex-Encoded sha256 checksum of binary.
var dChecksum string
var binaryFetcher func() ([]byte, error)
switch u.Scheme {
case "http", "https":
// Download the Wasm module with http fetcher.
b, err = c.httpFetcher.Fetch(downloadURL, timeout)
if err != nil {
wasmRemoteFetchCount.With(resultTag.Value(downloadFailure)).Increment()
return "", err
}
// Get sha256 checksum and check if it is the same as provided one.
sha := sha256.Sum256(b)
dChecksum = hex.EncodeToString(sha[:])
case "oci":
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
insecure := false
if c.allowAllInsecureRegistries || c.insecureRegistries.Contains(u.Host) {
insecure = true
}
// TODO: support imagePullSecret and pass it to ImageFetcherOption.
imgFetcherOps := ImageFetcherOption{
Insecure: insecure,
}
if pullSecret != nil {
imgFetcherOps.PullSecret = pullSecret
}
wasmLog.Debugf("wasm oci fetch %s with options: %v", downloadURL, imgFetcherOps)
fetcher := NewImageFetcher(ctx, imgFetcherOps)
binaryFetcher, dChecksum, err = fetcher.PrepareFetch(u.Host + u.Path)
if err != nil {
wasmRemoteFetchCount.With(resultTag.Value(downloadFailure)).Increment()
return "", fmt.Errorf("could not fetch Wasm OCI image: %v", err)
}
default:
return "", fmt.Errorf("unsupported Wasm module downloading URL scheme: %v", u.Scheme)
}
if key.checksum == "" {
key.checksum = dChecksum
// check again if the cache is having the checksum.
if modulePath, _ := c.getEntry(key, true); modulePath != "" {
c.touchEntry(key)
return modulePath, nil
}
} else if dChecksum != key.checksum {
wasmRemoteFetchCount.With(resultTag.Value(checksumMismatch)).Increment()
return "", fmt.Errorf("module downloaded from %v has checksum %v, which does not match: %v", downloadURL, dChecksum, key.checksum)
}
if binaryFetcher != nil {
b, err = binaryFetcher()
if err != nil {
wasmRemoteFetchCount.With(resultTag.Value(downloadFailure)).Increment()
return "", fmt.Errorf("could not fetch Wasm binary: %v", err)
}
}
if !isValidWasmBinary(b) {
wasmRemoteFetchCount.With(resultTag.Value(fetchFailure)).Increment()
return "", fmt.Errorf("fetched Wasm binary from %s is invalid", downloadURL)
}
wasmRemoteFetchCount.With(resultTag.Value(fetchSuccess)).Increment()
key.checksum = dChecksum
f := filepath.Join(c.dir, fmt.Sprintf("%s.wasm", dChecksum))
if err := c.addEntry(key, b, f); err != nil {
return "", err
}
return f, nil
}
// Cleanup closes background Wasm module purge routine.
func (c *LocalFileCache) Cleanup() {
close(c.stopChan)
}
func (c *LocalFileCache) updateChecksum(key cacheKey) bool {
// If OCI URL having a tag, we need to update checksum.
needChecksumUpdate := strings.HasPrefix(key.downloadURL, ociURLPrefix) && !strings.Contains(key.downloadURL, "@")
if needChecksumUpdate {
ce := c.checksums[key.downloadURL]
if ce == nil {
ce = new(checksumEntry)
ce.resourceVersionByResource = make(map[string]string)
c.checksums[key.downloadURL] = ce
}
ce.checksum = key.checksum
ce.resourceVersionByResource[key.resourceName] = key.resourceVersion
}
return needChecksumUpdate
}
func (c *LocalFileCache) touchEntry(key cacheKey) {
c.mux.Lock()
defer c.mux.Unlock()
c.updateChecksum(key)
}
func (c *LocalFileCache) addEntry(key cacheKey, wasmModule []byte, f string) error {
c.mux.Lock()
defer c.mux.Unlock()
needChecksumUpdate := c.updateChecksum(key)
if needChecksumUpdate {
ce := c.checksums[key.downloadURL]
if ce == nil {
ce = new(checksumEntry)
ce.resourceVersionByResource = make(map[string]string)
c.checksums[key.downloadURL] = ce
}
ce.checksum = key.checksum
ce.resourceVersionByResource[key.resourceName] = key.resourceVersion
}
// Check if the module has already been added. If so, avoid writing the file again.
if ce, ok := c.modules[key.moduleKey]; ok {
// Update last touched time.
ce.last = time.Now()
if needChecksumUpdate {
ce.referencingURLs.Insert(key.downloadURL)
}
return nil
}
// Materialize the Wasm module into a local file. Use checksum as name of the module.
if err := os.WriteFile(f, wasmModule, 0o644); err != nil {
return err
}
ce := cacheEntry{
modulePath: f,
last: time.Now(),
referencingURLs: sets.New(),
}
if needChecksumUpdate {
ce.referencingURLs.Insert(key.downloadURL)
}
c.modules[key.moduleKey] = &ce
wasmCacheEntries.Record(float64(len(c.modules)))
return nil
}
// getEntry finds a cached module, and returns the path of the module and its checksum.
func (c *LocalFileCache) getEntry(key cacheKey, ignoreResourceVersion bool) (string, string) {
modulePath := ""
cacheHit := false
c.mux.Lock()
defer c.mux.Unlock()
// Only apply this for OCI image, not http/https because OCI image has ImagePullPolicy
// to control the pull policy, but http/https currently rely on existence of checksum.
// At this point, we don't need to break the current behavior for http/https.
if len(key.checksum) == 0 && strings.HasPrefix(key.downloadURL, ociURLPrefix) {
if d, err := name.NewDigest(key.downloadURL[len(ociURLPrefix):]); err == nil {
// If there is no checksum and the digest is suffixed in URL, use the digest.
dstr := d.DigestStr()
if strings.HasPrefix(dstr, sha256SchemePrefix) {
key.checksum = dstr[len(sha256SchemePrefix):]
}
// For other digest scheme, give up to use cache.
} else {
// If no checksum, try the checksum cache.
// If the image was pulled before, there should be a checksum of the most recently pulled image.
if ce, found := c.checksums[key.downloadURL]; found {
if ignoreResourceVersion || key.resourceVersion == ce.resourceVersionByResource[key.resourceName] {
key.checksum = ce.checksum
}
// update resource version here
ce.resourceVersionByResource[key.resourceName] = key.resourceVersion
}
}
}
if ce, ok := c.modules[key.moduleKey]; ok {
// Update last touched time.
ce.last = time.Now()
modulePath = ce.modulePath
cacheHit = true
}
wasmCacheLookupCount.With(hitTag.Value(strconv.FormatBool(cacheHit))).Increment()
return modulePath, key.checksum
}
// Purge periodically clean up the stale Wasm modules local file and the cache map.
func (c *LocalFileCache) purge() {
ticker := time.NewTicker(c.purgeInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.mux.Lock()
for k, m := range c.modules {
if !m.expired(c.wasmModuleExpiry) {
continue
}
// The module has not be touched for expiry duration, delete it from the map as well as the local dir.
if err := os.Remove(m.modulePath); err != nil {
wasmLog.Errorf("failed to purge Wasm module %v: %v", m.modulePath, err)
} else {
for downloadURL := range m.referencingURLs {
delete(c.checksums, downloadURL)
}
delete(c.modules, k)
wasmLog.Debugf("successfully removed stale Wasm module %v", m.modulePath)
}
}
wasmCacheEntries.Record(float64(len(c.modules)))
c.mux.Unlock()
case <-c.stopChan:
// Currently this will only happen in test.
return
}
}
}
// Expired returns true if the module has not been touched for Wasm module Expiry.
func (ce *cacheEntry) expired(expiry time.Duration) bool {
now := time.Now()
return now.Sub(ce.last) > expiry
}
var wasmMagicNumber = []byte{0x00, 0x61, 0x73, 0x6d}
func isValidWasmBinary(in []byte) bool {
// Wasm file header is 8 bytes (magic number + version).
return len(in) >= 8 && bytes.Equal(in[:4], wasmMagicNumber)
}