blob: 5ff9c46cd66e0432e47d6412f91c385833095155 [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements.  See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License.  You may obtain a copy of the License at

   http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package trait
import (
v1 ""
knativeapi ""
knativeutil ""
corev1 ""
k8serrors ""
eventing ""
serving ""
// The Knative trait automatically discovers addresses of Knative resources and injects them into the
// running integration.
//
// The full Knative configuration is injected in the CAMEL_KNATIVE_CONFIGURATION environment variable
// in JSON format.
// The Camel Knative component will then use the full configuration to configure the routes.
//
// The trait is enabled by default when the Knative profile is active.
//
// +camel-k:trait=knative
type knativeTrait struct {
BaseTrait `property:",squash"`
// Can be used to inject a Knative complete configuration in JSON format.
Configuration string `property:"configuration" json:"configuration,omitempty"`
// List of channels used as source of integration routes.
// Can contain simple channel names or full Camel URIs.
ChannelSources []string `property:"channel-sources" json:"channelSources,omitempty"`
// List of channels used as destination of integration routes.
// Can contain simple channel names or full Camel URIs.
ChannelSinks []string `property:"channel-sinks" json:"channelSinks,omitempty"`
// List of endpoints used as source of integration routes.
// Can contain simple endpoint names or full Camel URIs.
EndpointSources []string `property:"endpoint-sources" json:"endpointSources,omitempty"`
// List of endpoints used as destination of integration routes.
// Can contain simple endpoint names or full Camel URIs.
EndpointSinks []string `property:"endpoint-sinks" json:"endpointSinks,omitempty"`
// List of event types that the integration will be subscribed to.
// Can contain simple event types or full Camel URIs (to use a specific broker different from "default").
EventSources []string `property:"event-sources" json:"eventSources,omitempty"`
// List of event types that the integration will produce.
// Can contain simple event types or full Camel URIs (to use a specific broker).
EventSinks []string `property:"event-sinks" json:"eventSinks,omitempty"`
// Enables filtering on events based on the header "ce-knativehistory". Since this is an experimental header
// that can be removed in a future version of Knative, filtering is enabled only when the integration is
// listening from more than 1 channel.
FilterSourceChannels *bool `property:"filter-source-channels" json:"filterSourceChannels,omitempty"`
// Enables Knative CamelSource pre 0.15 compatibility fixes (will be removed in future versions).
// The fixes are applied unless this is explicitly set to false.
CamelSourceCompat *bool `property:"camel-source-compat" json:"camelSourceCompat,omitempty"`
// Allows binding the integration to a sink via a Knative SinkBinding resource.
// This can be used when the integration targets a single sink.
// It's disabled by default.
SinkBinding *bool `property:"sink-binding" json:"sinkBinding,omitempty"`
// Enable automatic discovery of all trait properties.
// Discovery is performed unless this is explicitly set to false.
Auto *bool `property:"auto" json:"auto,omitempty"`
// knativeHistoryHeader is the name of the header used as the filter key when
// source-channel filtering is enabled.
const knativeHistoryHeader = "ce-knativehistory"
func newKnativeTrait() Trait {
t := &knativeTrait{
BaseTrait: NewBaseTrait("knative", 400),
return t
// IsAllowedInProfile overrides default
func (t *knativeTrait) IsAllowedInProfile(profile v1.TraitProfile) bool {
return profile == v1.TraitProfileKnative
// Configure decides whether the trait takes part in the current reconciliation
// and, unless automatic discovery is turned off, fills in every source/sink
// list that was left empty by inspecting the integration sources.
//
// It returns (false, nil) when the trait is explicitly disabled or when the
// integration is not in the Initialization, Deploying or Running phase, and
// (false, err) when the integration sources cannot be resolved. Otherwise it
// returns (true, nil).
//
// NOTE(review): block-closing braces are not visible in this copy of the file,
// so the exact nesting of the statements below (in particular whether the final
// "more than one channel" check belongs to the Auto branch) must be confirmed.
func (t *knativeTrait) Configure(e *Environment) (bool, error) {
// Only an explicit "enabled: false" switches the trait off; an unset flag does not.
if t.Enabled != nil && !*t.Enabled {
return false, nil
// Nothing to do outside the Initialization, Deploying and Running phases.
if !e.IntegrationInPhase(v1.IntegrationPhaseInitialization, v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) {
return false, nil
// Discovery runs unless Auto is explicitly set to false.
if t.Auto == nil || *t.Auto {
// Channel sources: discovered from the "from" URIs only when none were configured.
if len(t.ChannelSources) == 0 {
items := make([]string, 0)
sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
if err != nil {
return false, err
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeChannel)...)
return true
t.ChannelSources = items
// Channel sinks: discovered from the "to" URIs only when none were configured.
if len(t.ChannelSinks) == 0 {
items := make([]string, 0)
sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
if err != nil {
return false, err
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeChannel)...)
return true
t.ChannelSinks = items
// Endpoint sources: discovered from the "from" URIs only when none were configured.
if len(t.EndpointSources) == 0 {
items := make([]string, 0)
sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
if err != nil {
return false, err
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEndpoint)...)
return true
t.EndpointSources = items
// Endpoint sinks: discovered from the "to" URIs only when none were configured.
if len(t.EndpointSinks) == 0 {
items := make([]string, 0)
sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
if err != nil {
return false, err
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEndpoint)...)
return true
t.EndpointSinks = items
// Event sources: discovered from the "from" URIs only when none were configured.
if len(t.EventSources) == 0 {
items := make([]string, 0)
sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
if err != nil {
return false, err
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.FromURIs, knativeapi.CamelServiceTypeEvent)...)
return true
t.EventSources = items
// Event sinks: discovered from the "to" URIs only when none were configured.
if len(t.EventSinks) == 0 {
items := make([]string, 0)
sources, err := kubernetes.ResolveIntegrationSources(e.C, e.Client, e.Integration, e.Resources)
if err != nil {
return false, err
metadata.Each(e.CamelCatalog, sources, func(_ int, meta metadata.IntegrationMetadata) bool {
items = append(items, knativeutil.FilterURIs(meta.ToURIs, knativeapi.CamelServiceTypeEvent)...)
return true
t.EventSinks = items
if len(t.ChannelSources) > 1 {
// Always filter channels when the integration subscribes to more than one.
// Filtering relies on the experimental Knative header "ce-knativehistory"
// (see knativeHistoryHeader). Note that this overwrites any value the user
// set for FilterSourceChannels.
// TODO: filter automatically all source channels when the feature becomes stable
filter := true
t.FilterSourceChannels = &filter
return true, nil
// Apply mutates the integration and the environment according to the trait
// configuration. Reading the statements in order, it:
//   - rewrites sources that use the "knative-source" loader so that they use
//     the "knative-source" interceptor instead (unless CamelSourceCompat is
//     explicitly false);
//   - adds the Knative runtime dependencies and the platform-http capability
//     to the integration status;
//   - in the Deploying and Running phases, builds the Camel Knative
//     environment (optionally seeded from Configuration), lets the
//     configure* helpers populate it, and stores its serialized form in the
//     CAMEL_KNATIVE_CONFIGURATION environment variable.
//
// It returns the first error reported by Deserialize, by one of the
// configure* helpers, or by Serialize (the latter wrapped with context).
//
// NOTE(review): block-closing braces are not visible in this copy of the file,
// so which of the statements after the Initialization-phase check are nested
// inside it must be confirmed against the real source.
func (t *knativeTrait) Apply(e *Environment) error {
// To be removed when Knative CamelSources < 0.15 will no longer be supported
// Older versions of Knative Sources use a loader rather than an interceptor
if t.CamelSourceCompat == nil || *t.CamelSourceCompat {
for i, s := range e.Integration.Spec.Sources {
if s.Loader == "knative-source" {
// s is a copy of the slice element: edit it, then write it back below.
s.Loader = ""
util.StringSliceUniqueAdd(&s.Interceptors, "knative-source")
e.Integration.Spec.Sources[i] = s
// End of temporary code
if e.IntegrationInPhase(v1.IntegrationPhaseInitialization) {
// Interceptor may have been set by a Knative CamelSource
if util.StringSliceExists(e.getAllInterceptors(), "knative-source") {
// Adding required libraries for Camel sources
util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-knative")
util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-knative")
// Adding platform HTTP
util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP)
// An explicitly enabled SinkBinding needs the Knative runtime dependency.
if t.SinkBinding != nil && *t.SinkBinding {
util.StringSliceUniqueAdd(&e.Integration.Status.Dependencies, "mvn:org.apache.camel.k/camel-k-runtime-knative")
// Any configured source requests the platform-http capability.
if len(t.ChannelSources) > 0 || len(t.EndpointSources) > 0 || len(t.EventSources) > 0 {
util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP)
// Any configured sink requests the platform-http capability as well.
if len(t.ChannelSinks) > 0 || len(t.EndpointSinks) > 0 || len(t.EventSinks) > 0 {
util.StringSliceUniqueAdd(&e.Integration.Status.Capabilities, v1.CapabilityPlatformHTTP)
if e.IntegrationInPhase(v1.IntegrationPhaseDeploying, v1.IntegrationPhaseRunning) {
env := knativeapi.NewCamelEnvironment()
// A user-provided configuration is loaded first, before the configure* helpers run.
if t.Configuration != "" {
if err := env.Deserialize(t.Configuration); err != nil {
return err
if err := t.configureChannels(e, &env); err != nil {
return err
if err := t.configureEndpoints(e, &env); err != nil {
return err
if err := t.configureEvents(e, &env); err != nil {
return err
if err := t.configureSinkBinding(e, &env); err != nil {
return err
conf, err := env.Serialize()
if err != nil {
return errors.Wrap(err, "unable to fetch environment configuration")
// Expose the serialized environment to the integration through an environment variable.
envvar.SetVal(&e.EnvVars, "CAMEL_KNATIVE_CONFIGURATION", conf)
return nil
func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEnvironment) error {
// Sources
err := t.ifServiceMissingDo(e, env, t.ChannelSources, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSource,
func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error {
loc, err := urlProvider()
if err != nil {
return err
meta := map[string]string{
knativeapi.CamelMetaServicePath: "/",
knativeapi.CamelMetaEndpointKind: string(knativeapi.CamelEndpointKindSource),
knativeapi.CamelMetaKnativeAPIVersion: ref.APIVersion,
knativeapi.CamelMetaKnativeKind: ref.Kind,
knativeapi.CamelMetaKnativeReply: "false",
if t.FilterSourceChannels != nil && *t.FilterSourceChannels {
meta[knativeapi.CamelMetaFilterPrefix+knativeHistoryHeader] = loc.Host
svc := knativeapi.CamelServiceDefinition{
Name: ref.Name,
ServiceType: knativeapi.CamelServiceTypeChannel,
Metadata: meta,
env.Services = append(env.Services, svc)
if err := t.createSubscription(e, ref); err != nil {
return err
return nil
if err != nil {
return err
if t.SinkBinding == nil || !*t.SinkBinding {
// Sinks
err = t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error {
loc, err := urlProvider()
if err != nil {
return err
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
env.Services = append(env.Services, svc)
return nil
if err != nil {
return err
return nil
func (t *knativeTrait) createSubscription(e *Environment, ref *corev1.ObjectReference) error {
sub := knativeutil.CreateSubscription(*ref, e.Integration.Name)
return nil
func (t *knativeTrait) configureEndpoints(e *Environment, env *knativeapi.CamelEnvironment) error {
// Sources
serviceSources := t.extractServices(t.EndpointSources, knativeapi.CamelServiceTypeEndpoint)
for _, endpoint := range serviceSources {
ref, err := knativeutil.ExtractObjectReference(endpoint)
if err != nil {
return err
if env.ContainsService(endpoint, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEndpoint,
serving.SchemeGroupVersion.String(), "Service") {
svc := knativeapi.CamelServiceDefinition{
Name: ref.Name,
ServiceType: knativeapi.CamelServiceTypeEndpoint,
Metadata: map[string]string{
knativeapi.CamelMetaServicePath: "/",
knativeapi.CamelMetaEndpointKind: string(knativeapi.CamelEndpointKindSource),
knativeapi.CamelMetaKnativeAPIVersion: serving.SchemeGroupVersion.String(),
knativeapi.CamelMetaKnativeKind: "Service",
// knative.reply is left to default ("true") in case of simple service
env.Services = append(env.Services, svc)
// Sinks
if t.SinkBinding == nil || !*t.SinkBinding {
err := t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error {
loc, err := urlProvider()
if err != nil {
return err
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
env.Services = append(env.Services, svc)
return nil
if err != nil {
return err
return nil
func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvironment) error {
// Sources
err := t.withServiceDo(false, e, env, t.EventSources, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSource,
func(ref *corev1.ObjectReference, serviceURI string, _ func() (*url.URL, error)) error {
// Iterate over all, without skipping duplicates
eventType := knativeutil.ExtractEventType(serviceURI)
t.createTrigger(e, ref, eventType)
if !env.ContainsService(ref.Name, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent, ref.APIVersion, ref.Kind) {
svc := knativeapi.CamelServiceDefinition{
Name: ref.Name,
ServiceType: knativeapi.CamelServiceTypeEvent,
Metadata: map[string]string{
knativeapi.CamelMetaServicePath: "/",
knativeapi.CamelMetaEndpointKind: string(knativeapi.CamelEndpointKindSource),
knativeapi.CamelMetaKnativeAPIVersion: ref.APIVersion,
knativeapi.CamelMetaKnativeKind: ref.Kind,
knativeapi.CamelMetaKnativeReply: "false",
env.Services = append(env.Services, svc)
return nil
if err != nil {
return err
// Sinks
if t.SinkBinding == nil || !*t.SinkBinding {
err = t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent, knativeapi.CamelEndpointKindSink,
func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error {
loc, err := urlProvider()
if err != nil {
return err
svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind)
if err != nil {
return err
env.Services = append(env.Services, svc)
return nil
if err != nil {
return err
return nil
func (t *knativeTrait) configureSinkBinding(e *Environment, env *knativeapi.CamelEnvironment) error {
if t.SinkBinding == nil || !*t.SinkBinding {
return nil
var serviceType knativeapi.CamelServiceType
services := t.extractServices(t.ChannelSinks, knativeapi.CamelServiceTypeChannel)
if len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeChannel
services = append(services, t.extractServices(t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint)...)
if len(serviceType) == 0 && len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeEndpoint
services = append(services, t.extractServices(t.EventSinks, knativeapi.CamelServiceTypeEvent)...)
if len(serviceType) == 0 && len(services) > 0 {
serviceType = knativeapi.CamelServiceTypeEvent
if len(services) != 1 {
return fmt.Errorf("sinkbinding can only be used with a single sink: found %d sinks", len(services))
err := t.withServiceDo(false, e, env, services, serviceType, knativeapi.CamelEndpointKindSink, func(ref *corev1.ObjectReference, serviceURI string, _ func() (*url.URL, error)) error {
e.ApplicationProperties["camel.k.customizer.sinkbinding.enabled"] = "true"
e.ApplicationProperties[""] = ref.Name
e.ApplicationProperties["camel.k.customizer.sinkbinding.type"] = string(serviceType)
e.ApplicationProperties["camel.k.customizer.sinkbinding.kind"] = ref.Kind
e.ApplicationProperties["camel.k.customizer.sinkbinding.api-version"] = ref.APIVersion
if e.IntegrationInPhase(v1.IntegrationPhaseDeploying) {
e.PostStepProcessors = append(e.PostStepProcessors, func(e *Environment) error {
sinkBindingInjected := false
e.Resources.Visit(func(object runtime.Object) {
gvk := object.GetObjectKind().GroupVersionKind()
if gvk.Kind == "SinkBinding" && strings.Contains(gvk.Group, "knative") {
sinkBindingInjected = true
if sinkBindingInjected {
return nil
controller := e.Resources.GetController(func(object runtime.Object) bool {
return true
if controller != nil && !reflect.ValueOf(controller).IsNil() {
gvk := controller.GetObjectKind().GroupVersionKind()
av, k := gvk.ToAPIVersionAndKind()
source := corev1.ObjectReference{
Kind: k,
Namespace: e.Integration.Namespace,
Name: e.Integration.Name,
APIVersion: av,
target := corev1.ObjectReference{
Kind: ref.Kind,
Namespace: e.Integration.Namespace,
Name: ref.Name,
APIVersion: ref.APIVersion,
e.Resources.AddFirst(knativeutil.CreateSinkBinding(source, target))
return nil
return nil
return err
func (t *knativeTrait) createTrigger(e *Environment, ref *corev1.ObjectReference, eventType string) {
// TODO extend to additional filters too, to filter them at source and not at destination
found := e.Resources.HasKnativeTrigger(func(trigger *eventing.Trigger) bool {
return trigger.Spec.Broker == ref.Name &&
trigger.Spec.Filter != nil &&
trigger.Spec.Filter.Attributes["type"] == eventType
if !found {
trigger := knativeutil.CreateTrigger(*ref, e.Integration.Name, eventType)
func (t *knativeTrait) ifServiceMissingDo(
e *Environment,
env *knativeapi.CamelEnvironment,
serviceURIs []string,
serviceType knativeapi.CamelServiceType,
endpointKind knativeapi.CamelEndpointKind,
gen func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error) error {
return t.withServiceDo(true, e, env, serviceURIs, serviceType, endpointKind, gen)
func (t *knativeTrait) withServiceDo(
skipDuplicates bool,
e *Environment,
env *knativeapi.CamelEnvironment,
serviceURIs []string,
serviceType knativeapi.CamelServiceType,
endpointKind knativeapi.CamelEndpointKind,
gen func(ref *corev1.ObjectReference, serviceURI string, urlProvider func() (*url.URL, error)) error) error {
for _, serviceURI := range t.extractServices(serviceURIs, serviceType) {
ref, err := knativeutil.ExtractObjectReference(serviceURI)
if err != nil {
return err
if skipDuplicates && env.ContainsService(ref.Name, endpointKind, serviceType, ref.APIVersion, ref.Kind) {
possibleRefs := knativeutil.FillMissingReferenceData(serviceType, ref)
var actualRef *corev1.ObjectReference
if len(possibleRefs) == 1 {
actualRef = &possibleRefs[0]
} else {
actualRef, err = knativeutil.GetAddressableReference(t.Ctx, t.Client, possibleRefs, e.Integration.Namespace, ref.Name)
if err != nil && k8serrors.IsNotFound(err) {
return errors.Errorf("cannot find %s", serviceType.ResourceDescription(ref.Name))
} else if err != nil {
return errors.Wrapf(err, "error looking up %s", serviceType.ResourceDescription(ref.Name))
urlProvider := func() (*url.URL, error) {
targetURL, err := knativeutil.GetSinkURL(t.Ctx, t.Client, actualRef, e.Integration.Namespace)
if err != nil {
return nil, errors.Wrapf(err, "cannot determine address of %s", serviceType.ResourceDescription(ref.Name))
t.L.Infof("Found URL for %s: %s", serviceType.ResourceDescription(ref.Name), targetURL.String())
return targetURL, nil
err = gen(actualRef, serviceURI, urlProvider)
if err != nil {
return errors.Wrapf(err, "unexpected error while executing handler for %s", serviceType.ResourceDescription(ref.Name))
return nil
func (t *knativeTrait) extractServices(names []string, serviceType knativeapi.CamelServiceType) []string {
answer := make([]string, 0)
for _, item := range names {
i := strings.Trim(item, " \t\"")
if i != "" {
i = knativeutil.NormalizeToURI(serviceType, i)
answer = append(answer, i)
return answer