blob: 8e40db54c46850421e80224cfc7d568444441b6c [file] [log] [blame]
/*
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package trait
import (
"net/url"
"strings"
"github.com/apache/camel-k/pkg/apis/camel/v1alpha1"
knativeapi "github.com/apache/camel-k/pkg/apis/camel/v1alpha1/knative"
"github.com/apache/camel-k/pkg/metadata"
"github.com/apache/camel-k/pkg/util/envvar"
knativeutil "github.com/apache/camel-k/pkg/util/knative"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
eventing "knative.dev/eventing/pkg/apis/eventing/v1alpha1"
serving "knative.dev/serving/pkg/apis/serving/v1beta1"
)
// The Knative trait automatically discovers addresses of Knative resources and injects them into the
// running integration.
//
// The full Knative configuration is injected in the CAMEL_KNATIVE_CONFIGURATION in JSON format.
// The Camel Knative component will then use the full configuration to configure the routes.
//
// The trait is enabled by default when the Knative profile is active.
//
// +camel-k:trait=knative
type knativeTrait struct {
BaseTrait `property:",squash"`
// Can be used to inject a Knative complete configuration in JSON format.
Configuration string `property:"configuration"`
// Comma-separated list of channels used as source of integration routes.
// Can contain simple channel names or full Camel URIs.
ChannelSources string `property:"channel-sources"`
// Comma-separated list of channels used as destination of integration routes.
// Can contain simple channel names or full Camel URIs.
ChannelSinks string `property:"channel-sinks"`
// Comma-separated list of endpoints used as source of integration routes.
// Can contain simple endpoint names or full Camel URIs.
EndpointSources string `property:"endpoint-sources"`
// Comma-separated list of endpoints used as destination of integration routes.
// Can contain simple endpoint names or full Camel URIs.
EndpointSinks string `property:"endpoint-sinks"`
// Comma-separated list of event types that the integration will be subscribed to.
// Can contain simple event types or full Camel URIs (to use a specific broker different from "default").
EventSources string `property:"event-sources"`
// Comma-separated list of event types that the integration will produce.
// Can contain simple event types or full Camel URIs (to use a specific broker).
EventSinks string `property:"event-sinks"`
// Enables filtering on events based on the header "ce-knativehistory". Since this is an experimental header
// that can be removed in a future version of Knative, filtering is enabled only when the integration is
// listening from more than 1 channel.
FilterSourceChannels *bool `property:"filter-source-channels"`
// Enable automatic discovery of all trait properties.
Auto *bool `property:"auto"`
}
// knativeHistoryHeader is the name of the header used to filter events by source channel;
// it is appended to the filter-prefix metadata key when source channel filtering is enabled.
const knativeHistoryHeader = "ce-knativehistory"
// newKnativeTrait returns a knativeTrait whose embedded BaseTrait is created
// with newBaseTrait("knative"); every other property is left at its zero value.
func newKnativeTrait() *knativeTrait {
	return &knativeTrait{BaseTrait: newBaseTrait("knative")}
}
// Configure decides whether the trait takes part in the current run and, when
// automatic discovery is on, fills in the source/sink properties that were left empty.
//
// It returns false when the trait has been explicitly disabled or when the
// integration is neither in the Deploying nor in the Running phase; otherwise it
// returns true. The returned error is always nil.
//
// Unless Auto is explicitly set to false, every empty source/sink property is
// populated with the Knative URIs of the matching service type found in the
// integration sources. When more than one source channel ends up configured,
// FilterSourceChannels is forced to true.
func (t *knativeTrait) Configure(e *Environment) (bool, error) {
	if t.Enabled != nil && !*t.Enabled {
		return false, nil
	}
	if !e.IntegrationInPhase(v1alpha1.IntegrationPhaseDeploying, v1alpha1.IntegrationPhaseRunning) {
		return false, nil
	}
	if t.Auto != nil && !*t.Auto {
		// Automatic discovery explicitly turned off: use the properties as given.
		return true, nil
	}

	// discover scans all integration sources and returns the comma-separated URIs of
	// the given service type, taken from the "from" URIs when fromURIs is true and
	// from the "to" URIs otherwise.
	discover := func(serviceType knativeapi.CamelServiceType, fromURIs bool) string {
		items := make([]string, 0)
		metadata.Each(e.CamelCatalog, e.Integration.Spec.Sources, func(_ int, meta metadata.IntegrationMetadata) bool {
			if fromURIs {
				items = append(items, knativeutil.FilterURIs(meta.FromURIs, serviceType)...)
			} else {
				items = append(items, knativeutil.FilterURIs(meta.ToURIs, serviceType)...)
			}
			return true
		})
		return strings.Join(items, ",")
	}

	// Only properties the user left empty are discovered; explicit values win.
	if t.ChannelSources == "" {
		t.ChannelSources = discover(knativeapi.CamelServiceTypeChannel, true)
	}
	if t.ChannelSinks == "" {
		t.ChannelSinks = discover(knativeapi.CamelServiceTypeChannel, false)
	}
	if t.EndpointSources == "" {
		t.EndpointSources = discover(knativeapi.CamelServiceTypeEndpoint, true)
	}
	if t.EndpointSinks == "" {
		t.EndpointSinks = discover(knativeapi.CamelServiceTypeEndpoint, false)
	}
	if t.EventSources == "" {
		t.EventSources = discover(knativeapi.CamelServiceTypeEvent, true)
	}
	if t.EventSinks == "" {
		t.EventSinks = discover(knativeapi.CamelServiceTypeEvent, false)
	}

	// Count the channels with the same parser used when they are configured, so that
	// blank entries (e.g. a trailing comma) are not mistaken for an additional channel.
	if len(t.extractServices(t.ChannelSources, knativeapi.CamelServiceTypeChannel)) > 1 {
		// Always filter channels when the integration subscribes to more than one
		// Using Knative experimental header: https://github.com/knative/eventing/blob/7df0cc56c28d58223ff25d5ddfb487fa8c29a004/pkg/provisioners/message.go#L28
		// TODO: filter automatically all source channels when the feature becomes stable
		filter := true
		t.FilterSourceChannels = &filter
	}
	return true, nil
}
// Apply assembles the Camel Knative environment and stores its serialized form in
// the CAMEL_KNATIVE_CONFIGURATION environment variable of the integration.
//
// The environment starts from the user supplied Configuration (when not empty) and
// is then completed, in order, with channels, endpoints and events. The first
// failing step aborts the whole operation and its error is returned.
func (t *knativeTrait) Apply(e *Environment) error {
	camelEnv := knativeapi.NewCamelEnvironment()
	if t.Configuration != "" {
		if err := camelEnv.Deserialize(t.Configuration); err != nil {
			return err
		}
	}

	// The steps run strictly in this order.
	steps := []func(*Environment, *knativeapi.CamelEnvironment) error{
		t.configureChannels,
		t.configureEndpoints,
		t.configureEvents,
	}
	for _, step := range steps {
		if err := step(e, &camelEnv); err != nil {
			return err
		}
	}

	serialized, err := camelEnv.Serialize()
	if err != nil {
		return errors.Wrap(err, "unable to fetch environment configuration")
	}
	envvar.SetVal(&e.EnvVars, "CAMEL_KNATIVE_CONFIGURATION", serialized)
	return nil
}
// configureChannels adds to env one service definition for every channel listed in
// ChannelSources and ChannelSinks that is not already present in env.
//
// Source channels are exposed on 0.0.0.0:8080 and get a subscription resource added
// to the environment; when FilterSourceChannels is enabled the channel host is
// recorded as a filter on the Knative history header. Sink channels are built from
// the address resolved for the channel.
func (t *knativeTrait) configureChannels(e *Environment, env *knativeapi.CamelEnvironment) error {
	// Sources
	registerSource := func(ref *v1.ObjectReference, loc *url.URL, serviceURI string) error {
		attrs := map[string]string{
			knativeapi.CamelMetaServicePath:       "/",
			knativeapi.CamelMetaEndpointKind:      string(knativeapi.CamelEndpointKindSource),
			knativeapi.CamelMetaKnativeAPIVersion: ref.APIVersion,
			knativeapi.CamelMetaKnativeKind:       ref.Kind,
		}
		filtering := t.FilterSourceChannels != nil && *t.FilterSourceChannels
		if filtering {
			attrs[knativeapi.CamelMetaFilterPrefix+knativeHistoryHeader] = loc.Host
		}
		env.Services = append(env.Services, knativeapi.CamelServiceDefinition{
			Name:        ref.Name,
			Host:        "0.0.0.0",
			Port:        8080,
			ServiceType: knativeapi.CamelServiceTypeChannel,
			Metadata:    attrs,
		})
		return t.createSubscription(e, ref)
	}
	if err := t.ifServiceMissingDo(e, env, t.ChannelSources, knativeapi.CamelServiceTypeChannel,
		knativeapi.CamelEndpointKindSource, registerSource); err != nil {
		return err
	}

	// Sinks
	registerSink := func(ref *v1.ObjectReference, loc *url.URL, serviceURI string) error {
		definition, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
			knativeapi.CamelServiceTypeChannel, *loc, ref.APIVersion, ref.Kind)
		if err != nil {
			return err
		}
		env.Services = append(env.Services, definition)
		return nil
	}
	return t.ifServiceMissingDo(e, env, t.ChannelSinks, knativeapi.CamelServiceTypeChannel,
		knativeapi.CamelEndpointKindSink, registerSink)
}
// createSubscription adds to the environment resources a subscription, created by
// knativeutil.CreateSubscription, for the referenced channel and the integration name.
// It always returns nil.
func (t *knativeTrait) createSubscription(e *Environment, ref *v1.ObjectReference) error {
	e.Resources.Add(knativeutil.CreateSubscription(*ref, e.Integration.Name))
	return nil
}
// configureEndpoints adds to env one service definition for every endpoint listed
// in EndpointSources and EndpointSinks that is not already present in env.
//
// Source endpoints are exposed on 0.0.0.0:8080 and are registered without any
// address lookup. Sink endpoints are built from the address resolved for the
// referenced service. An error is returned when a URI cannot be parsed into an
// object reference or when a sink cannot be resolved.
func (t *knativeTrait) configureEndpoints(e *Environment, env *knativeapi.CamelEnvironment) error {
	// Sources
	sourceAPIVersion := serving.SchemeGroupVersion.String()
	for _, endpoint := range t.extractServices(t.EndpointSources, knativeapi.CamelServiceTypeEndpoint) {
		ref, err := knativeutil.ExtractObjectReference(endpoint)
		if err != nil {
			return err
		}
		// Services are registered under ref.Name, so the duplicate check has to use
		// the same key (not the normalized URI) or it would never match.
		if env.ContainsService(ref.Name, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEndpoint,
			sourceAPIVersion, "Service") {
			continue
		}
		env.Services = append(env.Services, knativeapi.CamelServiceDefinition{
			Name:        ref.Name,
			Host:        "0.0.0.0",
			Port:        8080,
			ServiceType: knativeapi.CamelServiceTypeEndpoint,
			Metadata: map[string]string{
				knativeapi.CamelMetaServicePath:       "/",
				knativeapi.CamelMetaEndpointKind:      string(knativeapi.CamelEndpointKindSource),
				knativeapi.CamelMetaKnativeAPIVersion: sourceAPIVersion,
				knativeapi.CamelMetaKnativeKind:       "Service",
			},
		})
	}

	// Sinks
	return t.ifServiceMissingDo(e, env, t.EndpointSinks, knativeapi.CamelServiceTypeEndpoint, knativeapi.CamelEndpointKindSink,
		func(ref *v1.ObjectReference, loc *url.URL, serviceURI string) error {
			svc, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
				knativeapi.CamelServiceTypeEndpoint, *loc, ref.APIVersion, ref.Kind)
			if err != nil {
				return err
			}
			env.Services = append(env.Services, svc)
			return nil
		})
}
// configureEvents adds to env the service definitions needed for the event types
// listed in EventSources and EventSinks.
//
// For sources every URI is processed, duplicates included: a trigger is requested
// for each event type, while the service definition (exposed on 0.0.0.0:8080) is
// added only if env does not already contain it. Sinks are added only when missing
// and are built from the address resolved for the referenced resource.
func (t *knativeTrait) configureEvents(e *Environment, env *knativeapi.CamelEnvironment) error {
	// Sources
	subscribe := func(ref *v1.ObjectReference, loc *url.URL, serviceURI string) error {
		// Iterate over all, without skipping duplicates
		t.createTrigger(e, ref, knativeutil.ExtractEventType(serviceURI))

		if env.ContainsService(ref.Name, knativeapi.CamelEndpointKindSource, knativeapi.CamelServiceTypeEvent, ref.APIVersion, ref.Kind) {
			return nil
		}
		env.Services = append(env.Services, knativeapi.CamelServiceDefinition{
			Name:        ref.Name,
			Host:        "0.0.0.0",
			Port:        8080,
			ServiceType: knativeapi.CamelServiceTypeEvent,
			Metadata: map[string]string{
				knativeapi.CamelMetaServicePath:       "/",
				knativeapi.CamelMetaEndpointKind:      string(knativeapi.CamelEndpointKindSource),
				knativeapi.CamelMetaKnativeAPIVersion: ref.APIVersion,
				knativeapi.CamelMetaKnativeKind:       ref.Kind,
			},
		})
		return nil
	}
	if err := t.withServiceDo(false, e, env, t.EventSources, knativeapi.CamelServiceTypeEvent,
		knativeapi.CamelEndpointKindSource, subscribe); err != nil {
		return err
	}

	// Sinks
	publish := func(ref *v1.ObjectReference, loc *url.URL, serviceURI string) error {
		definition, err := knativeapi.BuildCamelServiceDefinition(ref.Name, knativeapi.CamelEndpointKindSink,
			knativeapi.CamelServiceTypeEvent, *loc, ref.APIVersion, ref.Kind)
		if err != nil {
			return err
		}
		env.Services = append(env.Services, definition)
		return nil
	}
	return t.ifServiceMissingDo(e, env, t.EventSinks, knativeapi.CamelServiceTypeEvent,
		knativeapi.CamelEndpointKindSink, publish)
}
// createTrigger adds to the environment resources a trigger for the given broker
// reference and event type, unless the resources already contain a trigger on the
// same broker whose "type" filter attribute equals eventType.
func (t *knativeTrait) createTrigger(e *Environment, ref *v1.ObjectReference, eventType string) {
	// TODO extend to additional filters too, to filter them at source and not at destination
	sameTrigger := func(candidate *eventing.Trigger) bool {
		if candidate.Spec.Broker != ref.Name {
			return false
		}
		filter := candidate.Spec.Filter
		if filter == nil || filter.Attributes == nil {
			return false
		}
		return (*filter.Attributes)["type"] == eventType
	}
	if e.Resources.HasKnativeTrigger(sameTrigger) {
		return
	}
	e.Resources.Add(knativeutil.CreateTrigger(*ref, e.Integration.Name, eventType))
}
// ifServiceMissingDo runs gen for every service listed in serviceURIsAsString that
// is not yet contained in env. It is withServiceDo with duplicate skipping enabled.
func (t *knativeTrait) ifServiceMissingDo(
	e *Environment,
	env *knativeapi.CamelEnvironment,
	serviceURIsAsString string,
	serviceType knativeapi.CamelServiceType,
	endpointKind knativeapi.CamelEndpointKind,
	gen func(ref *v1.ObjectReference, url *url.URL, serviceURI string) error) error {
	const skipDuplicates = true
	return t.withServiceDo(skipDuplicates, e, env, serviceURIsAsString, serviceType, endpointKind, gen)
}
// withServiceDo resolves every service listed in serviceURIsAsString and hands it
// over to gen.
//
// For each URI the object reference is extracted, looked up in the integration
// namespace to find the actual addressable resource, and its sink URL is resolved;
// gen then receives the actual reference, the URL and the original service URI.
// When skipDuplicates is true, services already contained in env are skipped before
// any lookup takes place. Processing stops at the first error, which is returned
// (wrapped with context for lookup, address and handler failures).
func (t *knativeTrait) withServiceDo(
	skipDuplicates bool,
	e *Environment,
	env *knativeapi.CamelEnvironment,
	serviceURIsAsString string,
	serviceType knativeapi.CamelServiceType,
	endpointKind knativeapi.CamelEndpointKind,
	gen func(ref *v1.ObjectReference, url *url.URL, serviceURI string) error) error {
	for _, uri := range t.extractServices(serviceURIsAsString, serviceType) {
		requested, err := knativeutil.ExtractObjectReference(uri)
		if err != nil {
			return err
		}
		if skipDuplicates && env.ContainsService(requested.Name, endpointKind, serviceType, requested.APIVersion, requested.Kind) {
			continue
		}

		candidates := knativeutil.FillMissingReferenceData(serviceType, requested)
		resolved, err := knativeutil.GetAddressableReference(t.ctx, t.client, candidates, e.Integration.Namespace, requested.Name)
		if err != nil {
			if k8serrors.IsNotFound(err) {
				return errors.Errorf("cannot find %s %s", serviceType, requested.Name)
			}
			return errors.Wrapf(err, "error looking up %s %s", serviceType, requested.Name)
		}

		address, err := knativeutil.GetSinkURL(t.ctx, t.client, resolved, e.Integration.Namespace)
		if err != nil {
			return errors.Wrapf(err, "cannot determine address of %s %s", string(serviceType), requested.Name)
		}
		t.L.Infof("Found URL for %s: %s", string(serviceType), address.String())

		if err := gen(resolved, address, uri); err != nil {
			return errors.Wrapf(err, "unexpected error while executing handler for %s %s", string(serviceType), requested.Name)
		}
	}
	return nil
}
// extractServices splits a comma-separated list of service names or URIs, strips
// surrounding spaces, tabs and double quotes from each entry, drops the entries
// that end up empty and passes the remaining ones through knativeutil.NormalizeToURI
// for the given service type. The result is never nil.
func (t *knativeTrait) extractServices(names string, serviceType knativeapi.CamelServiceType) []string {
	entries := strings.Split(names, ",")
	uris := make([]string, 0, len(entries))
	for _, entry := range entries {
		cleaned := strings.Trim(entry, " \t\"")
		if cleaned == "" {
			continue
		}
		uris = append(uris, knativeutil.NormalizeToURI(serviceType, cleaned))
	}
	return uris
}