| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.camel.component.kamelet; |
| |
| import java.util.ArrayList; |
| import java.util.Collections; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.ConcurrentHashMap; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import org.apache.camel.CamelContext; |
| import org.apache.camel.Endpoint; |
| import org.apache.camel.Processor; |
| import org.apache.camel.RuntimeCamelException; |
| import org.apache.camel.ServiceStatus; |
| import org.apache.camel.VetoCamelContextStartException; |
| import org.apache.camel.model.ModelCamelContext; |
| import org.apache.camel.model.RouteDefinition; |
| import org.apache.camel.spi.Metadata; |
| import org.apache.camel.spi.RouteTemplateLoaderListener; |
| import org.apache.camel.spi.annotations.Component; |
| import org.apache.camel.support.DefaultComponent; |
| import org.apache.camel.support.LifecycleStrategySupport; |
| import org.apache.camel.support.RouteTemplateHelper; |
| import org.apache.camel.support.service.ServiceHelper; |
| import org.apache.camel.util.StopWatch; |
| import org.apache.camel.util.URISupport; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static org.apache.camel.component.kamelet.Kamelet.NO_ERROR_HANDLER; |
| import static org.apache.camel.component.kamelet.Kamelet.PARAM_LOCATION; |
| import static org.apache.camel.component.kamelet.Kamelet.PARAM_ROUTE_ID; |
| import static org.apache.camel.component.kamelet.Kamelet.PARAM_TEMPLATE_ID; |
| import static org.apache.camel.component.kamelet.Kamelet.PARAM_UUID; |
| |
| /** |
| * Materialize route templates |
| */ |
| @Component(Kamelet.SCHEME) |
| public class KameletComponent extends DefaultComponent { |
| private static final Logger LOGGER = LoggerFactory.getLogger(KameletComponent.class); |
| |
| private final LifecycleHandler lifecycleHandler = new LifecycleHandler(); |
| |
| // active consumers |
| private final Map<String, KameletConsumer> consumers = new HashMap<>(); |
| // active kamelet EIPs |
| private final Map<String, Processor> kameletEips = new ConcurrentHashMap<>(); |
| |
| @Metadata(label = "advanced", autowired = true) |
| private RouteTemplateLoaderListener routeTemplateLoaderListener; |
| |
| // counter that is used for producers to keep track if any consumer was added/removed since they last checked |
| // this is used for optimization to avoid each producer to get consumer for each message processed |
| // (locking via synchronized, and then lookup in the map as the cost) |
| // consumers and producers are only added/removed during startup/shutdown or if routes is manually controlled |
| private volatile int stateCounter; |
| |
| @Metadata(label = "producer", defaultValue = "true") |
| private boolean block = true; |
| @Metadata(label = "producer", defaultValue = "30000") |
| private long timeout = 30000L; |
| @Metadata(label = "advanced", defaultValue = "true") |
| private boolean noErrorHandler = true; |
| |
| @Metadata |
| private Map<String, Properties> templateProperties; |
| @Metadata |
| private Map<String, Properties> routeProperties; |
| @Metadata(defaultValue = Kamelet.DEFAULT_LOCATION) |
| private String location = Kamelet.DEFAULT_LOCATION; |
| |
| public KameletComponent() { |
| } |
| |
| public void addKameletEip(String key, Processor callback) { |
| kameletEips.put(key, callback); |
| } |
| |
| public Processor removeKameletEip(String key) { |
| return kameletEips.remove(key); |
| } |
| |
| public Processor getKameletEip(String key) { |
| return kameletEips.get(key); |
| } |
| |
| @Override |
| protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception { |
| final String templateId = Kamelet.extractTemplateId(getCamelContext(), remaining, parameters); |
| final String uuid = Kamelet.extractUuid(); |
| final String routeId = Kamelet.extractRouteId(getCamelContext(), remaining, parameters, uuid); |
| final String loc = Kamelet.extractLocation(getCamelContext(), parameters); |
| |
| parameters.remove(PARAM_TEMPLATE_ID); |
| parameters.remove(PARAM_ROUTE_ID); |
| parameters.remove(PARAM_LOCATION); |
| parameters.remove(PARAM_UUID); |
| |
| // manually need to resolve raw parameters as input to the kamelet because |
| // resolveRawParameterValues is false |
| // this ensures that parameters such as passwords are used as-is and not encoded |
| // but this requires to use RAW() syntax in the kamelet template. |
| URISupport.resolveRawParameterValues(parameters); |
| |
| final KameletEndpoint endpoint; |
| |
| if (Kamelet.SOURCE_ID.equals(remaining) || Kamelet.SINK_ID.equals(remaining)) { |
| // |
| // if remaining is either `source` or `sink' then it is a virtual |
| // endpoint that is used inside the kamelet definition to mark it |
| // as in/out endpoint. |
| // |
| // The following snippet defines a template which will act as a |
| // consumer for this Kamelet: |
| // |
| // from("kamelet:source") |
| // .to("log:info") |
| // |
| // The following snippet defines a template which will act as a |
| // producer for this Kamelet: |
| // |
| // from("telegram:bots") |
| // .to("kamelet:sink") |
| // |
| // Note that at the moment, there's no enforcement around `source` |
| // and `sink' to be defined on the right side (producer or consumer) |
| // |
| endpoint = new KameletEndpoint(uri, this, templateId, routeId); |
| |
| // forward component properties |
| endpoint.setNoErrorHandler(noErrorHandler); |
| endpoint.setBlock(block); |
| endpoint.setTimeout(timeout); |
| // endpoint specific location |
| endpoint.setLocation(loc); |
| |
| // set endpoint specific properties |
| setProperties(endpoint, parameters); |
| } else { |
| endpoint = new KameletEndpoint(uri, this, templateId, routeId) { |
| @Override |
| protected void doInit() throws Exception { |
| super.doInit(); |
| // |
| // since this is the real kamelet, then we need to hand it |
| // over to the tracker. |
| // |
| lifecycleHandler.track(this); |
| } |
| }; |
| |
| // forward component properties |
| endpoint.setNoErrorHandler(noErrorHandler); |
| endpoint.setBlock(block); |
| endpoint.setTimeout(timeout); |
| // endpoint specific location |
| endpoint.setLocation(loc); |
| |
| // set and remove endpoint specific properties |
| setProperties(endpoint, parameters); |
| |
| Map<String, Object> kameletProperties = new HashMap<>(); |
| |
| // |
| // Load properties from the component configuration. Template and route specific properties |
| // can be set through properties, as an example: |
| // |
| // camel.component.kamelet.template-properties[templateId].key = val |
| // camel.component.kamelet.route-properties[routeId].key = val |
| // |
| if (templateProperties != null) { |
| Properties props = templateProperties.get(templateId); |
| if (props != null) { |
| props.stringPropertyNames().forEach(name -> kameletProperties.put(name, props.get(name))); |
| } |
| } |
| if (routeProperties != null) { |
| Properties props = routeProperties.get(routeId); |
| if (props != null) { |
| props.stringPropertyNames().forEach(name -> kameletProperties.put(name, props.get(name))); |
| } |
| } |
| |
| // |
| // We can't mix configuration styles so if properties are not configured through the component, |
| // then fallback to the old - deprecated - style. |
| // |
| if (kameletProperties.isEmpty()) { |
| // |
| // The properties for the kamelets are determined by global properties |
| // and local endpoint parameters, |
| // |
| // Global parameters are loaded in the following order: |
| // |
| // camel.kamelet." + templateId |
| // camel.kamelet." + templateId + "." routeId |
| // |
| Kamelet.extractKameletProperties(getCamelContext(), kameletProperties, templateId, routeId); |
| } |
| |
| // |
| // Uri params have the highest precedence |
| // |
| kameletProperties.putAll(parameters); |
| |
| // |
| // And finally we have some specific properties that cannot be changed by the user. |
| // |
| kameletProperties.put(PARAM_TEMPLATE_ID, templateId); |
| kameletProperties.put(PARAM_ROUTE_ID, routeId); |
| kameletProperties.put(PARAM_UUID, uuid); |
| kameletProperties.put(NO_ERROR_HANDLER, endpoint.isNoErrorHandler()); |
| |
| // set kamelet specific properties |
| endpoint.setKameletProperties(kameletProperties); |
| |
| // |
| // Add a custom converter to convert a RouteTemplateDefinition to a RouteDefinition |
| // and make sure consumer URIs are unique. |
| // |
| ((ModelCamelContext) getCamelContext()).addRouteTemplateDefinitionConverter( |
| templateId, |
| Kamelet::templateToRoute); |
| } |
| |
| return endpoint; |
| } |
| |
| @Override |
| protected boolean resolveRawParameterValues() { |
| return false; |
| } |
| |
| public boolean isNoErrorHandler() { |
| return noErrorHandler; |
| } |
| |
| /** |
| * Kamelets, by default, will not do fine-grained error handling, but works in no-error-handler mode. This can be |
| * turned off, to use old behaviour in earlier versions of Camel. |
| */ |
| public void setNoErrorHandler(boolean noErrorHandler) { |
| this.noErrorHandler = noErrorHandler; |
| } |
| |
| public boolean isBlock() { |
| return block; |
| } |
| |
| /** |
| * If sending a message to a kamelet endpoint which has no active consumer, then we can tell the producer to block |
| * and wait for the consumer to become active. |
| */ |
| public void setBlock(boolean block) { |
| this.block = block; |
| } |
| |
| public long getTimeout() { |
| return timeout; |
| } |
| |
| /** |
| * The timeout value to use if block is enabled. |
| */ |
| public void setTimeout(long timeout) { |
| this.timeout = timeout; |
| } |
| |
| public Map<String, Properties> getTemplateProperties() { |
| return templateProperties; |
| } |
| |
| /** |
| * Set template local parameters. |
| */ |
| public void setTemplateProperties(Map<String, Properties> templateProperties) { |
| this.templateProperties = templateProperties; |
| } |
| |
| public Map<String, Properties> getRouteProperties() { |
| return routeProperties; |
| } |
| |
| /** |
| * Set route local parameters. |
| */ |
| public void setRouteProperties(Map<String, Properties> routeProperties) { |
| this.routeProperties = routeProperties; |
| } |
| |
| public String getLocation() { |
| return location; |
| } |
| |
| /** |
| * The location(s) of the Kamelets on the file system. Multiple locations can be set separated by comma. |
| */ |
| public void setLocation(String location) { |
| this.location = location; |
| } |
| |
| public RouteTemplateLoaderListener getRouteTemplateLoaderListener() { |
| return routeTemplateLoaderListener; |
| } |
| |
| /** |
| * To plugin a custom listener for when the Kamelet component is loading Kamelets from external resources. |
| */ |
| public void setRouteTemplateLoaderListener(RouteTemplateLoaderListener routeTemplateLoaderListener) { |
| this.routeTemplateLoaderListener = routeTemplateLoaderListener; |
| } |
| |
| int getStateCounter() { |
| return stateCounter; |
| } |
| |
| public void addConsumer(String key, KameletConsumer consumer) { |
| synchronized (consumers) { |
| if (consumers.putIfAbsent(key, consumer) != null) { |
| throw new IllegalArgumentException( |
| "Cannot add a 2nd consumer to the same endpoint: " + key |
| + ". KameletEndpoint only allows one consumer."); |
| } |
| // state changed so inc counter |
| stateCounter++; |
| consumers.notifyAll(); |
| } |
| } |
| |
| public void removeConsumer(String key, KameletConsumer consumer) { |
| synchronized (consumers) { |
| consumers.remove(key, consumer); |
| // state changed so inc counter |
| stateCounter++; |
| consumers.notifyAll(); |
| } |
| } |
| |
| protected KameletConsumer getConsumer(String key, boolean block, long timeout) throws InterruptedException { |
| synchronized (consumers) { |
| KameletConsumer answer = consumers.get(key); |
| if (answer == null && block) { |
| StopWatch watch = new StopWatch(); |
| for (;;) { |
| answer = consumers.get(key); |
| if (answer != null) { |
| break; |
| } |
| long rem = timeout - watch.taken(); |
| if (rem <= 0) { |
| break; |
| } |
| consumers.wait(rem); |
| } |
| } |
| return answer; |
| } |
| } |
| |
| @Override |
| protected void doInit() throws Exception { |
| getCamelContext().addLifecycleStrategy(lifecycleHandler); |
| |
| if (getCamelContext().isRunAllowed()) { |
| lifecycleHandler.setInitialized(true); |
| } |
| |
| super.doInit(); |
| } |
| |
| @Override |
| protected void doShutdown() throws Exception { |
| getCamelContext().getLifecycleStrategies().remove(lifecycleHandler); |
| |
| ServiceHelper.stopAndShutdownService(consumers); |
| consumers.clear(); |
| kameletEips.clear(); |
| super.doShutdown(); |
| } |
| |
| /* |
| * This LifecycleHandler is used to keep track of created kamelet endpoints during startup as |
| * we need to defer create routes from templates until camel context has finished loading |
| * all routes and whatnot. |
| * |
| * Once the camel context is initialized all the endpoint tracked by this LifecycleHandler will |
| * be used to create routes from templates. |
| */ |
| private class LifecycleHandler extends LifecycleStrategySupport { |
| private final List<KameletEndpoint> endpoints; |
| private final AtomicBoolean initialized; |
| |
| public LifecycleHandler() { |
| this.endpoints = new ArrayList<>(); |
| this.initialized = new AtomicBoolean(); |
| } |
| |
| public void createRouteForEndpoint(KameletEndpoint endpoint) throws Exception { |
| final ModelCamelContext context = (ModelCamelContext) getCamelContext(); |
| final String templateId = endpoint.getTemplateId(); |
| final String routeId = endpoint.getRouteId(); |
| final String loc = endpoint.getLocation() != null ? endpoint.getLocation() : getLocation(); |
| final String uuid = (String) endpoint.getKameletProperties().get(PARAM_UUID); |
| |
| if (context.getRouteTemplateDefinition(templateId) == null && loc != null) { |
| LOGGER.debug("Loading route template={} from {}", templateId, loc); |
| RouteTemplateHelper.loadRouteTemplateFromLocation(getCamelContext(), routeTemplateLoaderListener, templateId, |
| loc); |
| } |
| |
| LOGGER.debug("Creating route from template={} and id={}", templateId, routeId); |
| try { |
| String id = context.addRouteFromTemplate(routeId, templateId, uuid, endpoint.getKameletProperties()); |
| RouteDefinition def = context.getRouteDefinition(id); |
| |
| // start the route if not already started |
| ServiceStatus status = context.getRouteController().getRouteStatus(id); |
| boolean started = status != null && status.isStarted(); |
| if (!started) { |
| context.startRouteDefinitions(Collections.singletonList(def)); |
| } |
| |
| LOGGER.debug("Route with id={} created from template={}", id, templateId); |
| } catch (Exception e) { |
| throw new KameletNotFoundException(templateId, loc, e); |
| } |
| } |
| |
| @Override |
| public void onContextInitialized(CamelContext context) throws VetoCamelContextStartException { |
| if (this.initialized.compareAndSet(false, true)) { |
| for (KameletEndpoint endpoint : endpoints) { |
| try { |
| createRouteForEndpoint(endpoint); |
| } catch (Exception e) { |
| throw new VetoCamelContextStartException( |
| "Failure creating route from template: " + endpoint.getTemplateId(), e, context); |
| } |
| } |
| |
| endpoints.clear(); |
| } |
| } |
| |
| public void setInitialized(boolean initialized) { |
| this.initialized.set(initialized); |
| } |
| |
| public void track(KameletEndpoint endpoint) { |
| if (this.initialized.get()) { |
| try { |
| createRouteForEndpoint(endpoint); |
| } catch (Exception e) { |
| throw RuntimeCamelException.wrapRuntimeException(e); |
| } |
| } else { |
| LOGGER.debug("Tracking route template={} and id={}", endpoint.getTemplateId(), endpoint.getRouteId()); |
| this.endpoints.add(endpoint); |
| } |
| } |
| } |
| |
| } |