blob: 91e20a7d84b837eb6f1da4a4944c77c91f55378a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative;
import java.util.HashMap;
import java.util.Map;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Endpoint;
import org.apache.camel.ExtendedCamelContext;
import org.apache.camel.component.knative.spi.Knative;
import org.apache.camel.component.knative.spi.KnativeConsumerFactory;
import org.apache.camel.component.knative.spi.KnativeEnvironment;
import org.apache.camel.component.knative.spi.KnativeProducerFactory;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.CamelContextHelper;
import org.apache.camel.support.DefaultComponent;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.PropertiesHelper;
import org.apache.camel.util.StringHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Camel component registered under {@link KnativeConstants#SCHEME}.
 *
 * <p>It creates {@link KnativeEndpoint}s from URIs of the form {@code knative:type/name} and
 * resolves the transport-specific {@link KnativeProducerFactory} / {@link KnativeConsumerFactory}
 * for the configured {@link Knative.Protocol}. Factories that this component instantiates itself
 * through the factory finder are "managed": the component initialises, starts and stops them
 * together with its own lifecycle. Factories that were injected through the setters or found in
 * the registry are left to their owner.
 */
@Component(KnativeConstants.SCHEME)
public class KnativeComponent extends DefaultComponent {
    private static final Logger LOGGER = LoggerFactory.getLogger(KnativeComponent.class);

    @Metadata
    private KnativeConfiguration configuration;
    @Metadata
    private String environmentPath;
    @Metadata(defaultValue = "http")
    private Knative.Protocol protocol = Knative.Protocol.http;
    @Metadata
    private KnativeProducerFactory producerFactory;
    @Metadata
    private KnativeConsumerFactory consumerFactory;

    // Set to true only when the matching factory was created by this component through the
    // factory finder; only then does the component drive the factory's lifecycle.
    private boolean managedProducer;
    private boolean managedConsumer;

    /**
     * Creates a component that is not yet bound to a {@link CamelContext}.
     */
    public KnativeComponent() {
        this(null);
    }

    /**
     * Creates a component bound to the given context, with an empty default configuration whose
     * transport options map is pre-initialised.
     *
     * @param context the owning Camel context, may be {@code null}
     */
    public KnativeComponent(CamelContext context) {
        super(context);

        this.configuration = new KnativeConfiguration();
        this.configuration.setTransportOptions(new HashMap<>());
    }

    // ************************
    //
    // Properties
    //
    // ************************

    public KnativeConfiguration getConfiguration() {
        return configuration;
    }

    /**
     * Set the configuration.
     */
    public void setConfiguration(KnativeConfiguration configuration) {
        // A null configuration is rejected by ObjectHelper.notNull.
        this.configuration = ObjectHelper.notNull(configuration, "configuration");
    }

    public String getEnvironmentPath() {
        return environmentPath;
    }

    /**
     * The path of the environment definition
     */
    public void setEnvironmentPath(String environmentPath) {
        this.environmentPath = environmentPath;
    }

    public KnativeEnvironment getEnvironment() {
        return configuration.getEnvironment();
    }

    /**
     * The environment
     */
    public void setEnvironment(KnativeEnvironment environment) {
        configuration.setEnvironment(environment);
    }

    public String getCloudEventsSpecVersion() {
        return configuration.getCloudEventsSpecVersion();
    }

    /**
     * Set the version of the cloudevents spec.
     */
    public void setCloudEventsSpecVersion(String cloudEventSpecVersion) {
        configuration.setCloudEventsSpecVersion(cloudEventSpecVersion);
    }

    public Knative.Protocol getProtocol() {
        return protocol;
    }

    /**
     * Protocol.
     *
     * @return this component, to allow call chaining
     */
    public KnativeComponent setProtocol(Knative.Protocol protocol) {
        this.protocol = protocol;
        return this;
    }

    public KnativeProducerFactory getProducerFactory() {
        return producerFactory;
    }

    /**
     * The protocol producer factory.
     */
    public void setProducerFactory(KnativeProducerFactory producerFactory) {
        this.producerFactory = producerFactory;
    }

    public KnativeConsumerFactory getConsumerFactory() {
        return consumerFactory;
    }

    /**
     * The protocol consumer factory.
     */
    public void setConsumerFactory(KnativeConsumerFactory consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    public Map<String, Object> getTransportOptions() {
        return configuration.getTransportOptions();
    }

    /**
     * Transport options.
     */
    public void setTransportOptions(Map<String, Object> transportOptions) {
        configuration.setTransportOptions(transportOptions);
    }

    // ************************
    //
    // Lifecycle
    //
    // ************************

    /**
     * Resolves the producer and consumer factories and initialises the ones this component manages.
     */
    @Override
    protected void doInit() throws Exception {
        super.doInit();

        setUpProducerFactory();
        setUpConsumerFactory();

        if (this.producerFactory != null && managedProducer) {
            ServiceHelper.initService(this.producerFactory);
        }
        if (this.consumerFactory != null && managedConsumer) {
            ServiceHelper.initService(this.consumerFactory);
        }
    }

    /**
     * Starts the managed factories.
     *
     * @throws IllegalStateException if neither a producer nor a consumer factory is available
     */
    @Override
    protected void doStart() throws Exception {
        super.doStart();

        // Fail fast: when both factories are missing there is nothing to start below anyway.
        if (this.producerFactory == null && this.consumerFactory == null) {
            throw new IllegalStateException("No producer or consumer factory has been configured");
        }

        if (this.producerFactory != null && managedProducer) {
            ServiceHelper.startService(this.producerFactory);
        }
        if (this.consumerFactory != null && managedConsumer) {
            ServiceHelper.startService(this.consumerFactory);
        }
    }

    /**
     * Stops the managed factories.
     */
    @Override
    protected void doStop() throws Exception {
        super.doStop();

        if (this.producerFactory != null && managedProducer) {
            ServiceHelper.stopService(this.producerFactory);
        }
        if (this.consumerFactory != null && managedConsumer) {
            ServiceHelper.stopService(this.consumerFactory);
        }
    }

    /**
     * Creates a {@link KnativeEndpoint} for a URI of the form {@code knative:type/name}.
     *
     * <p>Parameters prefixed with {@code filter.}, {@code ce.override.} and {@code transport.} are
     * removed from {@code parameters} and merged into the endpoint's own configuration copy; the
     * remaining parameters are bound onto the endpoint.
     *
     * @param uri the full endpoint URI
     * @param remaining the URI part after the scheme, expected as {@code type} or {@code type/name}
     * @param parameters the URI query parameters
     * @return the configured endpoint
     * @throws IllegalArgumentException if {@code remaining} is empty or the type is not a known
     *         {@link Knative.Type}
     */
    @SuppressWarnings("unchecked")
    @Override
    protected Endpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
        if (ObjectHelper.isEmpty(remaining)) {
            throw new IllegalArgumentException("Expecting URI in the form of: 'knative:type/name', got '" + uri + "'");
        }

        // Without a '/', the whole remaining part is the type and the name stays null.
        final String type = ObjectHelper.supplyIfEmpty(StringHelper.before(remaining, "/"), () -> remaining);
        final String name = StringHelper.after(remaining, "/");
        final KnativeConfiguration conf = getKnativeConfiguration();

        conf.getFilters().putAll(
            (Map)PropertiesHelper.extractProperties(parameters, "filter.", true)
        );
        conf.getCeOverride().putAll(
            (Map)PropertiesHelper.extractProperties(parameters, "ce.override.", true)
        );
        conf.getTransportOptions().putAll(
            PropertiesHelper.extractProperties(parameters, "transport.", true)
        );

        KnativeEndpoint endpoint = new KnativeEndpoint(uri, this, resolveType(type, uri), name, conf);
        setProperties(endpoint, parameters);

        return endpoint;
    }

    // ************************
    //
    // Helpers
    //
    // ************************

    /**
     * Converts the type segment of the URI into a {@link Knative.Type}, reporting the offending
     * URI when the type is unknown.
     */
    private static Knative.Type resolveType(String type, String uri) {
        try {
            return Knative.Type.valueOf(type);
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                "Unknown Knative resource type '" + type + "' in URI '" + uri + "'", e);
        }
    }

    /**
     * Returns a fresh mutable copy of {@code source}, or an empty map when {@code source} is null.
     */
    private static <K, V> Map<K, V> copyOf(Map<K, V> source) {
        return source == null ? new HashMap<>() : new HashMap<>(source);
    }

    /**
     * Builds the per-endpoint configuration from the component configuration.
     *
     * @return a configuration copy with non-null, endpoint-private option maps and a resolved
     *         environment
     * @throws IllegalStateException if no environment could be resolved
     */
    private KnativeConfiguration getKnativeConfiguration() throws Exception {
        final KnativeConfiguration conf = configuration.copy();

        // NOTE(review): copy() is not visible from here. If it is a shallow copy, the maps below
        // are shared with the component configuration, so endpoint-specific options merged in by
        // createEndpoint would leak into every later endpoint (and putAll would fail on a
        // user-supplied immutable map). Giving each endpoint its own mutable maps is safe either way.
        conf.setTransportOptions(copyOf(conf.getTransportOptions()));
        conf.setFilters(copyOf(conf.getFilters()));
        conf.setCeOverride(copyOf(conf.getCeOverride()));

        if (conf.getEnvironment() == null) {
            final KnativeEnvironment env = resolveEnvironment();
            if (env == null) {
                throw new IllegalStateException("Cannot load Knative configuration from file or env variable");
            }
            conf.setEnvironment(env);
        }

        return conf;
    }

    /**
     * Resolves the environment when none is set on the configuration. Sources are tried in order:
     * the configured {@code environmentPath}, then the environment variable named by
     * {@link KnativeConstants#CONFIGURATION_ENV_VARIABLE} (treated as a resource location when it
     * starts with {@code file:} or {@code classpath:}, otherwise as the serialized definition),
     * and finally a {@link KnativeEnvironment} looked up by type through the Camel context.
     *
     * @return the resolved environment, or {@code null} if the by-type lookup finds nothing
     */
    private KnativeEnvironment resolveEnvironment() throws Exception {
        if (environmentPath != null) {
            return KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), this.environmentPath);
        }

        final String envConfig = System.getenv(KnativeConstants.CONFIGURATION_ENV_VARIABLE);
        if (envConfig != null) {
            return envConfig.startsWith("file:") || envConfig.startsWith("classpath:")
                ? KnativeEnvironment.mandatoryLoadFromResource(getCamelContext(), envConfig)
                : KnativeEnvironment.mandatoryLoadFromSerializedString(envConfig);
        }

        return CamelContextHelper.findByType(getCamelContext(), KnativeEnvironment.class);
    }

    /**
     * Resolves the producer factory unless one was set explicitly: first by looking up the
     * protocol name in the registry, then through the factory finder under
     * {@code <protocol>-producer}. Only a factory created by the factory finder receives the
     * component's transport options and the Camel context, and is marked as managed.
     */
    private void setUpProducerFactory() throws Exception {
        if (producerFactory != null) {
            return;
        }

        this.producerFactory = CamelContextHelper.lookup(getCamelContext(), protocol.name(), KnativeProducerFactory.class);
        if (this.producerFactory == null) {
            this.producerFactory = getCamelContext()
                .adapt(ExtendedCamelContext.class)
                .getFactoryFinder(Knative.KNATIVE_TRANSPORT_RESOURCE_PATH)
                .newInstance(protocol.name() + "-producer", KnativeProducerFactory.class)
                .orElse(null);

            if (this.producerFactory == null) {
                return;
            }
            if (configuration.getTransportOptions() != null) {
                // Bind from a copy of the options so the component's own map is not the one handed over.
                setProperties(producerFactory, new HashMap<>(configuration.getTransportOptions()));
            }

            this.managedProducer = true;

            CamelContextAware.trySetCamelContext(this.producerFactory, getCamelContext());
        }

        LOGGER.info("found knative producer factory: {} for protocol: {}", producerFactory, protocol.name());
    }

    /**
     * Resolves the consumer factory unless one was set explicitly: first by looking up the
     * protocol name in the registry, then through the factory finder under
     * {@code <protocol>-consumer}. Only a factory created by the factory finder receives the
     * component's transport options and the Camel context, and is marked as managed.
     */
    private void setUpConsumerFactory() throws Exception {
        if (consumerFactory != null) {
            return;
        }

        this.consumerFactory = CamelContextHelper.lookup(getCamelContext(), protocol.name(), KnativeConsumerFactory.class);
        if (this.consumerFactory == null) {
            this.consumerFactory = getCamelContext()
                .adapt(ExtendedCamelContext.class)
                .getFactoryFinder(Knative.KNATIVE_TRANSPORT_RESOURCE_PATH)
                .newInstance(protocol.name() + "-consumer", KnativeConsumerFactory.class)
                .orElse(null);

            if (this.consumerFactory == null) {
                return;
            }
            if (configuration.getTransportOptions() != null) {
                // Bind from a copy of the options so the component's own map is not the one handed over.
                setProperties(consumerFactory, new HashMap<>(configuration.getTransportOptions()));
            }

            this.managedConsumer = true;

            CamelContextAware.trySetCamelContext(this.consumerFactory, getCamelContext());
        }

        LOGGER.info("found knative consumer factory: {} for protocol: {}", consumerFactory, protocol.name());
    }
}