blob: 06b1ace5cd282344a8c7da2be889c5efcf4e8771 [file] [log] [blame]
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.camel.component.kamelet;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.camel.CamelContext;
import org.apache.camel.Endpoint;
import org.apache.camel.Processor;
import org.apache.camel.RuntimeCamelException;
import org.apache.camel.ServiceStatus;
import org.apache.camel.VetoCamelContextStartException;
import org.apache.camel.model.ModelCamelContext;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteTemplateLoaderListener;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.util.StopWatch;
import org.apache.camel.util.URISupport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static org.apache.camel.component.kamelet.Kamelet.NO_ERROR_HANDLER;
import static org.apache.camel.component.kamelet.Kamelet.PARAM_LOCATION;
import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID;
import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID;
import static org.apache.camel.component.kamelet.Kamelet.PARAM_UUID;
* Materialize route templates
public class KameletComponent extends DefaultComponent {
private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class);
private final LifecycleHandler lifecycleHandler = new LifecycleHandler();
// active consumers
private final Map<String, KameletConsumer> consumers = new HashMap<>();
// active kamelet EIPs
private final Map<String, Processor> kameletEips = new ConcurrentHashMap<>();
@Metadata(label = "advanced", autowired = true)
private RouteTemplateLoaderListener routeTemplateLoaderListener;
// counter that is used for producers to keep track if any consumer was added/removed since they last checked
// this is used for optimization to avoid each producer to get consumer for each message processed
// (locking via synchronized, and then lookup in the map as the cost)
// consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled
private volatile int stateCounter;
@Metadata(label = "producer", defaultValue = "true")
private boolean block = true;
@Metadata(label = "producer", defaultValue = "30000")
private long timeout = 30000L;
@Metadata(label = "advanced", defaultValue = "true")
private boolean noErrorHandler = true;
private Map<String, Properties> templateProperties;
private Map<String, Properties> routeProperties;
@Metadata(defaultValue = Kamelet.DEFAULT_LOCATION)
private String location = Kamelet.DEFAULT_LOCATION;
public KameletComponent() {
public void addKameletEip(String key, Processor callback) {
kameletEips.put(key, callback);
public Processor removeKameletEip(String key) {
return kameletEips.remove(key);
public Processor getKameletEip(String key) {
return kameletEips.get(key);
protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining, parameters);
final String uuid = Kamelet.extractUuid();
final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining, parameters, uuid);
final String loc = Kamelet.extractLocation(getCamelContext(), parameters);
// manually need to resolve raw parameters as input to the kamelet because
// resolveRawParameterValues is false
// this ensures that parameters such as passwords are used as-is and not encoded
// but this requires to use RAW() syntax in the kamelet template.
final KameletEndpoint endpoint;
if (Kamelet.SOURCE_ID.equals(remaining) || Kamelet.SINK_ID.equals(remaining)) {
// if remaining is either `source` or `sink' then it is a virtual
// endpoint that is used inside the kamelet definition to mark it
// as in/out endpoint.
// The following snippet defines a template which will act as a
// consumer for this Kamelet:
// from("kamelet:source")
// .to("log:info")
// The following snippet defines a template which will act as a
// producer for this Kamelet:
// from("telegram:bots")
// .to("kamelet:sink")
// Note that at the moment, there's no enforcement around `source`
// and `sink' to be defined on the right side (producer or consumer)
endpoint = new KameletEndpoint(uri, this, templateId, routeId);
// forward component properties
// endpoint specific location
// set endpoint specific properties
setProperties(endpoint, parameters);
} else {
endpoint = new KameletEndpoint(uri, this, templateId, routeId) {
protected void doInit() throws Exception {
// since this is the real kamelet, then we need to hand it
// over to the tracker.
// forward component properties
// endpoint specific location
// set and remove endpoint specific properties
setProperties(endpoint, parameters);
Map<String, Object> kameletProperties = new HashMap<>();
// Load properties from the component configuration. Template and route specific properties
// can be set through properties, as an example:
// camel.component.kamelet.template-properties[templateId].key = val
// camel.component.kamelet.route-properties[routeId].key = val
if (templateProperties != null) {
Properties props = templateProperties.get(templateId);
if (props != null) {
props.stringPropertyNames().forEach(name -> kameletProperties.put(name, props.get(name)));
if (routeProperties != null) {
Properties props = routeProperties.get(routeId);
if (props != null) {
props.stringPropertyNames().forEach(name -> kameletProperties.put(name, props.get(name)));
// We can't mix configuration styles so if properties are not configured through the component,
// then fallback to the old - deprecated - style.
if (kameletProperties.isEmpty()) {
// The properties for the kamelets are determined by global properties
// and local endpoint parameters,
// Global parameters are loaded in the following order:
// camel.kamelet." + templateId
// camel.kamelet." + templateId + "." routeId
Kamelet.extractKameletProperties(getCamelContext(), kameletProperties, templateId, routeId);
// Uri params have the highest precedence
// And finally we have some specific properties that cannot be changed by the user.
kameletProperties.put(PARAM_TEMPLATE_ID, templateId);
kameletProperties.put(PARAM_ROUTE_ID, routeId);
kameletProperties.put(PARAM_UUID, uuid);
kameletProperties.put(NO_ERROR_HANDLER, endpoint.isNoErrorHandler());
// set kamelet specific properties
// Add a custom converter to convert a RouteTemplateDefinition to a RouteDefinition
// and make sure consumer URIs are unique.
((ModelCamelContext) getCamelContext()).addRouteTemplateDefinitionConverter(
return endpoint;
protected boolean resolveRawParameterValues() {
return false;
public boolean isNoErrorHandler() {
return noErrorHandler;
* Kamelets, by default, will not do fine-grained error handling, but works in no-error-handler mode. This can be
* turned off, to use old behaviour in earlier versions of Camel.
public void setNoErrorHandler(boolean noErrorHandler) {
this.noErrorHandler = noErrorHandler;
public boolean isBlock() {
return block;
* If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block
* and wait for the consumer to become active.
public void setBlock(boolean block) {
this.block = block;
public long getTimeout() {
return timeout;
* The timeout value to use if block is enabled.
public void setTimeout(long timeout) {
this.timeout = timeout;
public Map<String, Properties> getTemplateProperties() {
return templateProperties;
* Set template local parameters.
public void setTemplateProperties(Map<String, Properties> templateProperties) {
this.templateProperties = templateProperties;
public Map<String, Properties> getRouteProperties() {
return routeProperties;
* Set route local parameters.
public void setRouteProperties(Map<String, Properties> routeProperties) {
this.routeProperties = routeProperties;
public String getLocation() {
return location;
* The location(s) of the Kamelets on the file system. Multiple locations can be set separated by comma.
public void setLocation(String location) {
this.location = location;
public RouteTemplateLoaderListener getRouteTemplateLoaderListener() {
return routeTemplateLoaderListener;
* To plugin a custom listener for when the Kamelet component is loading Kamelets from external resources.
public void setRouteTemplateLoaderListener(RouteTemplateLoaderListener routeTemplateLoaderListener) {
this.routeTemplateLoaderListener = routeTemplateLoaderListener;
int getStateCounter() {
return stateCounter;
public void addConsumer(String key, KameletConsumer consumer) {
synchronized (consumers) {
if (consumers.putIfAbsent(key, consumer) != null) {
throw new IllegalArgumentException(
"Cannot add a 2nd consumer to the same endpoint: " + key
+ ". KameletEndpoint only allows one consumer.");
// state changed so inc counter
public void removeConsumer(String key, KameletConsumer consumer) {
synchronized (consumers) {
consumers.remove(key, consumer);
// state changed so inc counter
protected KameletConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException {
synchronized (consumers) {
KameletConsumer answer = consumers.get(key);
if (answer == null && block) {
StopWatch watch = new StopWatch();
for (;;) {
answer = consumers.get(key);
if (answer != null) {
long rem = timeout - watch.taken();
if (rem <= 0) {
return answer;
protected void doInit() throws Exception {
if (getCamelContext().isRunAllowed()) {
protected void doShutdown() throws Exception {
* This LifecycleHandler is used to keep track of created kamelet endpoints during startup as
* we need to defer create routes from templates until camel context has finished loading
* all routes and whatnot.
* Once the camel context is initialized all the endpoint tracked by this LifecycleHandler will
* be used to create routes from templates.
private class LifecycleHandler extends LifecycleStrategySupport {
private final List<KameletEndpoint> endpoints;
private final AtomicBoolean initialized;
public LifecycleHandler() {
this.endpoints = new ArrayList<>();
this.initialized = new AtomicBoolean();
public void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception {
final ModelCamelContext context = (ModelCamelContext) getCamelContext();
final String templateId = endpoint.getTemplateId();
final String routeId = endpoint.getRouteId();
final String loc = endpoint.getLocation() != null ? endpoint.getLocation() : getLocation();
final String uuid = (String) endpoint.getKameletProperties().get(PARAM_UUID);
if (context.getRouteTemplateDefinition(templateId) == null && loc != null) {
LOGGER.debug("Loading route template={} from {}", templateId, loc);
RouteTemplateHelper.loadRouteTemplateFromLocation(getCamelContext(), routeTemplateLoaderListener, templateId,
LOGGER.debug("Creating route from template={} and id={}", templateId, routeId);
try {
String id = context.addRouteFromTemplate(routeId, templateId, uuid, endpoint.getKameletProperties());
RouteDefinition def = context.getRouteDefinition(id);
// start the route if not already started
ServiceStatus status = context.getRouteController().getRouteStatus(id);
boolean started = status != null && status.isStarted();
if (!started) {
LOGGER.debug("Route with id={} created from template={}", id, templateId);
} catch (Exception e) {
throw new KameletNotFoundException(templateId, loc, e);
public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException {
if (this.initialized.compareAndSet(false, true)) {
for (KameletEndpoint endpoint : endpoints) {
try {
} catch (Exception e) {
throw new VetoCamelContextStartException(
"Failure creating route from template: " + endpoint.getTemplateId(), e, context);
public void setInitialized(boolean initialized) {
public void track(KameletEndpoint endpoint) {
if (this.initialized.get()) {
try {
} catch (Exception e) {
throw RuntimeCamelException.wrapRuntimeException(e);
} else {
LOGGER.debug("Tracking route template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId());