blob: fa9ce470f9a4a611bcfd94f80212fa0de39f2007 [file] [log] [blame]
// Copyright Istio Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package wasm
import (
"sync"
"time"
)
import (
udpa "github.com/cncf/xds/go/udpa/type/v1"
core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
wasm "github.com/envoyproxy/go-control-plane/envoy/extensions/filters/http/wasm/v3"
"github.com/envoyproxy/go-control-plane/pkg/conversion"
"go.uber.org/atomic"
any "google.golang.org/protobuf/types/known/anypb"
extensions "istio.io/api/extensions/v1alpha1"
)
import (
"github.com/apache/dubbo-go-pixiu/pilot/pkg/model"
"github.com/apache/dubbo-go-pixiu/pkg/config/xds"
)
// MaybeConvertWasmExtensionConfig converts any presence of module remote download to local file.
// It downloads the Wasm module and stores the module locally in the file system.
func MaybeConvertWasmExtensionConfig(resources []*any.Any, cache Cache) bool {
var wg sync.WaitGroup
numResources := len(resources)
wg.Add(numResources)
sendNack := atomic.NewBool(false)
startTime := time.Now()
defer func() {
wasmConfigConversionDuration.Record(float64(time.Since(startTime).Milliseconds()))
}()
for i := 0; i < numResources; i++ {
go func(i int) {
defer wg.Done()
newExtensionConfig, nack := convert(resources[i], cache)
if nack {
sendNack.Store(true)
return
}
resources[i] = newExtensionConfig
}(i)
}
wg.Wait()
return sendNack.Load()
}
func convert(resource *any.Any, cache Cache) (newExtensionConfig *any.Any, sendNack bool) {
ec := &core.TypedExtensionConfig{}
newExtensionConfig = resource
sendNack = false
status := noRemoteLoad
defer func() {
wasmConfigConversionCount.
With(resultTag.Value(status)).
Increment()
}()
if err := resource.UnmarshalTo(ec); err != nil {
wasmLog.Debugf("failed to unmarshal extension config resource: %v", err)
return
}
wasmHTTPFilterConfig := &wasm.Wasm{}
// Wasm filter can be configured using typed struct and Wasm filter type
if ec.GetTypedConfig() != nil && ec.GetTypedConfig().TypeUrl == xds.WasmHTTPFilterType {
err := ec.GetTypedConfig().UnmarshalTo(wasmHTTPFilterConfig)
if err != nil {
wasmLog.Debugf("failed to unmarshal extension config resource into Wasm HTTP filter: %v", err)
return
}
} else if ec.GetTypedConfig() == nil || ec.GetTypedConfig().TypeUrl != xds.TypedStructType {
wasmLog.Debugf("cannot find typed struct in %+v", ec)
return
} else {
wasmStruct := &udpa.TypedStruct{}
wasmTypedConfig := ec.GetTypedConfig()
if err := wasmTypedConfig.UnmarshalTo(wasmStruct); err != nil {
wasmLog.Debugf("failed to unmarshal typed config for wasm filter: %v", err)
return
}
if wasmStruct.TypeUrl != xds.WasmHTTPFilterType {
wasmLog.Debugf("typed extension config %+v does not contain wasm http filter", wasmStruct)
return
}
if err := conversion.StructToMessage(wasmStruct.Value, wasmHTTPFilterConfig); err != nil {
wasmLog.Debugf("failed to convert extension config struct %+v to Wasm HTTP filter", wasmStruct)
return
}
}
if wasmHTTPFilterConfig.Config.GetVmConfig().GetCode().GetRemote() == nil {
wasmLog.Debugf("no remote load found in Wasm HTTP filter %+v", wasmHTTPFilterConfig)
return
}
// Wasm plugin configuration has remote load. From this point, any failure should result as a Nack,
// unless the plugin is marked as fail open.
failOpen := wasmHTTPFilterConfig.Config.GetFailOpen()
sendNack = !failOpen
status = conversionSuccess
vm := wasmHTTPFilterConfig.Config.GetVmConfig()
envs := vm.GetEnvironmentVariables()
var pullSecret []byte
pullPolicy := extensions.PullPolicy_UNSPECIFIED_POLICY
resourceVersion := ""
if envs != nil {
if sec, found := envs.KeyValues[model.WasmSecretEnv]; found {
if sec == "" {
status = fetchFailure
wasmLog.Errorf("cannot fetch Wasm module %v: missing image pulling secret", wasmHTTPFilterConfig.Config.Name)
return
}
pullSecret = []byte(sec)
}
// Strip all internal env variables from VM env variable.
// These env variables are added by Istio control plane and meant to be consumed by the agent for image pulling control,
// thus should not be leaked to Envoy or the Wasm extension runtime.
delete(envs.KeyValues, model.WasmSecretEnv)
if len(envs.KeyValues) == 0 {
if len(envs.HostEnvKeys) == 0 {
vm.EnvironmentVariables = nil
} else {
envs.KeyValues = nil
}
}
if ps, found := envs.KeyValues[model.WasmPolicyEnv]; found {
if p, found := extensions.PullPolicy_value[ps]; found {
pullPolicy = extensions.PullPolicy(p)
}
}
resourceVersion = envs.KeyValues[model.WasmResourceVersionEnv]
}
remote := vm.GetCode().GetRemote()
httpURI := remote.GetHttpUri()
if httpURI == nil {
status = missRemoteFetchHint
wasmLog.Errorf("wasm remote fetch %+v does not have httpUri specified", remote)
return
}
// checksum sent by istiod can be "nil" if not set by user - magic value used to avoid unmarshaling errors
if remote.Sha256 == "nil" {
remote.Sha256 = ""
}
timeout := time.Duration(0)
if remote.GetHttpUri().Timeout != nil {
timeout = remote.GetHttpUri().Timeout.AsDuration()
}
f, err := cache.Get(httpURI.GetUri(), remote.Sha256, wasmHTTPFilterConfig.Config.Name, resourceVersion, timeout, pullSecret, pullPolicy)
if err != nil {
status = fetchFailure
wasmLog.Errorf("cannot fetch Wasm module %v: %v", remote.GetHttpUri().GetUri(), err)
return
}
// Rewrite remote fetch to local file.
vm.Code = &core.AsyncDataSource{
Specifier: &core.AsyncDataSource_Local{
Local: &core.DataSource{
Specifier: &core.DataSource_Filename{
Filename: f,
},
},
},
}
wasmTypedConfig, err := any.New(wasmHTTPFilterConfig)
if err != nil {
status = marshalFailure
wasmLog.Errorf("failed to marshal new wasm HTTP filter %+v to protobuf Any: %v", wasmHTTPFilterConfig, err)
return
}
ec.TypedConfig = wasmTypedConfig
wasmLog.Debugf("new extension config resource %+v", ec)
nec, err := any.New(ec)
if err != nil {
status = marshalFailure
wasmLog.Errorf("failed to marshal new extension config resource: %v", err)
return
}
// At this point, we are certain that wasm module has been downloaded and config is rewritten.
// ECDS has been rewritten successfully and should not nack.
newExtensionConfig = nec
sendNack = false
return
}