blob: 245765d81a002951f8c59e35377a9e02d8357bb7 [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.impl;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.UUID;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelContextAware;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.StreamCache;
import org.apache.camel.spi.StreamCachingStrategy;
import org.apache.camel.util.FilePathResolver;
import org.apache.camel.util.FileUtil;
import org.apache.camel.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Default implementation of {@link StreamCachingStrategy}
*/
public class DefaultStreamCachingStrategy extends org.apache.camel.support.ServiceSupport implements CamelContextAware, StreamCachingStrategy {
@Deprecated
public static final String THRESHOLD = "CamelCachedOutputStreamThreshold";
@Deprecated
public static final String BUFFER_SIZE = "CamelCachedOutputStreamBufferSize";
@Deprecated
public static final String TEMP_DIR = "CamelCachedOutputStreamOutputDirectory";
@Deprecated
public static final String CIPHER_TRANSFORMATION = "CamelCachedOutputStreamCipherTransformation";
private static final Logger LOG = LoggerFactory.getLogger(DefaultStreamCachingStrategy.class);
private CamelContext camelContext;
private boolean enabled;
private File spoolDirectory;
private transient String spoolDirectoryName = "${java.io.tmpdir}/camel/camel-tmp-#uuid#";
private long spoolThreshold = StreamCache.DEFAULT_SPOOL_THRESHOLD;
private int spoolUsedHeapMemoryThreshold;
private SpoolUsedHeapMemoryLimit spoolUsedHeapMemoryLimit;
private String spoolChiper;
private int bufferSize = IOHelper.DEFAULT_BUFFER_SIZE;
private boolean removeSpoolDirectoryWhenStopping = true;
private final UtilizationStatistics statistics = new UtilizationStatistics();
private final Set<SpoolRule> spoolRules = new LinkedHashSet<SpoolRule>();
private boolean anySpoolRules;
public CamelContext getCamelContext() {
return camelContext;
}
public void setCamelContext(CamelContext camelContext) {
this.camelContext = camelContext;
}
public boolean isEnabled() {
return enabled;
}
public void setEnabled(boolean enabled) {
this.enabled = enabled;
}
public void setSpoolDirectory(String path) {
this.spoolDirectoryName = path;
}
public void setSpoolDirectory(File path) {
this.spoolDirectory = path;
}
public File getSpoolDirectory() {
return spoolDirectory;
}
public long getSpoolThreshold() {
return spoolThreshold;
}
public int getSpoolUsedHeapMemoryThreshold() {
return spoolUsedHeapMemoryThreshold;
}
public void setSpoolUsedHeapMemoryThreshold(int spoolHeapMemoryWatermarkThreshold) {
this.spoolUsedHeapMemoryThreshold = spoolHeapMemoryWatermarkThreshold;
}
public SpoolUsedHeapMemoryLimit getSpoolUsedHeapMemoryLimit() {
return spoolUsedHeapMemoryLimit;
}
public void setSpoolUsedHeapMemoryLimit(SpoolUsedHeapMemoryLimit spoolUsedHeapMemoryLimit) {
this.spoolUsedHeapMemoryLimit = spoolUsedHeapMemoryLimit;
}
public void setSpoolThreshold(long spoolThreshold) {
this.spoolThreshold = spoolThreshold;
}
public String getSpoolChiper() {
return spoolChiper;
}
public void setSpoolChiper(String spoolChiper) {
this.spoolChiper = spoolChiper;
}
public int getBufferSize() {
return bufferSize;
}
public void setBufferSize(int bufferSize) {
this.bufferSize = bufferSize;
}
public boolean isRemoveSpoolDirectoryWhenStopping() {
return removeSpoolDirectoryWhenStopping;
}
public void setRemoveSpoolDirectoryWhenStopping(boolean removeSpoolDirectoryWhenStopping) {
this.removeSpoolDirectoryWhenStopping = removeSpoolDirectoryWhenStopping;
}
public boolean isAnySpoolRules() {
return anySpoolRules;
}
public void setAnySpoolRules(boolean anySpoolTasks) {
this.anySpoolRules = anySpoolTasks;
}
public Statistics getStatistics() {
return statistics;
}
public boolean shouldSpoolCache(long length) {
if (!enabled || spoolRules.isEmpty()) {
return false;
}
boolean all = true;
boolean any = false;
for (SpoolRule rule : spoolRules) {
boolean result = rule.shouldSpoolCache(length);
if (!result) {
all = false;
if (!anySpoolRules) {
// no need to check anymore
break;
}
} else {
any = true;
if (anySpoolRules) {
// no need to check anymore
break;
}
}
}
boolean answer = anySpoolRules ? any : all;
LOG.debug("Should spool cache {} -> {}", length, answer);
return answer;
}
public void addSpoolRule(SpoolRule rule) {
spoolRules.add(rule);
}
public StreamCache cache(Exchange exchange) {
Message message = exchange.hasOut() ? exchange.getOut() : exchange.getIn();
StreamCache cache = message.getBody(StreamCache.class);
if (cache != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("Cached stream to {} -> {}", cache.inMemory() ? "memory" : "spool", cache);
}
if (statistics.isStatisticsEnabled()) {
try {
if (cache.inMemory()) {
statistics.updateMemory(cache.length());
} else {
statistics.updateSpool(cache.length());
}
} catch (Exception e) {
LOG.debug("Error updating cache statistics. This exception is ignored.", e);
}
}
}
return cache;
}
protected String resolveSpoolDirectory(String path) {
String name = camelContext.getManagementNameStrategy().resolveManagementName(path, camelContext.getName(), false);
if (name != null) {
name = customResolveManagementName(name);
}
// and then check again with invalid check to ensure all ## is resolved
if (name != null) {
name = camelContext.getManagementNameStrategy().resolveManagementName(name, camelContext.getName(), true);
}
return name;
}
protected String customResolveManagementName(String pattern) {
if (pattern.contains("#uuid#")) {
String uuid = UUID.randomUUID().toString();
pattern = pattern.replaceFirst("#uuid#", uuid);
}
return FilePathResolver.resolvePath(pattern);
}
@Override
protected void doStart() throws Exception {
if (!enabled) {
LOG.debug("StreamCaching is not enabled");
return;
}
String bufferSize = camelContext.getProperty(BUFFER_SIZE);
String hold = camelContext.getProperty(THRESHOLD);
String chiper = camelContext.getProperty(CIPHER_TRANSFORMATION);
String dir = camelContext.getProperty(TEMP_DIR);
boolean warn = false;
if (bufferSize != null) {
warn = true;
this.bufferSize = camelContext.getTypeConverter().convertTo(Integer.class, bufferSize);
}
if (hold != null) {
warn = true;
this.spoolThreshold = camelContext.getTypeConverter().convertTo(Long.class, hold);
}
if (chiper != null) {
warn = true;
this.spoolChiper = chiper;
}
if (dir != null) {
warn = true;
this.spoolDirectory = camelContext.getTypeConverter().convertTo(File.class, dir);
}
if (warn) {
LOG.warn("Configuring of StreamCaching using CamelContext properties is deprecated - use StreamCachingStrategy instead.");
}
if (spoolUsedHeapMemoryThreshold > 99) {
throw new IllegalArgumentException("SpoolHeapMemoryWatermarkThreshold must not be higher than 99, was: " + spoolUsedHeapMemoryThreshold);
}
// if we can overflow to disk then make sure directory exists / is created
if (spoolThreshold > 0 || spoolUsedHeapMemoryThreshold > 0) {
if (spoolDirectory == null && spoolDirectoryName == null) {
throw new IllegalArgumentException("SpoolDirectory must be configured when using SpoolThreshold > 0");
}
if (spoolDirectory == null) {
String name = resolveSpoolDirectory(spoolDirectoryName);
if (name != null) {
spoolDirectory = new File(name);
spoolDirectoryName = null;
} else {
throw new IllegalStateException("Cannot resolve spool directory from pattern: " + spoolDirectoryName);
}
}
if (spoolDirectory.exists()) {
if (spoolDirectory.isDirectory()) {
LOG.debug("Using spool directory: {}", spoolDirectory);
} else {
LOG.warn("Spool directory: {} is not a directory. This may cause problems spooling to disk for the stream caching!", spoolDirectory);
}
} else {
boolean created = spoolDirectory.mkdirs();
if (!created) {
LOG.warn("Cannot create spool directory: {}. This may cause problems spooling to disk for the stream caching!", spoolDirectory);
} else {
LOG.debug("Created spool directory: {}", spoolDirectory);
}
}
if (spoolThreshold > 0) {
spoolRules.add(new FixedThresholdSpoolRule());
}
if (spoolUsedHeapMemoryThreshold > 0) {
if (spoolUsedHeapMemoryLimit == null) {
// use max by default
spoolUsedHeapMemoryLimit = SpoolUsedHeapMemoryLimit.Max;
}
spoolRules.add(new UsedHeapMemorySpoolRule(spoolUsedHeapMemoryLimit));
}
}
LOG.debug("StreamCaching configuration {}", this.toString());
if (spoolDirectory != null) {
LOG.info("StreamCaching in use with spool directory: {} and rules: {}", spoolDirectory.getPath(), spoolRules.toString());
} else {
LOG.info("StreamCaching in use with rules: {}", spoolRules.toString());
}
}
@Override
protected void doStop() throws Exception {
if (spoolThreshold > 0 & spoolDirectory != null && isRemoveSpoolDirectoryWhenStopping()) {
LOG.debug("Removing spool directory: {}", spoolDirectory);
FileUtil.removeDir(spoolDirectory);
}
if (LOG.isDebugEnabled() && statistics.isStatisticsEnabled()) {
LOG.debug("Stopping StreamCachingStrategy with statistics: {}", statistics.toString());
}
statistics.reset();
}
@Override
public String toString() {
return "DefaultStreamCachingStrategy["
+ "spoolDirectory=" + spoolDirectory
+ ", spoolChiper=" + spoolChiper
+ ", spoolThreshold=" + spoolThreshold
+ ", spoolUsedHeapMemoryThreshold=" + spoolUsedHeapMemoryThreshold
+ ", bufferSize=" + bufferSize
+ ", anySpoolRules=" + anySpoolRules + "]";
}
private final class FixedThresholdSpoolRule implements SpoolRule {
public boolean shouldSpoolCache(long length) {
if (spoolThreshold > 0 && length > spoolThreshold) {
LOG.trace("Should spool cache fixed threshold {} > {} -> true", length, spoolThreshold);
return true;
}
return false;
}
public String toString() {
if (spoolThreshold < 1024) {
return "Spool > " + spoolThreshold + " bytes body size";
} else {
return "Spool > " + (spoolThreshold >> 10) + "K body size";
}
}
}
private final class UsedHeapMemorySpoolRule implements SpoolRule {
private final MemoryMXBean heapUsage;
private final SpoolUsedHeapMemoryLimit limit;
private UsedHeapMemorySpoolRule(SpoolUsedHeapMemoryLimit limit) {
this.limit = limit;
this.heapUsage = ManagementFactory.getMemoryMXBean();
}
public boolean shouldSpoolCache(long length) {
if (spoolUsedHeapMemoryThreshold > 0) {
// must use double to calculate with decimals for the percentage
double used = heapUsage.getHeapMemoryUsage().getUsed();
double upper = limit == SpoolUsedHeapMemoryLimit.Committed
? heapUsage.getHeapMemoryUsage().getCommitted() : heapUsage.getHeapMemoryUsage().getMax();
double calc = (used / upper) * 100;
int percentage = (int) calc;
if (LOG.isTraceEnabled()) {
long u = heapUsage.getHeapMemoryUsage().getUsed();
long c = heapUsage.getHeapMemoryUsage().getCommitted();
long m = heapUsage.getHeapMemoryUsage().getMax();
LOG.trace("Heap memory: [used={}M ({}%), committed={}M, max={}M]", new Object[]{u >> 20, percentage, c >> 20, m >> 20});
}
if (percentage > spoolUsedHeapMemoryThreshold) {
LOG.trace("Should spool cache heap memory threshold {} > {} -> true", percentage, spoolUsedHeapMemoryThreshold);
return true;
}
}
return false;
}
public String toString() {
return "Spool > " + spoolUsedHeapMemoryThreshold + "% used of " + limit + " heap memory";
}
}
/**
* Represents utilization statistics.
*/
private static final class UtilizationStatistics implements Statistics {
private boolean statisticsEnabled;
private volatile long memoryCounter;
private volatile long memorySize;
private volatile long memoryAverageSize;
private volatile long spoolCounter;
private volatile long spoolSize;
private volatile long spoolAverageSize;
synchronized void updateMemory(long size) {
memoryCounter++;
memorySize += size;
memoryAverageSize = memorySize / memoryCounter;
}
synchronized void updateSpool(long size) {
spoolCounter++;
spoolSize += size;
spoolAverageSize = spoolSize / spoolCounter;
}
public long getCacheMemoryCounter() {
return memoryCounter;
}
public long getCacheMemorySize() {
return memorySize;
}
public long getCacheMemoryAverageSize() {
return memoryAverageSize;
}
public long getCacheSpoolCounter() {
return spoolCounter;
}
public long getCacheSpoolSize() {
return spoolSize;
}
public long getCacheSpoolAverageSize() {
return spoolAverageSize;
}
public synchronized void reset() {
memoryCounter = 0;
memorySize = 0;
memoryAverageSize = 0;
spoolCounter = 0;
spoolSize = 0;
spoolAverageSize = 0;
}
public boolean isStatisticsEnabled() {
return statisticsEnabled;
}
public void setStatisticsEnabled(boolean statisticsEnabled) {
this.statisticsEnabled = statisticsEnabled;
}
public String toString() {
return String.format("[memoryCounter=%s, memorySize=%s, memoryAverageSize=%s, spoolCounter=%s, spoolSize=%s, spoolAverageSize=%s]",
memoryCounter, memorySize, memoryAverageSize, spoolCounter, spoolSize, spoolAverageSize);
}
}
}