| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package org.apache.jackrabbit.oak.plugins.index.lucene.directory; |
| |
| import java.io.BufferedInputStream; |
| import java.io.BufferedOutputStream; |
| import java.io.BufferedWriter; |
| import java.io.File; |
| import java.io.FileInputStream; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.io.InputStream; |
| import java.io.OutputStream; |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Properties; |
| import java.util.concurrent.BlockingQueue; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.LinkedBlockingQueue; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| |
| import com.google.common.base.Charsets; |
| import com.google.common.base.Joiner; |
| import com.google.common.collect.Lists; |
| import com.google.common.io.Closeables; |
| import com.google.common.io.Files; |
| import org.apache.commons.io.FileUtils; |
| import org.apache.commons.io.LineIterator; |
| import org.apache.commons.io.filefilter.IOFileFilter; |
| import org.apache.commons.io.filefilter.RegexFileFilter; |
| import org.apache.jackrabbit.core.data.DataStoreException; |
| import org.apache.jackrabbit.oak.commons.FileIOUtils; |
| import org.apache.jackrabbit.oak.commons.PerfLogger; |
| import org.apache.jackrabbit.oak.plugins.blob.BlobTrackingStore; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker; |
| import org.apache.jackrabbit.oak.plugins.blob.datastore.BlobTracker.Options; |
| import org.apache.jackrabbit.oak.plugins.index.IndexCommitCallback; |
| import org.apache.jackrabbit.oak.spi.blob.GarbageCollectableBlobStore; |
| import org.apache.jackrabbit.oak.stats.Clock; |
| import org.jetbrains.annotations.NotNull; |
| import org.slf4j.Logger; |
| import org.slf4j.LoggerFactory; |
| |
| import static com.google.common.base.Preconditions.checkArgument; |
| |
| public class ActiveDeletedBlobCollectorFactory { |
    /**
     * Tracks blob ids reported as deleted by index updates and later purges
     * those blobs from the blob store.
     */
    public interface ActiveDeletedBlobCollector {
        /**
         * @return an instance of {@link BlobDeletionCallback} that can be used to track deleted blobs
         */
        BlobDeletionCallback getBlobDeletionCallback();

        /**
         * Purges tracked blobs which were reported deleted before the given timestamp.
         *
         * @param before    only purge blobs deleted before this timestamp
         * @param blobStore blob store used to delete the blobs' chunks
         */
        void purgeBlobsDeleted(long before, GarbageCollectableBlobStore blobStore);

        /** Requests cancellation of an in-progress purge, if any. */
        void cancelBlobCollection();

        /**
         * Sets or clears the flag that marks active deletion as unsafe; the flag is
         * reflected by {@link BlobDeletionCallback#isMarkingForActiveDeletionUnsafe()}.
         */
        void flagActiveDeletionUnsafe(boolean toFlag);

        /** @return whether active deletion is currently flagged as unsafe */
        boolean isActiveDeletionUnsafe();
    }
| |
    /**
     * No-op collector used when active deletion is disabled (e.g. the working
     * directory could not be created or lacks permissions). It still honours the
     * unsafe-deletion flag so callers can toggle/query it uniformly.
     */
    public static ActiveDeletedBlobCollector NOOP = new ActiveDeletedBlobCollector() {
        // Only piece of state: the unsafe flag; volatile for cross-thread visibility.
        private volatile boolean activeDeletionUnsafe = false;

        @Override
        public BlobDeletionCallback getBlobDeletionCallback() {
            // Nothing is tracked, so hand out the no-op callback.
            return BlobDeletionCallback.NOOP;
        }

        @Override
        public void purgeBlobsDeleted(long before, GarbageCollectableBlobStore blobStore) {
            // Intentionally empty: nothing was ever tracked, so there is nothing to purge.
        }

        @Override
        public void cancelBlobCollection() {
            // Intentionally empty: there is no purge run to cancel.
        }

        @Override
        public void flagActiveDeletionUnsafe(boolean toFlag) {
            activeDeletionUnsafe = toFlag;
        }

        @Override
        public boolean isActiveDeletionUnsafe() {
            return activeDeletionUnsafe;
        }
    };
| |
    /**
     * Callback handed to index writers so they can report blob ids whose backing
     * blobs were deleted as part of an index update.
     */
    public interface BlobDeletionCallback extends IndexCommitCallback {
        /**
         * Tracks deleted blobs. From the pov of this interface, blobId is an opaque string
         * that needs to be tracked.
         * @param blobId blobId representing deleted blob. In theory, it has nothing to do with
         *               blobs though.
         * @param ids Information that can be useful for debugging - this is not used for purging
         *            blobs.
         */
        void deleted(String blobId, Iterable<String> ids);

        /**
         * @return {@code true} when marking blobs for active deletion is currently
         *         considered unsafe and callers should avoid relying on it
         */
        boolean isMarkingForActiveDeletionUnsafe();

        /** Callback that discards everything; used by the NOOP collector. */
        BlobDeletionCallback NOOP = new BlobDeletionCallback() {
            @Override
            public void deleted(String blobId, Iterable<String> ids) {
                // Intentionally empty: deletions are not tracked.
            }

            @Override
            public void commitProgress(IndexProgress indexProgress) {
                // Intentionally empty: commit progress is irrelevant when nothing is tracked.
            }

            @Override
            public boolean isMarkingForActiveDeletionUnsafe() {
                // Delegate to the shared NOOP collector's flag.
                return ActiveDeletedBlobCollectorFactory.NOOP.isActiveDeletionUnsafe();
            }
        };
    }
| |
| public static ActiveDeletedBlobCollector newInstance(@NotNull File rootDirectory, |
| ExecutorService executorService) { |
| try { |
| FileUtils.forceMkdir(rootDirectory); |
| } catch (IOException ioe) { |
| ActiveDeletedBlobCollectorImpl.LOG.warn("Disabling active blob collector as we couldn't not create folder: " |
| + rootDirectory, ioe); |
| return NOOP; |
| } |
| if(!rootDirectory.canRead() || !rootDirectory.canWrite() || !rootDirectory.canExecute()) { |
| ActiveDeletedBlobCollectorImpl.LOG.warn("Insufficient access in directory - {}. Disabling active blob collector", |
| rootDirectory); |
| return NOOP; |
| } |
| return new ActiveDeletedBlobCollectorImpl(rootDirectory, executorService); |
| } |
| |
| /** |
| * Blob collector which takes *no* guarantees about checking whether the |
| * blob might be referred by paths other than one for which it is notified |
| * due deleted blob |
| */ |
| static class ActiveDeletedBlobCollectorImpl implements ActiveDeletedBlobCollector { |
| private static PerfLogger PERF_LOG = new PerfLogger( |
| LoggerFactory.getLogger(ActiveDeletedBlobCollectorImpl.class.getName() + ".perf")); |
| private static Logger LOG = LoggerFactory.getLogger(ActiveDeletedBlobCollectorImpl.class.getName()); |
| |
| private final Clock clock; |
| |
| private final File rootDirectory; |
| |
| private final ExecutorService executorService; |
| |
| private volatile boolean cancelled; |
| private volatile boolean activeDeletionUnsafe = false; |
| |
| |
| private static final String BLOB_FILE_PATTERN_PREFIX = "blobs-"; |
| private static final String BLOB_FILE_PATTERN_SUFFIX = ".txt"; |
| private static final String BLOB_FILE_PATTERN = BLOB_FILE_PATTERN_PREFIX + "%s" + BLOB_FILE_PATTERN_SUFFIX; |
| private static final IOFileFilter blobFileNameFilter = new RegexFileFilter("blobs-.*\\.txt"); |
| |
| private final BlockingQueue<BlobIdInfoStruct> deletedBlobs; |
| private final DeletedBlobsFileWriter deletedBlobsFileWriter; |
| |
        /**
         * @param rootDirectory directory that may be used by this instance to
         *                      keep temporary data (e.g. reported deleted blob-ids).
         * @param executorService executor service to asynchronously flush deleted blobs
         *                        to a file.
         */
        ActiveDeletedBlobCollectorImpl(@NotNull File rootDirectory, @NotNull ExecutorService executorService) {
            // Delegate with the default wall-clock implementation.
            this(Clock.SIMPLE, rootDirectory, executorService);
        }

        /**
         * Same as the two-arg constructor but with an injectable {@link Clock},
         * which allows controlling timestamps (e.g. from tests).
         */
        ActiveDeletedBlobCollectorImpl(Clock clock, @NotNull File rootDirectory,
                                       @NotNull ExecutorService executorService) {
            this.clock = clock;
            this.rootDirectory = rootDirectory;
            this.executorService = executorService;
            // Bounded so a stalled flusher applies back-pressure instead of growing unboundedly.
            this.deletedBlobs = new LinkedBlockingQueue<>(100000);
            this.deletedBlobsFileWriter = new DeletedBlobsFileWriter();
        }
| |
        /**
         * Purges blobs from the blob-store which were tracked earlier as deleted.
         * <p>
         * Iterates the {@code blobs-<timestamp>.txt} files in {@code rootDirectory},
         * deletes each listed blob's chunks via the blob store, optionally syncs the
         * deleted chunk ids with the store's {@link BlobTracker}, and finally persists
         * the newest deletion timestamp processed so the next run can skip entries
         * already handled.
         *
         * @param before only purge blobs which were deleted before this timestamps
         * @param blobStore used to purge blobs/chunks
         */
        public void purgeBlobsDeleted(long before, @NotNull GarbageCollectableBlobStore blobStore) {
            // Reset any stale cancellation request from a previous run.
            cancelled = false;
            long start = clock.getTime();
            LOG.info("Starting purge of blobs deleted before {}", before);
            long numBlobsDeleted = 0;
            long numChunksDeleted = 0;

            File idTempDeleteFile = null;
            BufferedWriter idTempDeleteWriter = null;
            // If blob store support blob tracking
            boolean blobIdsTracked = blobStore instanceof BlobTrackingStore;

            if (blobIdsTracked) {
                try {
                    // Deleted chunk ids are staged in a temp file and handed to the
                    // tracker in one shot at the end of the run.
                    idTempDeleteFile = File.createTempFile("idTempDelete", null, rootDirectory);
                    idTempDeleteWriter = Files.newWriter(idTempDeleteFile, Charsets.UTF_8);
                } catch (Exception e) {
                    // Best effort: purge still proceeds, only the tracker sync is skipped.
                    LOG.warn("Unable to open a writer to a temp file, will ignore tracker sync");
                    blobIdsTracked = false;
                }
            }

            long lastCheckedBlobTimestamp = readLastCheckedBlobTimestamp();
            long lastDeletedBlobTimestamp = lastCheckedBlobTimestamp;
            // Remember which file was in use when the run started (see OAK-6314 note below),
            // then release it so the writer starts a fresh file for new entries.
            String currInUseFileName = deletedBlobsFileWriter.inUseFileName;
            deletedBlobsFileWriter.releaseInUseFile();
            for (File deletedBlobListFile : FileUtils.listFiles(rootDirectory, blobFileNameFilter, null)) {
                if (cancelled) {
                    break;
                }
                // Skip the file currently being appended to by the flusher.
                if (deletedBlobListFile.getName().equals(deletedBlobsFileWriter.inUseFileName)) {
                    continue;
                }
                LOG.debug("Purging blobs from {}", deletedBlobListFile);
                long timestamp;
                try {
                    timestamp = getTimestampFromBlobFileName(deletedBlobListFile.getName());
                } catch (IllegalArgumentException iae) {
                    LOG.warn("Couldn't extract timestamp from filename - " + deletedBlobListFile, iae);
                    continue;
                }
                if (timestamp < before) {
                    LineIterator blobLineIter = null;
                    try {
                        blobLineIter = FileUtils.lineIterator(deletedBlobListFile);
                        while (blobLineIter.hasNext()) {
                            if (cancelled) {
                                break;
                            }
                            String deletedBlobLine = blobLineIter.next();

                            // Line format (see BlobIdInfoStruct#toString): blobId|timestamp|debug-ids
                            String[] parsedDeletedBlobIdLine = deletedBlobLine.split("\\|", 3);
                            if (parsedDeletedBlobIdLine.length != 3) {
                                LOG.warn("Unparseable line ({}) in file {}. It won't be retried.",
                                        parsedDeletedBlobIdLine, deletedBlobListFile);
                            } else {
                                String deletedBlobId = parsedDeletedBlobIdLine[0];
                                try {
                                    long blobDeletionTimestamp = Long.valueOf(parsedDeletedBlobIdLine[1]);

                                    // Already handled by an earlier run.
                                    if (blobDeletionTimestamp < lastCheckedBlobTimestamp) {
                                        continue;
                                    }

                                    // Entries are flushed in order, so once we see a too-new
                                    // timestamp the rest of the file is too new as well.
                                    if (blobDeletionTimestamp >= before) {
                                        break;
                                    }

                                    lastDeletedBlobTimestamp = Math.max(lastDeletedBlobTimestamp, blobDeletionTimestamp);

                                    List<String> chunkIds = Lists.newArrayList(blobStore.resolveChunks(deletedBlobId));
                                    if (chunkIds.size() > 0) {
                                        long deleted = blobStore.countDeleteChunks(chunkIds, 0);
                                        if (deleted < 1) {
                                            LOG.warn("Blob {} in file {} not deleted", deletedBlobId, deletedBlobListFile);
                                        } else {
                                            numBlobsDeleted++;
                                            numChunksDeleted += deleted;

                                            if (blobIdsTracked) {
                                                // Save deleted chunkIds to a temporary file
                                                for (String id : chunkIds) {
                                                    FileIOUtils.writeAsLine(idTempDeleteWriter, id, true);
                                                }
                                            }
                                        }
                                    }
                                } catch (NumberFormatException nfe) {
                                    LOG.warn("Couldn't parse blobTimestamp(" + parsedDeletedBlobIdLine[1] +
                                            "). deletedBlobLine - " + deletedBlobLine +
                                            "; file - " + deletedBlobListFile.getName(), nfe);
                                } catch (DataStoreException dse) {
                                    // DataStoreException is expected for already-gone blobs; keep it at debug.
                                    LOG.debug("Exception occurred while attempting to delete blob " + deletedBlobId, dse);
                                } catch (Exception e) {
                                    LOG.warn("Exception occurred while attempting to delete blob " + deletedBlobId, e);
                                }
                            }
                        }
                    } catch (IOException ioe) {
                        //log error and continue
                        LOG.warn("Couldn't read deleted blob list file - " + deletedBlobListFile, ioe);
                    } finally {
                        LineIterator.closeQuietly(blobLineIter);
                    }

                    // OAK-6314 revealed that blobs appended might not be immediately available. So, we'd skip
                    // the file that was being processed when purge started - next cycle would re-process and
                    // delete
                    if (!deletedBlobListFile.getName().equals(currInUseFileName)) {
                        if (!deletedBlobListFile.delete()) {
                            LOG.warn("File {} couldn't be deleted while all blobs listed in it have been purged", deletedBlobListFile);
                        } else {
                            LOG.debug("File {} deleted", deletedBlobListFile);
                        }
                    }
                } else {
                    LOG.debug("Skipping {} as its timestamp is newer than {}", deletedBlobListFile.getName(), before);
                }
            }

            long startBlobTrackerSyncTime = clock.getTime();
            // Synchronize deleted blob ids with the blob id tracker
            try {
                Closeables.close(idTempDeleteWriter, true);

                if (blobIdsTracked && numBlobsDeleted > 0) {
                    BlobTracker tracker = ((BlobTrackingStore) blobStore).getTracker();
                    if (tracker != null) {
                        tracker.remove(idTempDeleteFile, Options.ACTIVE_DELETION);
                    }
                }
            } catch(Exception e) {
                LOG.warn("Error refreshing tracked blob ids", e);
            }
            long endBlobTrackerSyncTime = clock.getTime();
            LOG.info("Synchronizing changes with blob tracker took {} ms", endBlobTrackerSyncTime - startBlobTrackerSyncTime);

            if (cancelled) {
                LOG.info("Deletion run cancelled by user");
            }
            long end = clock.getTime();
            LOG.info("Deleted {} blobs contained in {} chunks in {} ms", numBlobsDeleted, numChunksDeleted, end - start);
            // Persist progress so the next run resumes past what was already purged.
            writeOutLastCheckedBlobTimestamp(lastDeletedBlobTimestamp);
        }
| |
        /**
         * Requests that an in-progress {@link #purgeBlobsDeleted} run stop at the
         * next cancellation check point.
         */
        @Override
        public void cancelBlobCollection() {
            cancelled = true;
        }

        /** Sets or clears the flag marking active deletion as unsafe. */
        @Override
        public void flagActiveDeletionUnsafe(boolean toFlag) {
            activeDeletionUnsafe = toFlag;
        }

        /** @return whether active deletion is currently flagged as unsafe */
        @Override
        public boolean isActiveDeletionUnsafe() {
            return activeDeletionUnsafe;
        }
| |
| private long readLastCheckedBlobTimestamp() { |
| File blobCollectorInfoFile = new File(rootDirectory, "collection-info.txt"); |
| if (!blobCollectorInfoFile.exists()) { |
| LOG.debug("Couldn't read last checked blob timestamp (file not found). Would do a bit more scan"); |
| return -1; |
| } |
| InputStream is = null; |
| Properties p; |
| try { |
| is = new BufferedInputStream(new FileInputStream(blobCollectorInfoFile)); |
| p = new Properties(); |
| p.load(is); |
| } catch (IOException e) { |
| LOG.warn("Couldn't read last checked blob timestamp from {} ... would do a bit more scan", |
| blobCollectorInfoFile, e); |
| return -1; |
| } finally { |
| org.apache.commons.io.IOUtils.closeQuietly(is); |
| } |
| |
| String resString = p.getProperty("last-checked-blob-timestamp"); |
| if (resString == null) { |
| LOG.warn("Couldn't fine last checked blob timestamp property in collection-info.txt"); |
| return -1; |
| } |
| |
| try { |
| return Long.valueOf(resString); |
| } catch (NumberFormatException nfe) { |
| LOG.warn("Couldn't read last checked blob timestamp '" + resString + "' as long", nfe); |
| return -1; |
| } |
| } |
| |
| private void writeOutLastCheckedBlobTimestamp(long timestamp) { |
| Properties p = new Properties(); |
| p.setProperty("last-checked-blob-timestamp", String.valueOf(timestamp)); |
| File blobCollectorInfoFile = new File(rootDirectory, "collection-info.txt"); |
| OutputStream os = null; |
| try { |
| os = new BufferedOutputStream(new FileOutputStream(blobCollectorInfoFile)); |
| p.store(os, "Last checked blob timestamp"); |
| } catch (IOException e) { |
| LOG.warn("Couldn't write out last checked blob timestamp(" + timestamp + ")", e); |
| } finally { |
| org.apache.commons.io.IOUtils.closeQuietly(os); |
| } |
| |
| } |
| |
| public BlobDeletionCallback getBlobDeletionCallback() throws IllegalStateException { |
| return new DeletedBlobCollector(); |
| } |
| |
| static long getTimestampFromBlobFileName(String filename) throws IllegalArgumentException { |
| checkArgument(filename.startsWith(BLOB_FILE_PATTERN_PREFIX), |
| "Filename(%s) must start with %s", filename, BLOB_FILE_PATTERN_PREFIX); |
| checkArgument(filename.endsWith(BLOB_FILE_PATTERN_SUFFIX), |
| "Filename(%s) must end with %s", filename, BLOB_FILE_PATTERN_SUFFIX); |
| String timestampStr = filename.substring( |
| BLOB_FILE_PATTERN_PREFIX.length(), |
| filename.length() - BLOB_FILE_PATTERN_SUFFIX.length()); |
| |
| return Long.parseLong(timestampStr); |
| } |
| |
| private void addDeletedBlobs(Collection<BlobIdInfoStruct> deletedBlobs) { |
| int addedForFlush = 0; |
| for (BlobIdInfoStruct info : deletedBlobs) { |
| try { |
| if (!this.deletedBlobs.offer(info, 1, TimeUnit.SECONDS)) { |
| LOG.warn("Timed out while offer-ing {} into queue.", info); |
| } |
| if (LOG.isDebugEnabled()) { |
| addedForFlush++; |
| } |
| } catch (InterruptedException e) { |
| LOG.warn("Interrupted while adding " + info, e); |
| } |
| } |
| if (LOG.isDebugEnabled()) { |
| LOG.debug("Added {} (out of {} tried) to be flushed. QSize: {}", |
| addedForFlush, deletedBlobs.size(), this.deletedBlobs.size()); |
| } |
| deletedBlobsFileWriter.scheduleFileFlushIfNeeded(); |
| } |
| |
        /**
         * Drains the shared {@code deletedBlobs} queue to a timestamped
         * {@code blobs-<time>.txt} file on the executor. At most one flush task is
         * pending at a time, enforced via the {@code fileFlushScheduled} CAS flag.
         */
        private class DeletedBlobsFileWriter implements Runnable {
            // true while a flush task is submitted but not yet finished.
            private final AtomicBoolean fileFlushScheduled = new AtomicBoolean(false);

            // Name of the file currently being appended to; null forces a new file
            // name on the next flush. Volatile: read by the purge thread.
            private volatile String inUseFileName = null;

            // synchronized with releaseInUseFile() so the in-use file name can't
            // change mid-flush.
            private synchronized void flushDeletedBlobs() {
                List<BlobIdInfoStruct> localDeletedBlobs = new LinkedList<>();
                deletedBlobs.drainTo(localDeletedBlobs);
                if (localDeletedBlobs.size() > 0) {
                    File outFile = new File(rootDirectory, getBlobFileName());
                    try {
                        long start = PERF_LOG.start();
                        // Appends one line per entry using BlobIdInfoStruct#toString.
                        FileUtils.writeLines(outFile, localDeletedBlobs, true);
                        PERF_LOG.end(start, 1, "Flushing deleted blobs");
                    } catch (IOException e) {
                        LOG.error("Couldn't write out to " + outFile, e);
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Flushed {} blobs to {}", localDeletedBlobs.size(), outFile.getName());
                    }
                }
            }

            // Submits a flush task unless one is already pending (CAS false -> true).
            private void scheduleFileFlushIfNeeded() {
                if (fileFlushScheduled.compareAndSet(false, true)) {
                    executorService.submit(this);
                }
            }

            // Called at the start of a purge run so subsequent flushes go to a fresh file.
            private synchronized void releaseInUseFile() {
                inUseFileName = null;
            }

            @Override
            public void run() {
                flushDeletedBlobs();
                // Clear AFTER flushing; a concurrent schedule request may then submit again.
                fileFlushScheduled.set(false);
            }

            // Lazily picks a file name stamped with the current clock time.
            private String getBlobFileName() {
                if (inUseFileName == null) {
                    inUseFileName = String.format(BLOB_FILE_PATTERN, clock.getTime());
                }
                return inUseFileName;
            }
        }
| |
| /** |
| * This implementation would track deleted blobs and then pass them onto |
| * {@link ActiveDeletedBlobCollectorImpl} on a successful commit |
| */ |
| private class DeletedBlobCollector implements BlobDeletionCallback { |
| List<BlobIdInfoStruct> deletedBlobs = new ArrayList<>(); |
| |
| @Override |
| public void deleted(String blobId, Iterable<String> ids) { |
| deletedBlobs.add(new BlobIdInfoStruct(blobId, ids)); |
| } |
| |
| @Override |
| public void commitProgress(IndexProgress indexProgress) { |
| if (indexProgress != IndexProgress.COMMIT_SUCCEDED && indexProgress != IndexProgress.COMMIT_FAILED) { |
| LOG.debug("We only care for commit success/failure"); |
| return; |
| } |
| if (indexProgress == IndexProgress.COMMIT_SUCCEDED) { |
| addDeletedBlobs(deletedBlobs); |
| } |
| |
| deletedBlobs.clear(); |
| } |
| |
| @Override |
| public boolean isMarkingForActiveDeletionUnsafe() { |
| return activeDeletionUnsafe; |
| } |
| } |
| |
| private class BlobIdInfoStruct { |
| final String blobId; |
| final Iterable<String> ids; |
| |
| BlobIdInfoStruct(String blobId, Iterable<String> ids) { |
| this.blobId = blobId; |
| this.ids = ids; |
| } |
| |
| @Override |
| public String toString() { |
| return String.format("%s|%s|%s", blobId, clock.getTime(), Joiner.on("|").join(ids)); |
| } |
| } |
| } |
| } |