| /* |
| Licensed to the Apache Software Foundation (ASF) under one or more |
| contributor license agreements. See the NOTICE file distributed with |
| this work for additional information regarding copyright ownership. |
| The ASF licenses this file to You under the Apache License, Version 2.0 |
| (the "License"); you may not use this file except in compliance with |
| the License. You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package trait |
| |
| import ( |
| "fmt" |
| "net/url" |
| "reflect" |
| "strconv" |
| "strings" |
| |
| v1 "github.com/apache/camel-k/pkg/apis/camel/v1" |
| knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1/knative" |
| "github.com/apache/camel-k/pkg/metadata" |
| "github.com/apache/camel-k/pkg/util" |
| "github.com/apache/camel-k/pkg/util/envvar" |
| knativeutil "github.com/apache/camel-k/pkg/util/knative" |
| "github.com/apache/camel-k/pkg/util/kubernetes" |
| "github.com/pkg/errors" |
| corev1 "k8s.io/api/core/v1" |
| k8serrors "k8s.io/apimachinery/pkg/api/errors" |
| "k8s.io/apimachinery/pkg/runtime" |
| eventing "knative.dev/eventing/pkg/apis/eventing/v1beta1" |
| serving "knative.dev/serving/pkg/apis/serving/v1" |
| ) |
| |
| // The Knative trait automatically discovers addresses of Knative resources and inject them into the |
| // running integration. |
| // |
| // The full Knative configuration is injected in the CAMEL_KNATIVE_CONFIGURATION in JSON format. |
| // The Camel Knative component will then use the full configuration to configure the routes. |
| // |
| // The trait is enabled by default when the Knative profile is active. |
| // |
| // +camel-k:trait=knative |
| type knativeTrait struct { |
| BaseTrait `property:",squash"` |
| // Can be used to inject a Knative complete configuration in JSON format. |
| Configuration string `property:"configuration" json:"configuration,omitempty"` |
| // List of channels used as source of integration routes. |
| // Can contain simple channel names or full Camel URIs. |
| ChannelSources []string `property:"channel-sources" json:"channelSources,omitempty"` |
| // List of channels used as destination of integration routes. |
| // Can contain simple channel names or full Camel URIs. |
| ChannelSinks []string `property:"channel-sinks" json:"channelSinks,omitempty"` |
| // List of channels used as source of integration routes. |
| EndpointSources []string `property:"endpoint-sources" json:"endpointSources,omitempty"` |
| // List of endpoints used as destination of integration routes. |
| // Can contain simple endpoint names or full Camel URIs. |
| EndpointSinks []string `property:"endpoint-sinks" json:"endpointSinks,omitempty"` |
| // List of event types that the integration will be subscribed to. |
| // Can contain simple event types or full Camel URIs (to use a specific broker different from "default"). |
| EventSources []string `property:"event-sources" json:"eventSources,omitempty"` |
| // List of event types that the integration will produce. |
| // Can contain simple event types or full Camel URIs (to use a specific broker). |
| EventSinks []string `property:"event-sinks" json:"eventSinks,omitempty"` |
| // Enables filtering on events based on the header "ce-knativehistory". Since this is an experimental header |
| // that can be removed in a future version of Knative, filtering is enabled only when the integration is |
| // listening from more than 1 channel. |
| FilterSourceChannels *bool `property:"filter-source-channels" json:"filterSourceChannels,omitempty"` |
| // Enables Knative CamelSource pre 0.15 compatibility fixes (will be removed in future versions). |
| CamelSourceCompat *bool `property:"camel-source-compat" json:"camelSourceCompat,omitempty"` |
| // Allows binding the integration to a sink via a Knative SinkBinding resource. |
| // This can be used when the integration targets a single sink. |
| // It's enabled by default when the integration targets a single sink |
| // (except when the integration is owned by a Knative source). |
| SinkBinding *bool `property:"sink-binding" json:"sinkBinding,omitempty"` |
| // Enable automatic discovery of all trait properties. |
| Auto *bool `property:"auto" json:"auto,omitempty"` |
| } |
| |
| const ( |
| knativeHistoryHeader = "ce-knativehistory" |
| ) |
| |
| func newKnativeTrait() Trait { |
| t := &knativeTrait{ |
| BaseTrait: NewBaseTrait("knative", 400), |
| } |
| |
| return t |
| } |
| |
| // IsAllowedInProfile overrides default |
| func (t *knativeTrait) IsAllowedInProfile(profile v1.TraitProfile) bool { |
| return profile == v1.TraitProfileKnative |
| } |
| |
| func (t *knativeTrait) Configure(e *Environment) (bool, error) { |
| if t.Enabled != nil && !*t.Enabled { |
| return false, nil |
| } |
| |
| if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) { |
| return false, nil |
| } |
| |
| if t.Auto == nil || *t.Auto { |
| if len(t.ChannelSources) == 0 { |
| items := make([]string, 0) |
| sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) |
| if err != nil { |
| return false, err |
| } |
| metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { |
| items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeChannel)...) |
| return true |
| }) |
| |
| t.ChannelSources = items |
| } |
| if len(t.ChannelSinks) == 0 { |
| items := make([]string, 0) |
| sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) |
| if err != nil { |
| return false, err |
| } |
| metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { |
| items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeChannel)...) |
| return true |
| }) |
| |
| t.ChannelSinks = items |
| } |
| if len(t.EndpointSources) == 0 { |
| items := make([]string, 0) |
| sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) |
| if err != nil { |
| return false, err |
| } |
| metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { |
| items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEndpoint)...) |
| return true |
| }) |
| |
| t.EndpointSources = items |
| } |
| if len(t.EndpointSinks) == 0 { |
| items := make([]string, 0) |
| sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) |
| if err != nil { |
| return false, err |
| } |
| metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { |
| items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEndpoint)...) |
| return true |
| }) |
| |
| t.EndpointSinks = items |
| } |
| if len(t.EventSources) == 0 { |
| items := make([]string, 0) |
| sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) |
| if err != nil { |
| return false, err |
| } |
| metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { |
| items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEvent)...) |
| return true |
| }) |
| |
| t.EventSources = items |
| } |
| if len(t.EventSinks) == 0 { |
| items := make([]string, 0) |
| sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources) |
| if err != nil { |
| return false, err |
| } |
| metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool { |
| items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEvent)...) |
| return true |
| }) |
| |
| t.EventSinks = items |
| } |
| if len(t.ChannelSources) > 1 { |
| // Always filter channels when the integration subscribes to more than one |
| // Using Knative experimental header: https://github.com/knative/eventing/blob/7df0cc56c28d58223ff25d5ddfb487fa8c29a004/pkg/provisioners/message.go#L28 |
| // TODO: filter automatically all source channels when the feature becomes stable |
| filter := true |
| t.FilterSourceChannels = &filter |
| } |
| if t.SinkBinding == nil { |
| allowed := t.isSinkBindingAllowed(e) |
| t.SinkBinding = &allowed |
| } |
| } |
| |
| return true, nil |
| } |
| |
| func (t *knativeTrait) Apply(e *Environment) error { |
| // To be removed when Knative CamelSources < 0.15 will no longer be supported |
| // Older versions of Knative Sources use a loader rather than an interceptor |
| if t.CamelSourceCompat == nil || *t.CamelSourceCompat { |
| for i, s := range e.Integration.Spec.Sources { |
| if s.Loader == "knative-source" { |
| s.Loader = "" |
| util.StringSliceUniqueAdd(&s.Interceptors, "knative-source") |
| e.Integration.Spec.Sources[i] = s |
| } |
| } |
| } |
| // End of temporary code |
| |
| if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) { |
| // Interceptor may have been set by a Knative CamelSource |
| if util.StringSliceExists(e.getAllInterceptors(), "knative-source") { |
| // Adding required libraries for Camel sources |
| util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-knative") |
| util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-knative") |
| util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-knative-producer") |
| } |
| } |
| |
| if t.SinkBinding != nil && *t.SinkBinding { |
| util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-knative") |
| } |
| |
| if len(t.ChannelSources) > 0 || len(t.EndpointSources) > 0 || len(t.EventSources) > 0 { |
| util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP) |
| } |
| if len(t.ChannelSinks) > 0 || len(t.EndpointSinks) > 0 || len(t.EventSinks) > 0 { |
| util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP) |
| } |
| |
| if e.IntegrationInPhase(v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) { |
| env := knativeapi.NewCamelEnvironment() |
| if t.Configuration != "" { |
| if err := env.Deserialize(t.Configuration); err != nil { |
| return err |
| } |
| } |
| |
| // Convert deprecated Host and Port fields to URL field |
| // Can be removed once CamelSource controller migrate to the new API |
| for i, service := range env.Services { |
| if service.URL == "" { |
| URL := "http://" + service.Host |
| if service.Port != nil { |
| URL = URL + ":" + strconv.Itoa(*service.Port) |
| } |
| service.URL = URL |
| service.Host = "" |
| service.Port = nil |
| env.Services[i] = service |
| } |
| } |
| |
| if err := t.configureChannels(e, &env); err != nil { |
| return err |
| } |
| if err := t.configureEndpoints(e, &env); err != nil { |
| return err |
| } |
| if err := t.configureEvents(e, &env); err != nil { |
| return err |
| } |
| if err := t.configureSinkBinding(e, &env); err != nil { |
| return err |
| } |
| |
| conf, err := env.Serialize() |
| if err != nil { |
| return errors.Wrap(err, "unable to fetch environment configuration") |
| } |
| |
| envvar.SetVal(&e.EnvVars, "CAMEL_KNATIVE_CONFIGURATION", conf) |
| } |
| |
| return nil |
| } |
| |
| func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEnvironment) error { |
| // Sources |
| err := t.ifServiceMissingDo(e, env, t.ChannelSources, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSource, |
| func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error { |
| loc, err := urlProvider() |
| if err != nil { |
| return err |
| } |
| meta := map[string]string{ |
| knativeapi.CamelMetaServicePath: "/", |
| knativeapi.CamelMetaEndpointKind: string(knativeapi.CamelEndpointKindSource), |
| knativeapi.CamelMetaKnativeAPIVersion: ref.APIVersion, |
| knativeapi.CamelMetaKnativeKind: ref.Kind, |
| knativeapi.CamelMetaKnativeReply: "false", |
| } |
| if t.FilterSourceChannels != nil && *t.FilterSourceChannels { |
| meta[knativeapi.CamelMetaFilterPrefix+knativeHistoryHeader] = loc.Host |
| } |
| svc := knativeapi.CamelServiceDefinition{ |
| Name: ref.Name, |
| ServiceType: knativeapi.CamelServiceTypeChannel, |
| Metadata: meta, |
| } |
| env.Services = append(env.Services, svc) |
| |
| if err := t.createSubscription(e, ref); err != nil { |
| return err |
| } |
| |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| if t.SinkBinding == nil || !*t.SinkBinding { |
| // Sinks |
| err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink, |
| func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error { |
| loc, err := urlProvider() |
| if err != nil { |
| return err |
| } |
| svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, |
| knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind) |
| if err != nil { |
| return err |
| } |
| env.Services = append(env.Services, svc) |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| func (t *knativeTrait) createSubscription(e *Environment, ref *corev1.ObjectReference) error { |
| if ref.Namespace == "" { |
| ref.Namespace = e.Integration.Namespace |
| } |
| sub := knativeutil.CreateSubscription(*ref, e.Integration.Name) |
| e.Resources.Add(sub) |
| return nil |
| } |
| |
| func (t *knativeTrait) configureEndpoints(e *Environment, env *knativeapi.CamelEnvironment) error { |
| // Sources |
| serviceSources := t.extractServices(t.EndpointSources, knativeapi.CamelServiceTypeEndpoint) |
| for _, endpoint := range serviceSources { |
| ref, err := knativeutil.ExtractObjectReference(endpoint) |
| if err != nil { |
| return err |
| } |
| if env.ContainsService(endpoint, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEndpoint, |
| serving.SchemeGroupVersion.String(), "Service") { |
| continue |
| } |
| svc := knativeapi.CamelServiceDefinition{ |
| Name: ref.Name, |
| ServiceType: knativeapi.CamelServiceTypeEndpoint, |
| Metadata: map[string]string{ |
| knativeapi.CamelMetaServicePath: "/", |
| knativeapi.CamelMetaEndpointKind: string(knativeapi.CamelEndpointKindSource), |
| knativeapi.CamelMetaKnativeAPIVersion: serving.SchemeGroupVersion.String(), |
| knativeapi.CamelMetaKnativeKind: "Service", |
| // knative.reply is left to default ("true") in case of simple service |
| }, |
| } |
| env.Services = append(env.Services, svc) |
| } |
| |
| // Sinks |
| if t.SinkBinding == nil || !*t.SinkBinding { |
| err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink, |
| func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error { |
| loc, err := urlProvider() |
| if err != nil { |
| return err |
| } |
| svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, |
| knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind) |
| if err != nil { |
| return err |
| } |
| env.Services = append(env.Services, svc) |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvironment) error { |
| // Sources |
| err := t.withServiceDo(false, e, env, t.EventSources, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSource, |
| func(ref *corev1.ObjectReference, serviceURI string, _ func() (*url.URL, error)) error { |
| // Iterate over all, without skipping duplicates |
| eventType := knativeutil.ExtractEventType(serviceURI) |
| t.createTrigger(e, ref, eventType) |
| |
| if !env.ContainsService(ref.Name, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent, ref.APIVersion, ref.Kind) { |
| svc := knativeapi.CamelServiceDefinition{ |
| Name: ref.Name, |
| ServiceType: knativeapi.CamelServiceTypeEvent, |
| Metadata: map[string]string{ |
| knativeapi.CamelMetaServicePath: "/", |
| knativeapi.CamelMetaEndpointKind: string(knativeapi.CamelEndpointKindSource), |
| knativeapi.CamelMetaKnativeAPIVersion: ref.APIVersion, |
| knativeapi.CamelMetaKnativeKind: ref.Kind, |
| knativeapi.CamelMetaKnativeReply: "false", |
| }, |
| } |
| env.Services = append(env.Services, svc) |
| } |
| |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| |
| // Sinks |
| if t.SinkBinding == nil || !*t.SinkBinding { |
| err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink, |
| func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error { |
| loc, err := urlProvider() |
| if err != nil { |
| return err |
| } |
| svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink, |
| knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind) |
| if err != nil { |
| return err |
| } |
| env.Services = append(env.Services, svc) |
| return nil |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| |
| return nil |
| } |
| |
| func (t *knativeTrait) isSinkBindingAllowed(e *Environment) bool { |
| services := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel) |
| services = append(services, t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)...) |
| services = append(services, t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)...) |
| |
| if len(services) != 1 { |
| return false |
| } |
| |
| for _, owner := range e.Integration.OwnerReferences { |
| if strings.Contains(owner.APIVersion, "sources.knative.dev") { |
| return false |
| } |
| } |
| return true |
| } |
| |
| func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.CamelEnvironment) error { |
| if t.SinkBinding == nil || !*t.SinkBinding { |
| return nil |
| } |
| var serviceType knativeapi.CamelServiceType |
| services := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel) |
| if len(services) > 0 { |
| serviceType = knativeapi.CamelServiceTypeChannel |
| } |
| services = append(services, t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)...) |
| if len(serviceType) == 0 && len(services) > 0 { |
| serviceType = knativeapi.CamelServiceTypeEndpoint |
| } |
| services = append(services, t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)...) |
| if len(serviceType) == 0 && len(services) > 0 { |
| serviceType = knativeapi.CamelServiceTypeEvent |
| } |
| |
| if len(services) != 1 { |
| return fmt.Errorf("sinkbinding can only be used with a single sink: found %d sinks", len(services)) |
| } |
| |
| err := t.withServiceDo(false, e, env, services, serviceType, knativeapi.CamelEndpointKindSink, func(ref *corev1.ObjectReference, serviceURI string, _ func() (*url.URL, error)) error { |
| e.ApplicationProperties["camel.k.customizer.sinkbinding.enabled"] = "true" |
| e.ApplicationProperties["camel.k.customizer.sinkbinding.name"] = ref.Name |
| e.ApplicationProperties["camel.k.customizer.sinkbinding.type"] = string(serviceType) |
| e.ApplicationProperties["camel.k.customizer.sinkbinding.kind"] = ref.Kind |
| e.ApplicationProperties["camel.k.customizer.sinkbinding.api-version"] = ref.APIVersion |
| |
| if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) { |
| e.PostStepProcessors = append(e.PostStepProcessors, func(e *Environment) error { |
| sinkBindingInjected := false |
| e.Resources.Visit(func(object runtime.Object) { |
| gvk := object.GetObjectKind().GroupVersionKind() |
| if gvk.Kind == "SinkBinding" && strings.Contains(gvk.Group, "knative") { |
| sinkBindingInjected = true |
| } |
| }) |
| if sinkBindingInjected { |
| return nil |
| } |
| |
| controller := e.Resources.GetController(func(object runtime.Object) bool { |
| return true |
| }) |
| if controller != nil && !reflect.ValueOf(controller).IsNil() { |
| gvk := controller.GetObjectKind().GroupVersionKind() |
| av, k := gvk.ToAPIVersionAndKind() |
| source := corev1.ObjectReference{ |
| Kind: k, |
| Namespace: e.Integration.Namespace, |
| Name: e.Integration.Name, |
| APIVersion: av, |
| } |
| target := corev1.ObjectReference{ |
| Kind: ref.Kind, |
| Namespace: e.Integration.Namespace, |
| Name: ref.Name, |
| APIVersion: ref.APIVersion, |
| } |
| e.Resources.AddFirst(knativeutil.CreateSinkBinding(source, target)) |
| } |
| return nil |
| }) |
| } |
| return nil |
| }) |
| |
| return err |
| } |
| |
| func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string) { |
| // TODO extend to additional filters too, to filter them at source and not at destination |
| found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool { |
| return trigger.Spec.Broker == ref.Name && |
| trigger.Spec.Filter != nil && |
| trigger.Spec.Filter.Attributes["type"] == eventType |
| }) |
| if !found { |
| if ref.Namespace == "" { |
| ref.Namespace = e.Integration.Namespace |
| } |
| trigger := knativeutil.CreateTrigger(*ref, e.Integration.Name, eventType) |
| e.Resources.Add(trigger) |
| } |
| } |
| |
| func (t *knativeTrait) ifServiceMissingDo( |
| e *Environment, |
| env *knativeapi.CamelEnvironment, |
| serviceURIs []string, |
| serviceType knativeapi.CamelServiceType, |
| endpointKind knativeapi.CamelEndpointKind, |
| gen func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error) error { |
| return t.withServiceDo(true, e, env, serviceURIs, serviceType, endpointKind, gen) |
| } |
| |
| func (t *knativeTrait) withServiceDo( |
| skipDuplicates bool, |
| e *Environment, |
| env *knativeapi.CamelEnvironment, |
| serviceURIs []string, |
| serviceType knativeapi.CamelServiceType, |
| endpointKind knativeapi.CamelEndpointKind, |
| gen func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error) error { |
| |
| for _, serviceURI := range t.extractServices(serviceURIs, serviceType) { |
| ref, err := knativeutil.ExtractObjectReference(serviceURI) |
| if err != nil { |
| return err |
| } |
| if skipDuplicates && env.ContainsService(ref.Name, endpointKind, serviceType, ref.APIVersion, ref.Kind) { |
| continue |
| } |
| possibleRefs := knativeutil.FillMissingReferenceData(serviceType, ref) |
| var actualRef *corev1.ObjectReference |
| if len(possibleRefs) == 1 { |
| actualRef = &possibleRefs[0] |
| } else { |
| actualRef, err = knativeutil.GetAddressableReference(t.Ctx, t.Client, possibleRefs, e.Integration.Namespace, ref.Name) |
| if err != nil && k8serrors.IsNotFound(err) { |
| return errors.Errorf("cannot find %s", serviceType.ResourceDescription(ref.Name)) |
| } else if err != nil { |
| return errors.Wrapf(err, "error looking up %s", serviceType.ResourceDescription(ref.Name)) |
| } |
| } |
| |
| urlProvider := func() (*url.URL, error) { |
| targetURL, err := knativeutil.GetSinkURL(t.Ctx, t.Client, actualRef, e.Integration.Namespace) |
| if err != nil { |
| return nil, errors.Wrapf(err, "cannot determine address of %s", serviceType.ResourceDescription(ref.Name)) |
| } |
| t.L.Infof("Found URL for %s: %s", serviceType.ResourceDescription(ref.Name), targetURL.String()) |
| return targetURL, nil |
| } |
| |
| err = gen(actualRef, serviceURI, urlProvider) |
| if err != nil { |
| return errors.Wrapf(err, "unexpected error while executing handler for %s", serviceType.ResourceDescription(ref.Name)) |
| } |
| } |
| return nil |
| } |
| |
| func (t *knativeTrait) extractServices(names []string, serviceType knativeapi.CamelServiceType) []string { |
| answer := make([]string, 0) |
| for _, item := range names { |
| i := strings.Trim(item, " \t\"") |
| if i != "" { |
| i = knativeutil.NormalizeToURI(serviceType, i) |
| answer = append(answer, i) |
| } |
| } |
| return answer |
| } |