/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.nifi.controller.repository;
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.regex.Pattern;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.controller.repository.claim.ContentClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaim;
import org.apache.nifi.controller.repository.claim.ResourceClaimManager;
import org.apache.nifi.controller.repository.claim.StandardContentClaim;
import org.apache.nifi.controller.repository.io.LimitedInputStream;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.stream.io.ByteCountingOutputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.stream.io.SynchronizedByteCountingOutputStream;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.LongHolder;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * A thread-safe ContentRepository implementation that stores content claims as files on the local file system.
 */
public class FileSystemRepository implements ContentRepository {
public static final int SECTIONS_PER_CONTAINER = 1024;
public static final String ARCHIVE_DIR_NAME = "archive";
public static final Pattern MAX_ARCHIVE_SIZE_PATTERN = Pattern.compile("\\d{1,2}%");
private static final Logger LOG = LoggerFactory.getLogger(FileSystemRepository.class);
private final Map<String, Path> containers;
private final List<String> containerNames;
private final AtomicLong index;
private final ScheduledExecutorService executor = new FlowEngine(4, "FileSystemRepository Workers", true);
private final ConcurrentMap<String, BlockingQueue<ResourceClaim>> reclaimable = new ConcurrentHashMap<>();
private final Map<String, ContainerState> containerStateMap = new HashMap<>();
// 1 MB. This could be adjusted but 1 MB seems reasonable, as it means that we won't continually write to one
// file that keeps growing but gives us a chance to bunch together a lot of small files. Before, we had issues
// with creating and deleting too many files, as we had to delete 100's of thousands of files every 2 minutes
// in order to avoid backpressure on session commits. With 1 MB as the target file size, 100's of thousands of
// files would mean that we are writing gigabytes per second - quite a bit faster than any disks can handle now.
private final long maxAppendClaimLength = 1024L * 1024L;
// Queue for claims that are kept open for writing. Size of 100 is pretty arbitrary. Ideally, this will be at
// least as large as the number of threads that will be updating the repository simultaneously but we don't want
// to get too large because it will hold open up to this many FileOutputStreams.
// The queue is used to determine which claim to write to and then the corresponding Map can be used to obtain
// the OutputStream that we can use for writing to the claim.
private final BlockingQueue<ClaimLengthPair> writableClaimQueue = new LinkedBlockingQueue<>(100);
private final ConcurrentMap<ResourceClaim, ByteCountingOutputStream> writableClaimStreams = new ConcurrentHashMap<>(100);
private final Set<ResourceClaim> activeResourceClaims = Collections.synchronizedSet(new HashSet<ResourceClaim>());
private final boolean archiveData;
private final long maxArchiveMillis;
private final Map<String, Long> minUsableContainerBytesForArchive = new HashMap<>();
private final boolean alwaysSync;
private final ScheduledExecutorService containerCleanupExecutor;
private ResourceClaimManager resourceClaimManager; // effectively final
// Map of container name to archived files that should be deleted next.
private final Map<String, BlockingQueue<ArchiveInfo>> archivedFiles = new HashMap<>();
// guarded by synchronizing on this
private final AtomicLong oldestArchiveDate = new AtomicLong(0L);
public FileSystemRepository() throws IOException {
final NiFiProperties properties = NiFiProperties.getInstance();
// determine the file repository paths and ensure they exist
final Map<String, Path> fileRepositoryPaths = properties.getContentRepositoryPaths();
for (final Path path : fileRepositoryPaths.values()) {
Files.createDirectories(path);
}
this.containers = new HashMap<>(fileRespositoryPaths);
this.containerNames = new ArrayList<>(containers.keySet());
index = new AtomicLong(0L);
for (final String containerName : containerNames) {
reclaimable.put(containerName, new LinkedBlockingQueue<ResourceClaim>(10000));
archivedFiles.put(containerName, new LinkedBlockingQueue<ArchiveInfo>(100000));
}
final String enableArchiving = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_ENABLED);
final String maxArchiveRetentionPeriod = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_RETENTION_PERIOD);
final String maxArchiveSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE);
final String archiveBackPressureSize = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_BACK_PRESSURE_PERCENTAGE);
if ("true".equalsIgnoreCase(enableArchiving)) {
archiveData = true;
if (maxArchiveSize == null) {
throw new RuntimeException("No value specified for property '"
+ NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' but archiving is enabled. You must configure the max disk usage in order to enable archiving.");
}
if (!MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
throw new RuntimeException("Invalid value specified for the '" + NiFiProperties.CONTENT_ARCHIVE_MAX_USAGE_PERCENTAGE + "' property. Value must be in format: <XX>%");
}
} else if ("false".equalsIgnoreCase(enableArchiving)) {
archiveData = false;
} else {
LOG.warn("No property set for '{}'; will not archive content", NiFiProperties.CONTENT_ARCHIVE_ENABLED);
archiveData = false;
}
double maxArchiveRatio = 0D;
double archiveBackPressureRatio = 0.01D;
if (maxArchiveSize != null && MAX_ARCHIVE_SIZE_PATTERN.matcher(maxArchiveSize.trim()).matches()) {
maxArchiveRatio = getRatio(maxArchiveSize);
if (archiveBackPressureSize != null && MAX_ARCHIVE_SIZE_PATTERN.matcher(archiveBackPressureSize.trim()).matches()) {
archiveBackPressureRatio = getRatio(archiveBackPressureSize);
} else {
archiveBackPressureRatio = maxArchiveRatio + 0.02D;
}
}
if (maxArchiveRatio > 0D) {
for (final Map.Entry<String, Path> container : containers.entrySet()) {
final String containerName = container.getKey();
final long capacity = Files.getFileStore(container.getValue()).getTotalSpace();
final long maxArchiveBytes = (long) (capacity * (1D - (maxArchiveRatio - 0.02)));
minUsableContainerBytesForArchive.put(container.getKey(), Long.valueOf(maxArchiveBytes));
LOG.info("Maximum Threshold for Container {} set to {} bytes; if volume exceeds this size, archived data will be deleted until it no longer exceeds this size",
containerName, maxArchiveBytes);
final long backPressureBytes = (long) (Files.getFileStore(container.getValue()).getTotalSpace() * archiveBackPressureRatio);
final ContainerState containerState = new ContainerState(containerName, true, backPressureBytes, capacity);
containerStateMap.put(containerName, containerState);
}
} else {
for (final String containerName : containerNames) {
containerStateMap.put(containerName, new ContainerState(containerName, false, Long.MAX_VALUE, Long.MAX_VALUE));
}
}
if (maxArchiveRatio <= 0D) {
maxArchiveMillis = 0L;
} else {
maxArchiveMillis = StringUtils.isEmpty(maxArchiveRetentionPeriod) ? Long.MAX_VALUE : FormatUtils.getTimeDuration(maxArchiveRetentionPeriod, TimeUnit.MILLISECONDS);
}
this.alwaysSync = Boolean.parseBoolean(properties.getProperty("nifi.content.repository.always.sync"));
LOG.info("Initializing FileSystemRepository with 'Always Sync' set to {}", alwaysSync);
initializeRepository();
containerCleanupExecutor = new FlowEngine(containers.size(), "Cleanup FileSystemRepository Container", true);
}
@Override
public void initialize(final ResourceClaimManager claimManager) {
this.resourceClaimManager = claimManager;
final NiFiProperties properties = NiFiProperties.getInstance();
final Map<String, Path> fileRepositoryPaths = properties.getContentRepositoryPaths();
executor.scheduleWithFixedDelay(new BinDestructableClaims(), 1, 1, TimeUnit.SECONDS);
for (int i = 0; i < fileRepositoryPaths.size(); i++) {
executor.scheduleWithFixedDelay(new ArchiveOrDestroyDestructableClaims(), 1, 1, TimeUnit.SECONDS);
}
final String archiveCleanupFrequency = properties.getProperty(NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
final long cleanupMillis;
if (archiveCleanupFrequency == null) {
cleanupMillis = 1000L;
} else {
try {
cleanupMillis = FormatUtils.getTimeDuration(archiveCleanupFrequency.trim(), TimeUnit.MILLISECONDS);
} catch (final Exception e) {
throw new RuntimeException("Invalid value set for property " + NiFiProperties.CONTENT_ARCHIVE_CLEANUP_FREQUENCY);
}
}
for (final Map.Entry<String, Path> containerEntry : containers.entrySet()) {
final String containerName = containerEntry.getKey();
final Path containerPath = containerEntry.getValue();
final Runnable cleanup = new DestroyExpiredArchiveClaims(containerName, containerPath);
containerCleanupExecutor.scheduleWithFixedDelay(cleanup, cleanupMillis, cleanupMillis, TimeUnit.MILLISECONDS);
}
}
@Override
public void shutdown() {
executor.shutdown();
containerCleanupExecutor.shutdown();
// Close any of the writable claim streams that are currently open.
// Other threads may be writing to these streams, and that's okay.
// If that happens, we will simply close the stream, resulting in an
// IOException that will roll back the session. Since this is called
// only on shutdown of the application, we don't have to worry about
// partially written files - on restart, we will simply start writing
// to new files and leave those trailing bytes alone.
for (final OutputStream out : writableClaimStreams.values()) {
try {
out.close();
} catch (final IOException ioe) {
}
}
}
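// Parses a percentage string such as "75%" into a ratio (e.g., 0.75). Callers validate the value
// against MAX_ARCHIVE_SIZE_PATTERN before invoking this method.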
private static double getRatio(final String value) {
final String trimmed = value.trim();
final String percentage = trimmed.substring(0, trimmed.length() - 1);
return Integer.parseInt(percentage) / 100D;
}
private synchronized void initializeRepository() throws IOException {
final Map<String, Path> realPathMap = new HashMap<>();
final ExecutorService executor = Executors.newFixedThreadPool(containers.size());
final List<Future<Long>> futures = new ArrayList<>();
// Run through each of the containers. For each container, create the sections if necessary.
// Then, we need to scan through the archived data so that we can determine what the oldest
// archived data is, so that we know when we have to start aging data off.
for (final Map.Entry<String, Path> container : containers.entrySet()) {
final String containerName = container.getKey();
final ContainerState containerState = containerStateMap.get(containerName);
final Path containerPath = container.getValue();
final boolean pathExists = Files.exists(containerPath);
final Path realPath;
if (pathExists) {
realPath = containerPath.toRealPath();
} else {
realPath = Files.createDirectories(containerPath).toRealPath();
}
for (int i = 0; i < SECTIONS_PER_CONTAINER; i++) {
Files.createDirectories(realPath.resolve(String.valueOf(i)));
}
realPathMap.put(containerName, realPath);
// We need to scan the archive directories to find out the oldest timestamp so that we know whether or not we
// will have to delete archived data based on time threshold. Scanning all of the directories can be very
// expensive because of all of the disk accesses. So we do this in multiple threads. Since containers are
// often unique to a disk, we just map 1 thread to each container.
final Callable<Long> scanContainer = new Callable<Long>() {
@Override
public Long call() throws IOException {
final LongHolder oldestDateHolder = new LongHolder(0L);
// The path already exists, so walk it to find archived files, tracking the oldest last-modified
// time and incrementing the container's archive count for each archived file found.
Files.walkFileTree(realPath, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
if (attrs.isDirectory()) {
return FileVisitResult.CONTINUE;
}
// Check if this is an 'archive' directory
final Path relativePath = realPath.relativize(file);
if (relativePath.getNameCount() > 3 && ARCHIVE_DIR_NAME.equals(relativePath.subpath(1, 2).toString())) {
final long lastModifiedTime = getLastModTime(file);
if (lastModifiedTime < oldestDateHolder.get()) {
oldestDateHolder.set(lastModifiedTime);
}
containerState.incrementArchiveCount();
}
return FileVisitResult.CONTINUE;
}
});
return oldestDateHolder.get();
}
};
// If the path didn't exist to begin with, there's no archive directory, so don't bother scanning.
if (pathExists) {
futures.add(executor.submit(scanContainer));
}
}
executor.shutdown();
for (final Future<Long> future : futures) {
try {
final Long oldestDate = future.get();
if (oldestDate < oldestArchiveDate.get()) {
oldestArchiveDate.set(oldestDate);
}
} catch (final ExecutionException | InterruptedException e) {
if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
throw new RuntimeException(e);
}
}
}
containers.clear();
containers.putAll(realPathMap);
}
@Override
public Set<String> getContainerNames() {
return new HashSet<>(containerNames);
}
@Override
public long getContainerCapacity(final String containerName) throws IOException {
final Path path = containers.get(containerName);
if (path == null) {
throw new IllegalArgumentException("No container exists with name " + containerName);
}
return Files.getFileStore(path).getTotalSpace();
}
@Override
public long getContainerUsableSpace(String containerName) throws IOException {
final Path path = containers.get(containerName);
if (path == null) {
throw new IllegalArgumentException("No container exists with name " + containerName);
}
return Files.getFileStore(path).getUsableSpace();
}
@Override
public void cleanup() {
for (final Map.Entry<String, Path> entry : containers.entrySet()) {
final String containerName = entry.getKey();
final Path containerPath = entry.getValue();
final File[] sectionFiles = containerPath.toFile().listFiles();
if (sectionFiles != null) {
for (final File sectionFile : sectionFiles) {
removeIncompleteContent(containerName, containerPath, sectionFile.toPath());
}
}
}
}
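// Recursively walks a section directory, skipping 'archive' directories, and removes (or archives)
// any content file whose resource claim has no remaining claimants.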
private void removeIncompleteContent(final String containerName, final Path containerPath, final Path fileToRemove) {
if (Files.isDirectory(fileToRemove)) {
final Path lastPathName = fileToRemove.subpath(1, fileToRemove.getNameCount());
final String fileName = lastPathName.toFile().getName();
if (fileName.equals(ARCHIVE_DIR_NAME)) {
return;
}
final File[] children = fileToRemove.toFile().listFiles();
if (children != null) {
for (final File child : children) {
removeIncompleteContent(containerName, containerPath, child.toPath());
}
}
return;
}
final Path relativePath = containerPath.relativize(fileToRemove);
final Path sectionPath = relativePath.subpath(0, 1);
if (relativePath.getNameCount() < 2) {
return;
}
final Path idPath = relativePath.subpath(1, relativePath.getNameCount());
final String id = idPath.toFile().getName();
final String sectionName = sectionPath.toFile().getName();
final ResourceClaim resourceClaim = resourceClaimManager.newResourceClaim(containerName, sectionName, id, false);
if (resourceClaimManager.getClaimantCount(resourceClaim) == 0) {
removeIncompleteContent(fileToRemove);
}
}
private void removeIncompleteContent(final Path fileToRemove) {
String fileDescription = null;
try {
fileDescription = fileToRemove.toFile().getAbsolutePath() + " (" + Files.size(fileToRemove) + " bytes)";
} catch (final IOException e) {
fileDescription = fileToRemove.toFile().getAbsolutePath() + " (unknown file size)";
}
LOG.info("Found unknown file {} in File System Repository; {} file", fileDescription, archiveData ? "archiving" : "removing");
try {
if (archiveData) {
archive(fileToRemove);
} else {
Files.delete(fileToRemove);
}
} catch (final IOException e) {
final String action = archiveData ? "archive" : "remove";
LOG.warn("Unable to {} unknown file {} from File System Repository due to {}", action, fileDescription, e.toString());
LOG.warn("", e);
}
}
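// Resolves the on-disk location of a claim's backing file: <container>/<section>/<resource claim id>.
// Returns null if the claim's container is not known to this repository.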
private Path getPath(final ContentClaim claim) {
final ResourceClaim resourceClaim = claim.getResourceClaim();
return getPath(resourceClaim);
}
private Path getPath(final ResourceClaim resourceClaim) {
final Path containerPath = containers.get(resourceClaim.getContainer());
if (containerPath == null) {
return null;
}
return containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
}
private Path getPath(final ContentClaim claim, final boolean verifyExists) throws ContentNotFoundException {
final ResourceClaim resourceClaim = claim.getResourceClaim();
final Path containerPath = containers.get(resourceClaim.getContainer());
if (containerPath == null) {
if (verifyExists) {
throw new ContentNotFoundException(claim);
} else {
return null;
}
}
// Create the Path that points to the data
Path resolvedPath = containerPath.resolve(resourceClaim.getSection()).resolve(resourceClaim.getId());
// If the data does not exist, create a Path that points to where the data would exist in the archive directory.
if (!Files.exists(resolvedPath)) {
resolvedPath = getArchivePath(claim.getResourceClaim());
}
if (verifyExists && !Files.exists(resolvedPath)) {
throw new ContentNotFoundException(claim);
}
return resolvedPath;
}
@Override
public ContentClaim create(final boolean lossTolerant) throws IOException {
ResourceClaim resourceClaim;
// We need to synchronize on this queue because the act of pulling something off
// the queue and incrementing the associated claimant count MUST be done atomically.
// This way, if the claimant count is decremented to 0, we can ensure that the
// claim is not then pulled from the queue and used as another thread is destroying/archiving
// the claim. The logic in the remove() method dictates that the underlying file can be
// deleted (or archived) only if the claimant count becomes <= 0 AND there is no other claim on
// the queue that references that file. As a result, we need to ensure that those two conditions
// can be evaluated atomically. In order for that to be the case, we need to also treat the
// removal of a claim from the queue and the incrementing of its claimant count as an atomic
// action to ensure that the comparison of those two conditions is atomic also. As a result,
// we will synchronize on the queue while performing those actions.
final long resourceOffset;
synchronized (writableClaimQueue) {
final ClaimLengthPair pair = writableClaimQueue.poll();
if (pair == null) {
final long currentIndex = index.incrementAndGet();
String containerName = null;
boolean waitRequired = true;
ContainerState containerState = null;
for (long containerIndex = currentIndex; containerIndex < currentIndex + containers.size(); containerIndex++) {
final long modulatedContainerIndex = containerIndex % containers.size();
containerName = containerNames.get((int) modulatedContainerIndex);
containerState = containerStateMap.get(containerName);
if (!containerState.isWaitRequired()) {
waitRequired = false;
break;
}
}
if (waitRequired) {
containerState.waitForArchiveExpiration();
}
final long modulatedSectionIndex = currentIndex % SECTIONS_PER_CONTAINER;
final String section = String.valueOf(modulatedSectionIndex);
final String claimId = System.currentTimeMillis() + "-" + currentIndex;
resourceClaim = resourceClaimManager.newResourceClaim(containerName, section, claimId, lossTolerant);
resourceOffset = 0L;
LOG.debug("Creating new Resource Claim {}", resourceClaim);
} else {
resourceClaim = pair.getClaim();
resourceOffset = pair.getLength();
LOG.debug("Reusing Resource Claim {}", resourceClaim);
}
resourceClaimManager.incrementClaimantCount(resourceClaim, true);
}
final StandardContentClaim scc = new StandardContentClaim(resourceClaim, resourceOffset);
return scc;
}
@Override
public int incrementClaimaintCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
return resourceClaimManager.incrementClaimantCount(claim.getResourceClaim());
}
@Override
public int getClaimantCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
return resourceClaimManager.getClaimantCount(claim.getResourceClaim());
}
@Override
public int decrementClaimantCount(final ContentClaim claim) {
if (claim == null) {
return 0;
}
final int claimantCount = resourceClaimManager.decrementClaimantCount(claim.getResourceClaim());
return claimantCount;
}
@Override
public boolean remove(final ContentClaim claim) {
if (claim == null) {
return false;
}
return remove(claim.getResourceClaim());
}
private boolean remove(final ResourceClaim claim) {
if (claim == null) {
return false;
}
// we synchronize on the queue here because if the claimant count is 0,
// we need to be able to remove any instance of that resource claim from the
// queue atomically (i.e., the checking of the claimant count plus removal from the queue
// must be atomic). The create() method also synchronizes on the queue whenever it
// polls from the queue and increments a claimant count in order to ensure that these
// two conditions can be checked atomically.
synchronized (writableClaimQueue) {
final int claimantCount = resourceClaimManager.getClaimantCount(claim);
if (claimantCount > 0) {
// if other content claims are claiming the same resource, we have nothing to destroy,
// so just consider the destruction successful.
return true;
}
if (activeResourceClaims.contains(claim) || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
// If we have an open OutputStream for the claim, we will not destroy the claim.
return false;
}
}
Path path = null;
try {
path = getPath(claim);
} catch (final ContentNotFoundException cnfe) {
}
final File file = path.toFile();
if (!file.delete() && file.exists()) {
LOG.warn("Unable to delete {} at path {}", new Object[]{claim, path});
return false;
}
return true;
}
@Override
public ContentClaim clone(final ContentClaim original, final boolean lossTolerant) throws IOException {
if (original == null) {
return null;
}
final ContentClaim newClaim = create(lossTolerant);
try (final InputStream in = read(original);
final OutputStream out = write(newClaim)) {
StreamUtils.copy(in, out);
} catch (final IOException ioe) {
decrementClaimantCount(newClaim);
remove(newClaim);
throw ioe;
}
return newClaim;
}
@Override
public long merge(final Collection<ContentClaim> claims, final ContentClaim destination, final byte[] header, final byte[] footer, final byte[] demarcator) throws IOException {
if (claims.contains(destination)) {
throw new IllegalArgumentException("destination cannot be within claims");
}
try (final ByteCountingOutputStream out = new ByteCountingOutputStream(write(destination))) {
if (header != null) {
out.write(header);
}
int i = 0;
for (final ContentClaim claim : claims) {
try (final InputStream in = read(claim)) {
StreamUtils.copy(in, out);
}
if (++i < claims.size() && demarcator != null) {
out.write(demarcator);
}
}
if (footer != null) {
out.write(footer);
}
return out.getBytesWritten();
}
}
@Override
public long importFrom(final Path content, final ContentClaim claim) throws IOException {
try (final InputStream in = Files.newInputStream(content, StandardOpenOption.READ)) {
return importFrom(in, claim);
}
}
@Override
public long importFrom(final InputStream content, final ContentClaim claim) throws IOException {
try (final OutputStream out = write(claim, false)) {
return StreamUtils.copy(content, out);
}
}
@Override
public long exportTo(final ContentClaim claim, final Path destination, final boolean append) throws IOException {
if (claim == null) {
if (append) {
return 0L;
}
Files.createFile(destination);
return 0L;
}
try (final InputStream in = read(claim);
final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
final long copied = StreamUtils.copy(in, fos);
if (alwaysSync) {
fos.getFD().sync();
}
return copied;
}
}
@Override
public long exportTo(final ContentClaim claim, final Path destination, final boolean append, final long offset, final long length) throws IOException {
if (claim == null && offset > 0) {
throw new IllegalArgumentException("Cannot specify an offset of " + offset + " for a null claim");
}
if (claim == null) {
if (append) {
return 0L;
}
Files.createFile(destination);
return 0L;
}
final long claimSize = size(claim);
if (offset > claimSize) {
throw new IllegalArgumentException("Offset of " + offset + " exceeds claim size of " + claimSize);
}
try (final InputStream in = read(claim);
final FileOutputStream fos = new FileOutputStream(destination.toFile(), append)) {
if (offset > 0) {
StreamUtils.skip(in, offset);
}
StreamUtils.copy(in, fos, length);
if (alwaysSync) {
fos.getFD().sync();
}
return length;
}
}
@Override
public long exportTo(final ContentClaim claim, final OutputStream destination) throws IOException {
if (claim == null) {
return 0L;
}
try (final InputStream in = read(claim)) {
return StreamUtils.copy(in, destination);
}
}
@Override
public long exportTo(final ContentClaim claim, final OutputStream destination, final long offset, final long length) throws IOException {
if (offset < 0) {
throw new IllegalArgumentException("offset cannot be negative");
}
final long claimSize = size(claim);
if (offset > claimSize) {
throw new IllegalArgumentException("offset of " + offset + " exceeds claim size of " + claimSize);
}
if (offset == 0 && length == claimSize) {
return exportTo(claim, destination);
}
try (final InputStream in = read(claim)) {
StreamUtils.skip(in, offset);
final byte[] buffer = new byte[8192];
int len;
long copied = 0L;
while ((len = in.read(buffer, 0, (int) Math.min(length - copied, buffer.length))) > 0) {
destination.write(buffer, 0, len);
copied += len;
}
return copied;
}
}
@Override
public long size(final ContentClaim claim) throws IOException {
if (claim == null) {
return 0L;
}
// see javadocs for claim.getLength() as to why we do this.
if (claim.getLength() < 0) {
return Files.size(getPath(claim, true)) - claim.getOffset();
}
return claim.getLength();
}
@Override
public InputStream read(final ContentClaim claim) throws IOException {
if (claim == null) {
return new ByteArrayInputStream(new byte[0]);
}
final Path path = getPath(claim, true);
final FileInputStream fis = new FileInputStream(path.toFile());
if (claim.getOffset() > 0L) {
StreamUtils.skip(fis, claim.getOffset());
}
// see javadocs for claim.getLength() as to why we do this.
if (claim.getLength() >= 0) {
return new LimitedInputStream(fis, claim.getLength());
} else {
return fis;
}
}
@Override
public OutputStream write(final ContentClaim claim) throws IOException {
return write(claim, false);
}
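// Writes to the content claim's underlying resource claim file. If an open stream for the resource
// claim is available in writableClaimStreams it is reused, so that many small content claims can be
// appended to the same file; otherwise a new appending stream is opened. The returned OutputStream
// updates the content claim's length as bytes are written and, on close, either recycles the resource
// claim back onto the writable claim queue or closes the underlying file once the max append length
// has been reached.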
private OutputStream write(final ContentClaim claim, final boolean append) throws IOException {
if (claim == null) {
throw new NullPointerException("ContentClaim cannot be null");
}
if (!(claim instanceof StandardContentClaim)) {
// we know that we will only create Content Claims that are of type StandardContentClaim, so if we get anything
// else, just throw an Exception because it is not valid for this Repository
throw new IllegalArgumentException("Cannot write to " + claim + " because that Content Claim does belong to this Content Repository");
}
final StandardContentClaim scc = (StandardContentClaim) claim;
if (claim.getLength() > 0) {
throw new IllegalArgumentException("Cannot write to " + claim + " because it has already been written to.");
}
final ResourceClaim resourceClaim = claim.getResourceClaim();
// we always append because there may be another ContentClaim using the same resource claim.
// However, we know that we will never write to the same claim from two different threads
// at the same time because we will call create() to get the claim before we write to it,
// and when we call create(), it will remove it from the Queue, which means that no other
// thread will get the same Claim until we've finished writing to it.
ByteCountingOutputStream claimStream = writableClaimStreams.remove(scc.getResourceClaim());
final long initialLength;
if (claimStream == null) {
final File file = getPath(scc).toFile();
// use a synchronized stream because we want to pass this OutputStream out from one thread to another.
claimStream = new SynchronizedByteCountingOutputStream(new FileOutputStream(file, true), file.length());
initialLength = 0L;
} else {
if (append) {
initialLength = Math.max(0, scc.getLength());
} else {
initialLength = 0;
}
}
activeResourceClaims.add(resourceClaim);
final ByteCountingOutputStream bcos = claimStream;
final OutputStream out = new OutputStream() {
private long bytesWritten = 0L;
private boolean recycle = true;
private boolean closed = false;
@Override
public String toString() {
return "FileSystemRepository Stream [" + scc + "]";
}
@Override
public synchronized void write(final int b) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
try {
bcos.write(b);
} catch (final IOException ioe) {
recycle = false;
throw new IOException("Failed to write to " + this, ioe);
}
bytesWritten++;
scc.setLength(bytesWritten + initialLength);
}
@Override
public synchronized void write(final byte[] b) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
try {
bcos.write(b);
} catch (final IOException ioe) {
recycle = false;
throw new IOException("Failed to write to " + this, ioe);
}
bytesWritten += b.length;
scc.setLength(bytesWritten + initialLength);
}
@Override
public synchronized void write(final byte[] b, final int off, final int len) throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
try {
bcos.write(b, off, len);
} catch (final IOException ioe) {
recycle = false;
throw new IOException("Failed to write to " + this, ioe);
}
bytesWritten += len;
scc.setLength(bytesWritten + initialLength);
}
@Override
public synchronized void flush() throws IOException {
if (closed) {
throw new IOException("Stream is closed");
}
bcos.flush();
}
@Override
public synchronized void close() throws IOException {
closed = true;
activeResourceClaims.remove(resourceClaim);
if (alwaysSync) {
((FileOutputStream) bcos.getWrappedStream()).getFD().sync();
}
if (scc.getLength() < 0) {
// If claim was not written to, set length to 0
scc.setLength(0L);
}
// if we've not yet hit the threshold for appending to a resource claim, add the claim
// to the writableClaimQueue so that the Resource Claim can be used again when create()
// is called. In this case, we don't have to actually close the file stream. Instead, we
// can just add it onto the queue and continue to use it for the next content claim.
final long resourceClaimLength = scc.getOffset() + scc.getLength();
if (recycle && resourceClaimLength < maxAppendClaimLength) {
// we do not have to synchronize on the writable claim queue here because we
// are only adding something to the queue. We must synchronize if we are
// using a ResourceClaim from the queue and incrementing the claimant count on that resource
// because those need to be done atomically, or if we are destroying a claim that is on
// the queue because we need to ensure that the latter operation does not cause problems
// with the former.
final ClaimLengthPair pair = new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength);
final boolean enqueued = writableClaimQueue.offer(pair);
if (enqueued) {
writableClaimStreams.put(scc.getResourceClaim(), bcos);
LOG.debug("Claim length less than max; Adding {} back to writableClaimStreams", this);
} else {
bcos.close();
LOG.debug("Claim length less than max; Closing {} because could not add back to queue", this);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
}
}
} else {
// we've reached the limit for this claim. Don't add it back to our queue.
// Instead, just remove it and move on.
// ensure that the claim is no longer on the queue
writableClaimQueue.remove(new ClaimLengthPair(scc.getResourceClaim(), resourceClaimLength));
bcos.close();
LOG.debug("Claim lenth >= max; Closing {}", this);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for closing " + this));
}
}
}
};
LOG.debug("Writing to {}", out);
if (LOG.isTraceEnabled()) {
LOG.trace("Stack trace: ", new RuntimeException("Stack Trace for writing to " + out));
}
return out;
}
@Override
public void purge() {
// delete all content from repositories
for (final Path path : containers.values()) {
FileUtils.deleteFilesInDir(path.toFile(), null, LOG, true);
}
for (final Path path : containers.values()) {
if (!Files.exists(path)) {
throw new RepositoryPurgeException("File " + path.toFile().getAbsolutePath() + " does not exist");
}
// Try up to 10 times to see if the directory is writable, in case another process (like a
// virus scanner) has the directory temporarily locked
boolean writable = false;
for (int i = 0; i < 10; i++) {
if (Files.isWritable(path)) {
writable = true;
break;
} else {
try {
Thread.sleep(100L);
} catch (final Exception e) {
}
}
}
if (!writable) {
throw new RepositoryPurgeException("File " + path.toFile().getAbsolutePath() + " is not writable");
}
}
resourceClaimManager.purge();
}
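// Drains destructable resource claims from the ResourceClaimManager and bins them into the
// per-container 'reclaimable' queues so that the archive/destroy workers can process each
// container (typically a separate disk partition) independently.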
private class BinDestructableClaims implements Runnable {
@Override
public void run() {
try {
// Get all of the Destructable Claims and bin them based on their Container. We do this
// because the Container generally maps to a physical partition on the disk, so we want a few
// different threads hitting the different partitions but don't want multiple threads hitting
// the same partition.
final List<ResourceClaim> toDestroy = new ArrayList<>();
while (true) {
toDestroy.clear();
resourceClaimManager.drainDestructableClaims(toDestroy, 10000);
if (toDestroy.isEmpty()) {
return;
}
for (final ResourceClaim claim : toDestroy) {
final String container = claim.getContainer();
final BlockingQueue<ResourceClaim> claimQueue = reclaimable.get(container);
try {
while (true) {
if (claimQueue.offer(claim, 10, TimeUnit.MINUTES)) {
break;
} else {
LOG.warn("Failed to clean up {} because old claims aren't being cleaned up fast enough. "
+ "This Content Claim will remain in the Content Repository until NiFi is restarted, at which point it will be cleaned up", claim);
}
}
} catch (final InterruptedException ie) {
LOG.warn("Failed to clean up {} because thread was interrupted", claim);
}
}
}
} catch (final Throwable t) {
LOG.error("Failed to cleanup content claims due to {}", t);
}
}
}
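// Given the path of a content claim file (<section>/<id>), returns the path the file would have
// once archived: <section>/archive/<id>.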
public static Path getArchivePath(final Path contentClaimPath) {
final Path sectionPath = contentClaimPath.getParent();
final String claimId = contentClaimPath.toFile().getName();
return sectionPath.resolve(ARCHIVE_DIR_NAME).resolve(claimId);
}
private Path getArchivePath(final ResourceClaim claim) {
final String claimId = claim.getId();
final Path containerPath = containers.get(claim.getContainer());
final Path archivePath = containerPath.resolve(claim.getSection()).resolve(ARCHIVE_DIR_NAME).resolve(claimId);
return archivePath;
}
@Override
public boolean isAccessible(final ContentClaim contentClaim) throws IOException {
if (contentClaim == null) {
return false;
}
final Path path = getPath(contentClaim);
if (path == null) {
return false;
}
if (Files.exists(path)) {
return true;
}
return Files.exists(getArchivePath(contentClaim.getResourceClaim()));
}
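// Moves the resource claim's backing file into its section's archive directory. Returns false if
// archiving is disabled, if the claim is still referenced or queued for reuse, or if the file has
// already been archived or removed.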
private boolean archive(final ResourceClaim claim) throws IOException {
if (!archiveData) {
return false;
}
synchronized (writableClaimQueue) {
final int claimantCount = claim == null ? 0 : resourceClaimManager.getClaimantCount(claim);
if (claimantCount > 0 || writableClaimQueue.contains(new ClaimLengthPair(claim, null))) {
return false;
}
}
final Path curPath = getPath(claim);
if (curPath == null) {
return false;
}
final boolean archived = archive(curPath);
LOG.debug("Successfully moved {} to archive", claim);
return archived;
}
private boolean archive(final Path curPath) throws IOException {
// check if already archived
final boolean alreadyArchived = ARCHIVE_DIR_NAME.equals(curPath.getParent().toFile().getName());
if (alreadyArchived) {
return false;
}
final Path archivePath = getArchivePath(curPath);
if (curPath.equals(archivePath)) {
LOG.warn("Cannot archive {} because it is already archived", curPath);
return false;
}
try {
Files.move(curPath, archivePath);
return true;
} catch (final NoSuchFileException nsfee) {
// If the current path exists, try to create archive path and do the move again.
// Otherwise, either the content was removed or has already been archived. Either way,
// there's nothing that can be done.
if (Files.exists(curPath)) {
// The archive directory doesn't exist. Create now and try operation again.
// We do it this way, rather than ensuring that the directory exists ahead of time because
// it will be rare for the directory not to exist and we would prefer to have the overhead
// of the Exception being thrown in these cases, rather than have the overhead of checking
// for the existence of the directory continually.
Files.createDirectories(archivePath.getParent());
Files.move(curPath, archivePath);
return true;
}
return false;
}
}
private long getLastModTime(final File file) {
// the content claim identifier is created by concatenating System.currentTimeMillis(), "-", and a one-up number.
// However, it used to be just a one-up number. As a result, we can check for the timestamp and if present use it.
// If not present, we will use the last modified time.
final String filename = file.getName();
final int dashIndex = filename.indexOf("-");
if (dashIndex > 0) {
final String creationTimestamp = filename.substring(0, dashIndex);
try {
return Long.parseLong(creationTimestamp);
} catch (final NumberFormatException nfe) {
}
}
return file.lastModified();
}
private long getLastModTime(final Path file) throws IOException {
return getLastModTime(file.toFile());
}
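// Returns true if the next file in the queue is old enough that it must be deleted based on the
// removal time threshold; returns false if the queue is empty.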
private boolean deleteBasedOnTimestamp(final BlockingQueue<ArchiveInfo> fileQueue, final long removalTimeThreshold) throws IOException {
// check next file's last mod time.
final ArchiveInfo nextFile = fileQueue.peek();
if (nextFile == null) {
// Continue on to queue up the files, in case the next file must be destroyed based on time.
return false;
}
// If the last mod time indicates that it should be removed, just continue loop.
final long oldestArchiveDate = getLastModTime(nextFile.toPath());
return (oldestArchiveDate <= removalTimeThreshold);
}
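// Cleans up a container's archive in two passes: first, files previously queued in 'archivedFiles'
// are deleted until enough usable space has been freed (or they exceed the retention period); then
// each section's archive directory is walked, expired files are deleted, and the remaining files
// are sorted by age and deleted oldest-first until the container is back under its usage threshold.
// Files that survive are re-queued in age order. Returns the last-modified time of the oldest
// archived file still present (or the current time if none remain), or -1 if no usage threshold is
// configured for the container.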
private long destroyExpiredArchives(final String containerName, final Path container) throws IOException {
final List<ArchiveInfo> notYetExceedingThreshold = new ArrayList<>();
final long removalTimeThreshold = System.currentTimeMillis() - maxArchiveMillis;
long oldestArchiveDateFound = System.currentTimeMillis();
// determine how much space we must have in order to stop deleting old data
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
if (minRequiredSpace == null) {
return -1L;
}
final long usableSpace = getContainerUsableSpace(containerName);
final ContainerState containerState = containerStateMap.get(containerName);
// First, delete files from our queue
final long startNanos = System.nanoTime();
final long toFree = minRequiredSpace - usableSpace;
final BlockingQueue<ArchiveInfo> fileQueue = archivedFiles.get(containerName);
ArchiveInfo toDelete;
int deleteCount = 0;
long freed = 0L;
while ((toDelete = fileQueue.peek()) != null) {
try {
final long fileSize = toDelete.getSize();
// we use fileQueue.peek above instead of fileQueue.poll() because we don't always want to
// remove the head of the queue. Instead, we want to remove it only if we plan to delete it.
// In order to accomplish this, we just peek at the head and check if it should be deleted.
// If so, then we call poll() to remove it
if (freed < toFree || getLastModTime(toDelete.toPath()) < removalTimeThreshold) {
toDelete = fileQueue.poll(); // remove the head of the queue, which is already stored in 'toDelete'
Files.deleteIfExists(toDelete.toPath());
containerState.decrementArchiveCount();
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", toDelete.getName(), containerName);
freed += fileSize;
deleteCount++;
}
// If we'd freed up enough space, we're done... unless the next file needs to be destroyed based on time.
if (freed >= toFree) {
// If the last mod time indicates that it should be removed, just continue loop.
if (deleteBasedOnTimestamp(fileQueue, removalTimeThreshold)) {
continue;
}
final ArchiveInfo archiveInfo = fileQueue.peek();
final long oldestArchiveDate = archiveInfo == null ? System.currentTimeMillis() : getLastModTime(archiveInfo.toPath());
// Otherwise, we're done. Return the last mod time of the oldest file in the container's archive.
final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos);
if (deleteCount > 0) {
LOG.info("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis",
deleteCount, containerName, new Date(oldestArchiveDate), millis);
} else {
LOG.debug("Deleted {} files from archive for Container {}; oldest Archive Date is now {}; container cleanup took {} millis",
deleteCount, containerName, new Date(oldestArchiveDate), millis);
}
return oldestArchiveDate;
}
} catch (final IOException ioe) {
LOG.warn("Failed to delete {} from archive due to {}", toDelete, ioe.toString());
if (LOG.isDebugEnabled()) {
LOG.warn("", ioe);
}
}
}
// Go through each container and grab the archived data into a List
final StopWatch stopWatch = new StopWatch(true);
for (int i = 0; i < SECTIONS_PER_CONTAINER; i++) {
final Path sectionContainer = container.resolve(String.valueOf(i));
final Path archive = sectionContainer.resolve(ARCHIVE_DIR_NAME);
if (!Files.exists(archive)) {
continue;
}
try {
Files.walkFileTree(archive, new SimpleFileVisitor<Path>() {
@Override
public FileVisitResult visitFile(final Path file, final BasicFileAttributes attrs) throws IOException {
if (attrs.isDirectory()) {
return FileVisitResult.CONTINUE;
}
final long lastModTime = getLastModTime(file);
if (lastModTime < removalTimeThreshold) {
try {
Files.deleteIfExists(file);
containerState.decrementArchiveCount();
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because it was older than the configured max archival duration",
file.toFile().getName(), containerName);
} catch (final IOException ioe) {
LOG.warn("Failed to remove archived ContentClaim with ID {} from Container {} due to {}", file.toFile().getName(), containerName, ioe.toString());
if (LOG.isDebugEnabled()) {
LOG.warn("", ioe);
}
}
} else if (usableSpace < minRequiredSpace) {
notYetExceedingThreshold.add(new ArchiveInfo(container, file, attrs.size(), lastModTime));
}
return FileVisitResult.CONTINUE;
}
});
} catch (final IOException ioe) {
LOG.warn("Failed to cleanup archived files in {} due to {}", archive, ioe.toString());
if (LOG.isDebugEnabled()) {
LOG.warn("", ioe);
}
}
}
final long deleteExpiredMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS);
// Sort the list according to last modified time
Collections.sort(notYetExceedingThreshold, new Comparator<ArchiveInfo>() {
@Override
public int compare(final ArchiveInfo o1, final ArchiveInfo o2) {
return Long.compare(o1.getLastModTime(), o2.getLastModTime());
}
});
final long sortRemainingMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteExpiredMillis;
// Delete the oldest data
final Iterator<ArchiveInfo> itr = notYetExceedingThreshold.iterator();
int counter = 0;
while (itr.hasNext()) {
final ArchiveInfo archiveInfo = itr.next();
try {
final Path path = archiveInfo.toPath();
Files.deleteIfExists(path);
containerState.decrementArchiveCount();
LOG.debug("Deleted archived ContentClaim with ID {} from Container {} because the archival size was exceeding the max configured size", archiveInfo.getName(), containerName);
// Check if we've freed enough space every 25 files that we destroy
if (++counter % 25 == 0) {
if (getContainerUsableSpace(containerName) > minRequiredSpace) { // check if we can stop now
LOG.debug("Finished cleaning up archive for Container {}", containerName);
break;
}
}
} catch (final IOException ioe) {
LOG.warn("Failed to delete {} from archive due to {}", archiveInfo, ioe.toString());
if (LOG.isDebugEnabled()) {
LOG.warn("", ioe);
}
}
itr.remove();
}
final long deleteOldestMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - sortRemainingMillis - deleteExpiredMillis;
long oldestContainerArchive;
if (notYetExceedingThreshold.isEmpty()) {
oldestContainerArchive = System.currentTimeMillis();
} else {
oldestContainerArchive = notYetExceedingThreshold.get(0).getLastModTime();
}
if (oldestContainerArchive < oldestArchiveDateFound) {
oldestArchiveDateFound = oldestContainerArchive;
}
// Queue up the files in the order that they should be destroyed so that we don't have to scan the directories for a while.
for (final ArchiveInfo toEnqueue : notYetExceedingThreshold.subList(0, Math.min(100000, notYetExceedingThreshold.size()))) {
fileQueue.offer(toEnqueue);
}
final long cleanupMillis = stopWatch.getElapsed(TimeUnit.MILLISECONDS) - deleteOldestMillis - sortRemainingMillis - deleteExpiredMillis;
LOG.debug("Oldest Archive Date for Container {} is {}; delete expired = {} ms, sort remaining = {} ms, delete oldest = {} ms, cleanup = {} ms",
containerName, new Date(oldestContainerArchive), deleteExpiredMillis, sortRemainingMillis, deleteOldestMillis, cleanupMillis);
return oldestContainerArchive;
}
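// Background task that drains each container's 'reclaimable' queue and, for every resource claim,
// either moves its file to the archive (when archiving is enabled) or deletes it outright.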
private class ArchiveOrDestroyDestructableClaims implements Runnable {
@Override
public void run() {
try {
// while there are claims waiting to be destroyed...
while (true) {
// look through each of the binned queues of Content Claims
int successCount = 0;
final List<ResourceClaim> toRemove = new ArrayList<>();
for (final Map.Entry<String, BlockingQueue<ResourceClaim>> entry : reclaimable.entrySet()) {
// drain the queue of all ContentClaims that can be destroyed for the given container.
final String container = entry.getKey();
final ContainerState containerState = containerStateMap.get(container);
toRemove.clear();
entry.getValue().drainTo(toRemove);
if (toRemove.isEmpty()) {
continue;
}
// destroy each claim for this container
final long start = System.nanoTime();
for (final ResourceClaim claim : toRemove) {
if (archiveData) {
try {
if (archive(claim)) {
containerState.incrementArchiveCount();
successCount++;
}
} catch (final Exception e) {
LOG.warn("Failed to archive {} due to {}", claim, e.toString());
if (LOG.isDebugEnabled()) {
LOG.warn("", e);
}
}
} else {
if (remove(claim)) {
successCount++;
}
}
}
final long nanos = System.nanoTime() - start;
final long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
if (successCount == 0) {
LOG.debug("No ContentClaims archived/removed for Container {}", container);
} else {
LOG.info("Successfully {} {} Resource Claims for Container {} in {} millis", archiveData ? "archived" : "destroyed", successCount, container, millis);
}
}
// if we didn't destroy anything, we're done.
if (successCount == 0) {
return;
}
}
} catch (final Throwable t) {
LOG.error("Failed to handle destructable claims due to {}", t.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", t);
}
}
}
}
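// Immutable record of an archived file (container-relative path, name, size, and last-modified
// time) used to decide deletion order without having to re-scan the directories.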
private static class ArchiveInfo {
private final Path containerPath;
private final String relativePath;
private final String name;
private final long size;
private final long lastModTime;
public ArchiveInfo(final Path containerPath, final Path path, final long size, final long lastModTime) {
this.containerPath = containerPath;
this.relativePath = containerPath.relativize(path).toString();
this.name = path.toFile().getName();
this.size = size;
this.lastModTime = lastModTime;
}
public String getName() {
return name;
}
public long getSize() {
return size;
}
public long getLastModTime() {
return lastModTime;
}
public Path toPath() {
return containerPath.resolve(relativePath);
}
}
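// Per-container scheduled task that invokes destroyExpiredArchives() whenever the container is low
// on usable space or may hold archived data older than the configured retention period, and then
// signals the ContainerState so that any threads blocked on claim creation can proceed.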
private class DestroyExpiredArchiveClaims implements Runnable {
private final String containerName;
private final Path containerPath;
private DestroyExpiredArchiveClaims(final String containerName, final Path containerPath) {
this.containerName = containerName;
this.containerPath = containerPath;
}
@Override
public void run() {
try {
if (oldestArchiveDate.get() > System.currentTimeMillis() - maxArchiveMillis) {
final Long minRequiredSpace = minUsableContainerBytesForArchive.get(containerName);
if (minRequiredSpace == null) {
return;
}
try {
final long usableSpace = getContainerUsableSpace(containerName);
if (usableSpace > minRequiredSpace) {
return;
}
} catch (final Exception e) {
LOG.error("Failed to determine space available in container {}; will attempt to cleanup archive", containerName);
}
}
Thread.currentThread().setName("Cleanup Archive for " + containerName);
final long oldestContainerArchive;
try {
oldestContainerArchive = destroyExpiredArchives(containerName, containerPath);
final ContainerState containerState = containerStateMap.get(containerName);
containerState.signalCreationReady(); // indicate that we've finished cleaning up the archive.
} catch (final IOException ioe) {
LOG.error("Failed to cleanup archive for container {} due to {}", containerName, ioe.toString());
if (LOG.isDebugEnabled()) {
LOG.error("", ioe);
}
return;
}
if (oldestContainerArchive < 0L) {
boolean updated;
do {
final long oldest = oldestArchiveDate.get();
if (oldestContainerArchive < oldest) {
updated = oldestArchiveDate.compareAndSet(oldest, oldestContainerArchive);
if (updated && LOG.isDebugEnabled()) {
LOG.debug("Oldest Archive Date is now {}", new Date(oldestContainerArchive));
}
} else {
updated = true;
}
} while (!updated);
}
} catch (final Throwable t) {
LOG.error("Failed to cleanup archive for container {} due to {}", containerName, t.toString());
LOG.error("", t);
}
}
}
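// Tracks a container's disk usage and archived-file count. When archiving is enabled and usage
// crosses the back-pressure threshold, waitForArchiveExpiration() blocks claim creation until the
// archive cleanup task calls signalCreationReady().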
private class ContainerState {
private final String containerName;
private final AtomicLong archivedFileCount = new AtomicLong(0L);
private final long backPressureBytes;
private final long capacity;
private final boolean archiveEnabled;
private final Lock lock = new ReentrantLock();
private final Condition condition = lock.newCondition();
private volatile long bytesUsed = 0L;
public ContainerState(final String containerName, final boolean archiveEnabled, final long backPressureBytes, final long capacity) {
this.containerName = containerName;
this.archiveEnabled = archiveEnabled;
this.backPressureBytes = backPressureBytes;
this.capacity = capacity;
}
/**
* @return {@code true} if wait is required to create claims against this Container, based on whether or not the container has reached its back pressure threshold
*/
public boolean isWaitRequired() {
if (!archiveEnabled) {
return false;
}
long used = bytesUsed;
if (used == 0L) {
try {
final long free = getContainerUsableSpace(containerName);
used = capacity - free;
bytesUsed = used;
} catch (final IOException e) {
return false;
}
}
return used >= backPressureBytes && archivedFileCount.get() > 0;
}
public void waitForArchiveExpiration() {
if (!archiveEnabled) {
return;
}
lock.lock();
try {
while (isWaitRequired()) {
try {
LOG.info("Unable to write to container {} due to archive file size constraints; waiting for archive cleanup", containerName);
condition.await();
} catch (final InterruptedException e) {
}
}
} finally {
lock.unlock();
}
}
public void signalCreationReady() {
if (!archiveEnabled) {
return;
}
lock.lock();
try {
try {
final long free = getContainerUsableSpace(containerName);
bytesUsed = capacity - free;
} catch (final Exception e) {
bytesUsed = 0L;
}
LOG.debug("Container {} signaled to allow Content Claim Creation", containerName);
condition.signal();
} finally {
lock.unlock();
}
}
public void incrementArchiveCount() {
archivedFileCount.incrementAndGet();
}
public void decrementArchiveCount() {
archivedFileCount.decrementAndGet();
}
}
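// Pairs a reusable ResourceClaim with its current length. Equality and hashCode are based solely on
// the claim so that the writable claim queue can be searched using the claim alone.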
private static class ClaimLengthPair {
private final ResourceClaim claim;
private final Long length;
public ClaimLengthPair(final ResourceClaim claim, final Long length) {
this.claim = claim;
this.length = length;
}
public ResourceClaim getClaim() {
return claim;
}
public Long getLength() {
return length;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + (claim == null ? 0 : claim.hashCode());
return result;
}
/**
 * Equality is determined purely by the ResourceClaim's equality
 *
 * @param obj the object to compare against
 * @return true if the given object is a ClaimLengthPair for the same ResourceClaim, false otherwise
 */
@Override
public boolean equals(final Object obj) {
if (this == obj) {
return true;
}
if (obj == null) {
return false;
}
if (getClass() != obj.getClass()) {
return false;
}
final ClaimLengthPair other = (ClaimLengthPair) obj;
return claim.equals(other.getClaim());
}
}
}