blob: cdbf409f8c791c2965b2e5fd5b97bd3d286e0195 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.k.cron;
import java.util.Optional;
import org.apache.camel.CamelContext;
import org.apache.camel.RoutesBuilder;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.k.Runtime;
import org.apache.camel.k.RuntimeAware;
import org.apache.camel.k.Source;
import org.apache.camel.k.SourceLoader;
import org.apache.camel.k.annotation.LoaderInterceptor;
import org.apache.camel.k.support.SourcesSupport;
import org.apache.camel.model.RouteDefinition;
import org.apache.camel.spi.CamelEvent;
import org.apache.camel.spi.Configurer;
import org.apache.camel.support.EventNotifierSupport;
import org.apache.camel.support.service.ServiceHelper;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Configurer
@LoaderInterceptor("cron")
public class CronSourceLoaderInterceptor implements SourceLoader.Interceptor, RuntimeAware {
private String timerUri;
private String overridableComponents;
private Runtime runtime;
public CronSourceLoaderInterceptor() {
this.timerUri = "timer:camel-k-cron-override?delay=0&period=1&repeatCount=1";
}
public String getTimerUri() {
return timerUri;
}
public CronSourceLoaderInterceptor setTimerUri(String timerUri) {
this.timerUri = timerUri;
return this;
}
public String getOverridableComponents() {
return overridableComponents;
}
public void setOverridableComponents(String overridableComponents) {
this.overridableComponents = overridableComponents;
}
@Override
public void setRuntime(Runtime runtime) {
this.runtime = runtime;
}
@Override
public Runtime getRuntime() {
return runtime;
}
@Override
public void beforeLoad(SourceLoader loader, Source source) {
// no-op
}
@Override
public SourceLoader.Result afterLoad(SourceLoader loader, Source source, SourceLoader.Result result) {
return new SourceLoader.Result() {
@Override
public Optional<RoutesBuilder> builder() {
return result.builder().map(
builder -> SourcesSupport.afterConfigure(builder, CronSourceLoaderInterceptor.this::afterConfigure)
);
}
@Override
public Optional<Object> configuration() {
return result.configuration();
}
};
}
private void afterConfigure(RouteBuilder builder) {
if (ObjectHelper.isEmpty(overridableComponents)) {
return;
}
final CamelContext context = runtime.getCamelContext();
final String[] components = overridableComponents.split(",", -1);
for (RouteDefinition def : builder.getRouteCollection().getRoutes()) {
String uri = def.getInput() != null ? def.getInput().getUri() : null;
if (shouldBeOverridden(uri, components)) {
def.getInput().setUri(timerUri);
//
// Don't install the shutdown strategy more than once.
//
if (context.getManagementStrategy().getEventNotifiers().stream().noneMatch(CronShutdownStrategy.class::isInstance)) {
CronShutdownStrategy strategy = new CronShutdownStrategy(runtime);
ServiceHelper.startService(strategy);
context.getManagementStrategy().addEventNotifier(strategy);
}
}
}
}
private static boolean shouldBeOverridden(String uri, String... components) {
if (uri == null) {
return false;
}
for (String c : components) {
if (uri.startsWith(c + ":")) {
return true;
}
}
return false;
}
private static class CronShutdownStrategy extends EventNotifierSupport {
private static final Logger LOGGER = LoggerFactory.getLogger(CronShutdownStrategy.class);
private final Runtime runtime;
public CronShutdownStrategy(Runtime runtime) {
this.runtime = runtime;
}
@Override
public void notify(CamelEvent event) throws Exception {
LOGGER.info("Initiate runtime shutdown");
this.runtime.getCamelContext().getExecutorServiceManager().newThread("CronShutdownStrategy", () -> {
try {
LOGGER.info("Shutting down the runtime");
runtime.stop();
} catch (Exception e) {
LOGGER.warn("Error while shutting down the runtime", e);
}
}).start();
}
@Override
public boolean isEnabled(CamelEvent event) {
return event instanceof CamelEvent.ExchangeCompletedEvent || event instanceof CamelEvent.ExchangeFailedEvent;
}
}
}