blob: 91735f257b0e08e8b163695ca4f1fa2df13f3830 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.kafkaconnector.utils;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Endpoint;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.main.BaseMainSupport;
import org.apache.camel.main.Main;
import org.apache.camel.main.MainListener;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.DataFormat;
import org.apache.camel.support.PropertyBindingSupport;
import org.apache.camel.util.OrderedProperties;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Wraps a Camel {@link Main} around a single route of the form
 * {@code from(fromUrl) [-> marshal | unmarshal] -> to(toUrl)} and exposes small helpers to
 * start/stop it and to obtain templates and endpoints from the underlying {@link CamelContext}.
 *
 * <p>{@link Main#run()} is executed on a dedicated single-thread executor. {@link #start()} blocks
 * on a latch that is released either by the registered {@link MainListener} ({@code afterStart})
 * or by the runner thread when {@code run()} terminates.
 */
public class CamelMainSupport {
    public static final String CAMEL_DATAFORMAT_PROPERTIES_PREFIX = "camel.dataformat.";

    private static final Logger LOG = LoggerFactory.getLogger(CamelMainSupport.class);
    /** Value prefix of the properties that are moved to the front of the initial properties. */
    private static final String CLASS_REFERENCE_PREFIX = "#class:";

    private final Main camelMain;
    private final CamelContext camel;
    private final ExecutorService exService = Executors.newSingleThreadExecutor();
    private final CountDownLatch startFinishedSignal = new CountDownLatch(1);

    /**
     * Same as the six-argument constructor, using a fresh {@link DefaultCamelContext}.
     *
     * @param props     initial properties handed to the context's properties component
     * @param fromUrl   URI the route consumes from
     * @param toUrl     URI the route produces to
     * @param marshal   name of the data format to marshal with, or {@code null}
     * @param unmarshal name of the data format to unmarshal with, or {@code null}
     * @throws Exception if the route cannot be added to the context
     */
    public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal) throws Exception {
        this(props, fromUrl, toUrl, marshal, unmarshal, new DefaultCamelContext());
    }

    /**
     * Configures the given context with {@code props} and adds the route to it. Nothing is started
     * here; call {@link #start()} for that.
     *
     * @param props        initial properties handed to the context's properties component
     * @param fromUrl      URI the route consumes from
     * @param toUrl        URI the route produces to
     * @param marshal      name of the data format to marshal with, or {@code null}
     * @param unmarshal    name of the data format to unmarshal with, or {@code null}
     * @param camelContext the context the {@link Main} instance will run
     * @throws UnsupportedOperationException if both {@code marshal} and {@code unmarshal} are set,
     *                                       or the requested data format cannot be found
     * @throws Exception                     if the route cannot be added to the context
     */
    public CamelMainSupport(Map<String, String> props, String fromUrl, String toUrl, String marshal, String unmarshal, CamelContext camelContext) throws Exception {
        camel = camelContext;
        // Make Main reuse the supplied context instead of creating its own.
        camelMain = new Main() {
            @Override
            protected ProducerTemplate findOrCreateCamelTemplate() {
                return camel.createProducerTemplate();
            }

            @Override
            protected CamelContext createCamelContext() {
                return camel;
            }
        };
        camelMain.addMainListener(new CamelMainFinishedListener());

        // reordering properties to place the ones starting with "#class:" first
        Properties camelProperties = new OrderedProperties();
        camelProperties.putAll(classReferencesFirst(props));
        LOG.info("Setting initial properties in Camel context: [{}]", camelProperties);
        camel.getPropertiesComponent().setInitialProperties(camelProperties);

        // creating the actual route
        camel.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                RouteDefinition rd = from(fromUrl);
                if (marshal != null && unmarshal != null) {
                    throw new UnsupportedOperationException("Uses of both marshal (i.e. " + marshal + ") and unmarshal (i.e. " + unmarshal + ") is not supported");
                } else if (marshal != null) {
                    LOG.info("Creating Camel route from({}).marshal().custom({}).to({})", fromUrl, marshal, toUrl);
                    camel.getRegistry().bind(marshal, lookupAndInstantiateDataformat(marshal));
                    rd.marshal().custom(marshal);
                } else if (unmarshal != null) {
                    LOG.info("Creating Camel route from({}).unmarshal().custom({}).to({})", fromUrl, unmarshal, toUrl);
                    camel.getRegistry().bind(unmarshal, lookupAndInstantiateDataformat(unmarshal));
                    rd.unmarshal().custom(unmarshal);
                } else {
                    LOG.info("Creating Camel route from({}).to({})", fromUrl, toUrl);
                }
                rd.to(toUrl);
            }
        });
    }

    /**
     * Runs the Camel {@link Main} on the background executor and waits until startup has been
     * signalled or the runner thread has terminated.
     *
     * @throws Exception the failure raised by {@code Main.run()}, or {@link InterruptedException}
     *                   if the calling thread is interrupted while waiting
     */
    public void start() throws Exception {
        LOG.info("Starting CamelContext");
        CamelContextStarter starter = new CamelContextStarter();
        exService.execute(starter);
        startFinishedSignal.await();
        if (starter.hasException()) {
            LOG.error("CamelContext failed to start", starter.getException());
            throw starter.getException();
        }
        LOG.info("CamelContext started");
    }

    /**
     * Stops the Camel {@link Main} and shuts down the background executor. The executor is shut
     * down even when stopping Camel fails, so the runner thread is not leaked.
     */
    public void stop() {
        LOG.info("Stopping CamelContext");
        try {
            camelMain.stop();
        } finally {
            exService.shutdown();
        }
        LOG.info("CamelContext stopped");
    }

    /** @return a new producer template created by the underlying context */
    public ProducerTemplate createProducerTemplate() {
        return camel.createProducerTemplate();
    }

    /**
     * @param uri endpoint URI to look up
     * @return whatever the underlying context returns for {@code uri}
     */
    public Endpoint getEndpoint(String uri) {
        return camel.getEndpoint(uri);
    }

    /** @return the endpoints reported by the underlying context */
    public Collection<Endpoint> getEndpoints() {
        return camel.getEndpoints();
    }

    /** @return a new consumer template created by the underlying context */
    public ConsumerTemplate createConsumerTemplate() {
        return camel.createConsumerTemplate();
    }

    /**
     * Returns a copy of {@code props} in which every entry whose value starts with
     * {@code "#class:"} precedes all other entries; relative order inside each group is the
     * iteration order of {@code props}.
     */
    private static Map<String, String> classReferencesFirst(Map<String, String> props) {
        Map<String, String> classReferences = new LinkedHashMap<>();
        Map<String, String> others = new LinkedHashMap<>();
        for (Map.Entry<String, String> entry : props.entrySet()) {
            Map<String, String> target = entry.getValue().startsWith(CLASS_REFERENCE_PREFIX) ? classReferences : others;
            target.put(entry.getKey(), entry.getValue());
        }
        classReferences.putAll(others);
        return classReferences;
    }

    /**
     * Resolves the data format named {@code dataformatName} from the context; when none is
     * resolved, creates one and binds onto it the properties whose key starts with
     * {@code camel.dataformat.<name>.}.
     *
     * @param dataformatName name of the data format
     * @return the resolved or newly created data format, never {@code null}
     * @throws UnsupportedOperationException if the data format can neither be resolved nor created
     */
    private DataFormat lookupAndInstantiateDataformat(String dataformatName) {
        DataFormat resolved = camel.resolveDataFormat(dataformatName);
        if (resolved != null) {
            return resolved;
        }

        DataFormat df = camel.createDataFormat(dataformatName);
        // Fail before any binding is attempted, so a missing data format is always reported
        // with this exception rather than with whatever binding against null would raise.
        // TODO: move it to the caller?
        if (df == null) {
            throw new UnsupportedOperationException("No DataFormat found with name " + dataformatName);
        }

        final String prefix = CAMEL_DATAFORMAT_PROPERTIES_PREFIX + dataformatName + ".";
        final Properties props = camel.getPropertiesComponent().loadProperties(k -> k.startsWith(prefix));
        CamelContextAware.trySetCamelContext(df, camel);
        if (!props.isEmpty()) {
            // Properties is a Map<Object, Object>; the binder wants Map<String, Object>.
            // The raw cast passes the same instance through, exactly as before.
            @SuppressWarnings({"unchecked", "rawtypes"})
            final Map<String, Object> bindable = (Map) props;
            PropertyBindingSupport.build()
                    .withCamelContext(camel)
                    .withOptionPrefix(prefix)
                    .withRemoveParameters(false)
                    .withProperties(bindable)
                    .withTarget(df)
                    .bind();
        }
        return df;
    }

    /** Main listener whose only job is to release the startup latch once {@code afterStart} fires. */
    private class CamelMainFinishedListener implements MainListener {
        @Override
        public void configure(CamelContext context) {
        }

        @Override
        public void beforeStart(BaseMainSupport main) {
        }

        @Override
        public void afterStart(BaseMainSupport main) {
            LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to CamelMainFinishedListener been called");
            startFinishedSignal.countDown();
        }

        @Override
        public void beforeStop(BaseMainSupport main) {
        }

        @Override
        public void afterStop(BaseMainSupport main) {
        }

        @Override
        public void beforeConfigure(BaseMainSupport main) {
        }
    }

    /**
     * Runs {@code camelMain.run()} and records any failure so that {@link #start()} can rethrow
     * it on the calling thread.
     */
    private class CamelContextStarter implements Runnable {
        // Written before the latch is counted down and read only after await() returns,
        // so the latch provides the necessary visibility.
        private Exception startException;

        @Override
        public void run() {
            try {
                camelMain.run();
            } catch (Throwable t) {
                // Errors are captured too: otherwise the waiting thread would never learn about them.
                LOG.error("An exception has occurred before CamelContext startup has finished", t);
                startException = t instanceof Exception
                        ? (Exception) t
                        : new IllegalStateException("CamelContext startup terminated with an error", t);
                if (startFinishedSignal.getCount() > 0) {
                    LOG.trace("Signaling CamelContext startup is finished (startFinishedSignal.countDown();) due to an exception");
                    startFinishedSignal.countDown();
                }
            } finally {
                // Never leave start() blocked once this thread is done, whatever the reason.
                // A no-op when the latch has already reached zero.
                startFinishedSignal.countDown();
            }
        }

        public boolean hasException() {
            return startException != null;
        }

        public Exception getException() {
            return startException;
        }
    }
}